# Page-header residue from the hosting page (Spaces status: Runtime error).
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session | |
import json | |
import random | |
import os | |
import string | |
import logging | |
from datetime import datetime | |
import uuid | |
from huggingface_hub import login, HfApi, hf_hub_download | |
# Root logging configuration: INFO level, with one handler writing to the
# "app.log" file and one stream handler.
logging.basicConfig(
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Log in to the Hugging Face Hub with the token taken from the HF_TOKEN
# environment variable; when it is absent, only an error is logged.
hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
    logger.error("HF_TOKEN not found in environment variables")
else:
    login(token=hf_token)
# Flask application object for this module.
app = Flask(__name__)
# The signing key must not be a constant committed with the source: anyone who
# can read this file could forge signed cookies.  Take it from the SECRET_KEY
# environment variable; when that is unset or empty, fall back to a random key
# so the app still starts.
# NOTE: the random fallback changes on every process start — set SECRET_KEY
# explicitly when running several workers or when signed data must survive a
# restart.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32)
# Maps each method name to the directory that holds its visualizations.
VISUALIZATION_DIRS = dict(
    (
        ("No-XAI", "htmls_NO_XAI"),
        ("Dater", "htmls_DATER"),
        ("Chain-of-Table", "htmls_COT"),
        ("Plan-of-SQLs", "htmls_POS"),
    )
)
def generate_session_id():
    """Return a freshly generated random UUID (version 4) as a string."""
    new_id = uuid.uuid4()
    return str(new_id)
def save_session_data(session_id, data):
    """Serialize *data* as JSON and upload it to the Hugging Face data repo.

    The payload is stored as ``session_data_pref/<session_id>_session.json``
    in the ``luulinh90s/Tabular-LLM-Study-Data`` repository.  The operation is
    best-effort: any failure is logged with its traceback and swallowed, and
    the function always returns ``None``.

    Args:
        session_id: Identifier used to build the remote file name.
        data: JSON-serializable session state.
    """
    try:
        file_name = f'{session_id}_session.json'
        # Upload the encoded JSON directly instead of staging it in
        # /tmp/<session_id>_session.json: nothing is left on disk when the
        # upload fails, and a caller-supplied session_id can no longer
        # influence a local filesystem path.
        payload = json.dumps(data, indent=4).encode('utf-8')
        api = HfApi()
        api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=f"session_data_pref/{file_name}",
            repo_id="luulinh90s/Tabular-LLM-Study-Data",
            repo_type="space",
        )
        logger.info(f"Session data saved for session ID {session_id} in Hugging Face Data Space")
    except Exception as e:
        # Deliberately broad: callers do not expect this function to raise.
        logger.exception(f"Error saving session data for session ID {session_id}: {e}")
def load_session_data(session_id):
    """Download and parse the stored JSON state for *session_id*.

    The file ``session_data_pref/<session_id>_session.json`` is fetched from
    the ``luulinh90s/Tabular-LLM-Study-Data`` repository with
    ``hf_hub_download`` and decoded as JSON.

    Returns:
        The decoded JSON object, or ``None`` when any step fails (the error
        is logged with its traceback).
    """
    try:
        remote_name = f"session_data_pref/{session_id}_session.json"
        local_path = hf_hub_download(
            filename=remote_name,
            repo_type="space",
            repo_id="luulinh90s/Tabular-LLM-Study-Data",
        )
        with open(local_path, 'r') as handle:
            session_state = json.loads(handle.read())
        logger.info(f"Session data loaded for session ID {session_id} from Hugging Face Data Space")
        return session_state
    except Exception as e:
        logger.exception(f"Error loading session data for session ID {session_id}: {e}")
        return None
def load_samples(methods):
    """Collect the samples that exist for both of the given methods.

    For each category ("TP", "TN", "FP", "FN"), the file names present in both
    ``VISUALIZATION_DIRS[methods[0]]/<category>`` and
    ``VISUALIZATION_DIRS[methods[1]]/<category>`` are kept.

    Args:
        methods: Sequence whose first two items are keys of
            ``VISUALIZATION_DIRS``.

    Returns:
        A list of ``{'category': ..., 'file': ...}`` dicts, sorted by category
        and then by file name.

    Raises:
        KeyError: If a method name is not a key of ``VISUALIZATION_DIRS``.
        IndexError: If fewer than two methods are given.
        OSError: If a category directory cannot be listed.
    """
    logger.info(f"Loading samples for methods: {methods}")
    dir_a = VISUALIZATION_DIRS[methods[0]]
    dir_b = VISUALIZATION_DIRS[methods[1]]
    categories = ["TP", "TN", "FP", "FN"]
    pairs = []
    for category in categories:
        files_a = set(os.listdir(os.path.join(dir_a, category)))
        files_b = set(os.listdir(os.path.join(dir_b, category)))
        pairs.extend((category, name) for name in files_a & files_b)
    # Neither os.listdir nor set iteration guarantees an order; sort so the
    # returned list is the same on every run for the same directory contents.
    pairs.sort()
    samples = [{'category': category, 'file': name} for category, name in pairs]
    logger.info(f"Loaded {len(samples)} unique samples across all categories")
    return samples
def select_balanced_samples(samples, num_samples=10):
    """Pick up to *num_samples* distinct samples at random.

    Duplicate ``(category, file)`` pairs are removed first.  When fewer than
    *num_samples* unique pairs exist, all of them are returned (in sorted
    order) and a warning is logged.  Selection uses the module-level
    ``random`` generator, so seeding it beforehand makes the result
    reproducible.

    NOTE: despite the name, no per-category balancing is performed.

    Args:
        samples: Iterable of dicts with ``'category'`` and ``'file'`` keys.
        num_samples: Maximum number of samples to return (default 10).

    Returns:
        A list of ``{'category': ..., 'file': ...}`` dicts; an empty list if
        an error occurs (the error is logged).
    """
    try:
        # Sort before sampling: iteration order of a set of string tuples
        # varies between interpreter runs (hash randomization), which would
        # make random.sample pick different items for the same seed.
        unique_samples = sorted({(s['category'], s['file']) for s in samples})
        if len(unique_samples) < num_samples:
            logger.warning(f"Not enough unique samples. Only {len(unique_samples)} available.")
            chosen = unique_samples
        else:
            chosen = random.sample(unique_samples, num_samples)
        selected_samples = [{'category': category, 'file': file} for category, file in chosen]
        logger.info(f"Selected {len(selected_samples)} unique samples")
        return selected_samples
    except Exception:
        logger.exception("Error selecting balanced samples")
        return []
def index():
    """Show the start form (GET) or create a new study session (POST).

    On POST the form fields ``username``, ``seed`` and ``method`` (two method
    names separated by a comma) are validated, the random generator is seeded,
    samples are selected, and the initial session state is saved before
    redirecting to the ``experiment`` view.

    Returns:
        The rendered ``index.html`` template, a redirect, or a
        ``(message, status)`` tuple: 400 for invalid input, 500 when no
        samples could be selected or an unexpected error occurs.
    """
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source; url_for('index') needs the view to be registered —
    # confirm the route.
    if request.method != 'POST':
        return render_template('index.html')

    username = request.form.get('username')
    seed = request.form.get('seed')
    # Default to '' so that a missing 'method' field ends in the 400 below
    # instead of raising AttributeError on None.split().
    methods = request.form.get('method', '').split(',')
    if not username or not seed or len(methods) != 2:
        return "Please fill in all fields and select exactly two methods.", 400
    # Unknown method names would otherwise surface as a KeyError (HTTP 500)
    # while loading samples; they are a client error.
    if any(method not in VISUALIZATION_DIRS for method in methods):
        return "Unknown method selected.", 400
    try:
        seed_int = int(seed)
    except ValueError:
        return "Seed must be an integer.", 400

    try:
        random.seed(seed_int)
        all_samples = load_samples(methods)
        selected_samples = select_balanced_samples(all_samples)
        if not selected_samples:
            return "No samples were selected", 500
        session_id = generate_session_id()
        start_time = datetime.now().isoformat()
        session_data = {
            'session_id': session_id,
            'username': username,
            'seed': seed,  # stored as submitted (string), not the parsed int
            'methods': methods,
            'selected_samples': selected_samples,
            'current_index': 0,
            'responses': [],
            'start_time': start_time,
        }
        save_session_data(session_id, session_data)
        logger.info(f"New session initialized for user: {username}, session ID: {session_id}")
        return redirect(url_for('experiment', session_id=session_id))
    except Exception as e:
        logger.exception(f"Error in index route: {e}")
        return "An error occurred", 500
def experiment(session_id):
    """Render the comparison page for the session's current sample.

    Loads the stored session state; redirects to ``index`` when it is missing
    and to ``completed`` once every selected sample has been shown.  If either
    visualization file of the current sample does not exist, the sample is
    skipped: the index is advanced, the state saved, and the view redirects to
    itself.  Any exception is logged and answered with HTTP 500.
    """
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source; url_for('experiment', ...) needs the view to be
    # registered — confirm the route.
    try:
        state = load_session_data(session_id)
        if not state:
            logger.error(f"No session data found for session ID: {session_id}")
            return redirect(url_for('index'))

        chosen_samples = state['selected_samples']
        methods = state['methods']
        position = state['current_index']
        if position >= len(chosen_samples):
            return redirect(url_for('completed', session_id=session_id))

        current = chosen_samples[position]
        method_a, method_b = methods
        # Same relative path under each method's visualization directory.
        file_a, file_b = (
            os.path.join(VISUALIZATION_DIRS[name], current['category'], current['file'])
            for name in (method_a, method_b)
        )

        if not (os.path.exists(file_a) and os.path.exists(file_b)):
            logger.error(f"Missing files for comparison at index {position}")
            state['current_index'] += 1
            save_session_data(session_id, state)
            return redirect(url_for('experiment', session_id=session_id))

        link_a = url_for('send_visualization', filename=file_a)
        link_b = url_for('send_visualization', filename=file_b)
        statement = """
Please note that in select row function, starting index is 0 for Chain-of-Table 1 for Dater and Index * represents the selection of the whole Table.
You are now given two explanations that describe the reasoning process of the Table QA model.
Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process.
"""
        context = {
            'sample_id': position,
            'statement': statement,
            'visualization_a': link_a,
            'visualization_b': link_b,
            'method_a': method_a,
            'method_b': method_b,
            'session_id': session_id,
        }
        return render_template('experiment.html', **context)
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500
def feedback():
    """Record the submitted preference for the current sample and advance.

    Reads ``session_id`` and ``feedback`` from the posted form, appends a
    response entry (sample index, preferred method, timestamp) to the stored
    session, increments ``current_index`` and saves the session.

    Returns:
        A redirect to ``experiment`` (more samples left), ``completed`` (all
        samples answered) or ``index`` (unknown session); ``400`` when a form
        field is missing or empty; ``500`` on unexpected errors.
    """
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source — confirm the route and that it accepts POST.
    try:
        session_id = request.form.get('session_id')
        # Named differently from the view so the function name is not shadowed.
        preferred_method = request.form.get('feedback')
        if not session_id or not preferred_method:
            # A missing field is a client error, not a server failure.
            return "Missing session_id or feedback.", 400

        session_data = load_session_data(session_id)
        if not session_data:
            logger.error(f"No session data found for session ID: {session_id}")
            return redirect(url_for('index'))

        total_samples = len(session_data['selected_samples'])
        sample_index = session_data['current_index']
        if sample_index >= total_samples:
            # Submission after the last sample (e.g. a repeated POST): do not
            # record a response for a sample that does not exist.
            return redirect(url_for('completed', session_id=session_id))

        session_data['responses'].append({
            'sample_id': sample_index,
            'preferred_method': preferred_method,
            'timestamp': datetime.now().isoformat(),
        })
        session_data['current_index'] = sample_index + 1
        save_session_data(session_id, session_data)
        logger.info(f"Feedback saved for session {session_id}, sample {sample_index}")

        if session_data['current_index'] >= total_samples:
            return redirect(url_for('completed', session_id=session_id))
        return redirect(url_for('experiment', session_id=session_id))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500
def completed(session_id):
    """Finalize a session and show the per-method preference percentages.

    Stamps ``end_time`` on the stored session, computes for every method the
    percentage of responses that preferred it (rounded to two decimals),
    stores the result under ``'preferences'``, saves the session and renders
    ``completed.html``.

    Returns:
        The rendered template, a redirect to ``index`` when the session is
        unknown, or ``("An error occurred", 500)`` on unexpected errors.
    """
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source; url_for('completed', ...) needs the view to be
    # registered — confirm the route.
    try:
        session_data = load_session_data(session_id)
        if not session_data:
            logger.error(f"No session data found for session ID: {session_id}")
            return redirect(url_for('index'))

        session_data['end_time'] = datetime.now().isoformat()
        methods = session_data['methods']
        responses = session_data['responses']

        counts = {method: 0 for method in methods}
        for response in responses:
            chosen = response['preferred_method']
            # .get(): a recorded value that is not one of `methods` gets its
            # own entry instead of raising KeyError and losing the summary.
            counts[chosen] = counts.get(chosen, 0) + 1

        total_responses = len(responses)
        # With no responses at all (e.g. every sample was skipped) report 0.0
        # for each method rather than dividing by zero.
        preferences = {
            method: round((count / total_responses) * 100, 2) if total_responses else 0.0
            for method, count in counts.items()
        }

        session_data['preferences'] = preferences
        save_session_data(session_id, session_data)
        return render_template('completed.html', preferences=preferences)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500
def introduction():
    """Render the ``introduction.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source — confirm the route.
    page = render_template('introduction.html')
    return page
def attribution():
    """Render the ``attribution.html`` template."""
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source — confirm the route.
    page = render_template('attribution.html')
    return page
def send_visualization(filename):
    """Serve a file located under the current working directory.

    Args:
        filename: Path of the requested file, resolved relative to the
            working directory.

    Returns:
        The file response, ``("Access denied", 403)`` when the resolved path
        lies outside the working directory, or ``("File not found", 404)``
        when it is not an existing file.
    """
    # NOTE(review): no @app.route decorator is visible for this view in the
    # captured source; url_for('send_visualization', ...) needs the view to
    # be registered — confirm the route.
    # NOTE(review): any file below the working directory can be requested
    # (including logs and source files); consider limiting this to the
    # directories listed in VISUALIZATION_DIRS.
    logger.info(f"Attempting to serve file: {filename}")
    base_dir = os.path.abspath(os.getcwd())
    file_path = os.path.normpath(os.path.join(base_dir, filename))
    # Compare whole path components: a plain startswith() prefix test would
    # accept a sibling such as "/app2/x" for base "/app".
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # Raised e.g. for paths on different drives; treat as outside.
        inside_base = False
    if not inside_base:
        return "Access denied", 403
    # isfile(): a directory is not a servable file either.
    if not os.path.isfile(file_path):
        return "File not found", 404
    directory = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)
if __name__ == "__main__":
    os.makedirs('session_data', exist_ok=True)
    # The interactive debugger lets anyone who can reach the server execute
    # code, so it must not be on by default while listening on 0.0.0.0.
    # Opt in explicitly with FLASK_DEBUG=1 (or "true"/"yes") for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)