|
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session |
|
import json |
|
import random |
|
import os |
|
import string |
|
import logging |
|
from datetime import datetime |
|
from huggingface_hub import login, HfApi, hf_hub_download |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.FileHandler("app.log"), |
|
logging.StreamHandler() |
|
]) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
hf_token = os.environ.get("HF_TOKEN") |
|
if hf_token: |
|
login(token=hf_token) |
|
else: |
|
logger.error("HF_TOKEN not found in environment variables") |
|
|
|
app = Flask(__name__) |
|
app.config['SECRET_KEY'] = 'supersecretkey' |
|
|
|
|
|
VISUALIZATION_DIRS = { |
|
"No-XAI": "htmls_NO_XAI_mod", |
|
"Dater": "htmls_DATER_mod2", |
|
"Chain-of-Table": "htmls_COT_mod", |
|
"Plan-of-SQLs": "htmls_POS_mod2" |
|
} |
|
|
|
def get_method_dir(method): |
|
if method == 'No-XAI': |
|
return 'NO_XAI' |
|
elif method == 'Dater': |
|
return 'DATER' |
|
elif method == 'Chain-of-Table': |
|
return 'COT' |
|
elif method == 'Plan-of-SQLs': |
|
return 'POS' |
|
else: |
|
return None |
|
|
|
METHODS = ["No-XAI", "Dater", "Chain-of-Table", "Plan-of-SQLs"] |
|
|
|
|
|
def save_session_data(username, data, is_update=False): |
|
try: |
|
seed = data.get('seed', 'unknown') |
|
start_time = data.get('start_time', datetime.now().isoformat()) |
|
file_name = f'{username}_seed{seed}_{start_time}_session.json' |
|
file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.']) |
|
|
|
json_data = json.dumps(data, indent=4) |
|
temp_file_path = f"/tmp/{file_name}" |
|
with open(temp_file_path, 'w') as f: |
|
f.write(json_data) |
|
|
|
api = HfApi() |
|
repo_path = "session_data_foward_simulation" |
|
|
|
if is_update: |
|
|
|
existing_files = api.list_repo_files(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space") |
|
existing_file = next( |
|
(f for f in existing_files if f.startswith(f'{repo_path}/{username}_seed{seed}_{start_time}')), None) |
|
if existing_file: |
|
api.delete_file(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space", |
|
path_in_repo=existing_file) |
|
|
|
api.upload_file( |
|
path_or_fileobj=temp_file_path, |
|
path_in_repo=f"{repo_path}/{file_name}", |
|
repo_id="luulinh90s/Tabular-LLM-Study-Data", |
|
repo_type="space", |
|
) |
|
os.remove(temp_file_path) |
|
logger.info( |
|
f"Session data {'updated' if is_update else 'saved'} for user {username} in Hugging Face Data Space") |
|
except Exception as e: |
|
logger.exception(f"Error {'updating' if is_update else 'saving'} session data for user {username}: {e}") |
|
|
|
|
|
def load_session_data(username): |
|
try: |
|
api = HfApi() |
|
files = api.list_repo_files(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space") |
|
user_files = [f for f in files if |
|
f.startswith(f'session_data_foward_simulation/{username}_') and f.endswith('_session.json')] |
|
if not user_files: |
|
logger.warning(f"No session data found for user {username}") |
|
return None |
|
latest_file = sorted(user_files, reverse=True)[0] |
|
file_path = hf_hub_download(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space", |
|
filename=latest_file) |
|
with open(file_path, 'r') as f: |
|
data = json.load(f) |
|
logger.info(f"Session data loaded for user {username} from Hugging Face Data Space") |
|
return data |
|
except Exception as e: |
|
logger.exception(f"Error loading session data for user {username}: {e}") |
|
return None |
|
|
|
def load_samples(): |
|
common_samples = [] |
|
categories = ["TP", "TN", "FP", "FN"] |
|
|
|
for category in categories: |
|
files = set(os.listdir(f'htmls_NO_XAI_mod/{category}')) |
|
for method in ["Dater", "Chain-of-Table", "Plan-of-SQLs"]: |
|
method_dir = VISUALIZATION_DIRS[method] |
|
files &= set(os.listdir(f'{method_dir}/{category}')) |
|
|
|
for file in files: |
|
common_samples.append({'category': category, 'file': file}) |
|
|
|
logger.info(f"Found {len(common_samples)} common samples across all methods") |
|
return common_samples |
|
|
|
def select_balanced_samples(samples): |
|
try: |
|
if len(samples) < 10: |
|
logger.warning(f"Not enough common samples. Only {len(samples)} available.") |
|
return samples |
|
|
|
selected_samples = random.sample(samples, 10) |
|
logger.info(f"Selected 10 unique samples") |
|
return selected_samples |
|
except Exception as e: |
|
logger.exception("Error selecting balanced samples") |
|
return [] |
|
|
|
@app.route('/', methods=['GET', 'POST']) |
|
def index(): |
|
if request.method == 'POST': |
|
username = request.form.get('username') |
|
seed = request.form.get('seed') |
|
method = request.form.get('method') |
|
if not username or not seed or not method: |
|
return "Please fill in all fields and select a method.", 400 |
|
try: |
|
seed = int(seed) |
|
random.seed(seed) |
|
all_samples = load_samples() |
|
selected_samples = select_balanced_samples(all_samples) |
|
if len(selected_samples) == 0: |
|
return "No common samples were found", 500 |
|
start_time = datetime.now().isoformat() |
|
session_data = { |
|
'username': username, |
|
'seed': str(seed), |
|
'method': method, |
|
'selected_samples': selected_samples, |
|
'current_index': 0, |
|
'responses': [], |
|
'start_time': start_time |
|
} |
|
save_session_data(username, session_data) |
|
return redirect(url_for('experiment', username=username)) |
|
except Exception as e: |
|
logger.exception(f"Error in index route: {e}") |
|
return "An error occurred", 500 |
|
return render_template('index.html') |
|
|
|
@app.route('/experiment/<username>', methods=['GET', 'POST']) |
|
def experiment(username): |
|
try: |
|
session_data = load_session_data(username) |
|
if not session_data: |
|
return redirect(url_for('index')) |
|
|
|
selected_samples = session_data['selected_samples'] |
|
method = session_data['method'] |
|
current_index = session_data['current_index'] |
|
|
|
if current_index >= len(selected_samples): |
|
return redirect(url_for('completed', username=username)) |
|
|
|
sample = selected_samples[current_index] |
|
visualization_dir = VISUALIZATION_DIRS[method] |
|
visualization_path = f"{visualization_dir}/{sample['category']}/{sample['file']}" |
|
|
|
statement = """ |
|
Please note that in select row function, starting index is 0 for Chain-of-Table 1 for Dater and Index * represents the selection of the whole Table. |
|
Based on the explanation provided, what do you think the AI model will predict? |
|
Will it predict the statement as TRUE or FALSE? |
|
""" |
|
|
|
return render_template('experiment.html', |
|
sample_id=current_index, |
|
statement=statement, |
|
visualization=url_for('send_visualization', filename=visualization_path), |
|
username=username, |
|
method=method) |
|
except Exception as e: |
|
logger.exception(f"An error occurred in the experiment route: {e}") |
|
return "An error occurred", 500 |
|
|
|
|
|
@app.route('/feedback', methods=['POST']) |
|
def feedback(): |
|
try: |
|
username = request.form['username'] |
|
prediction = request.form['prediction'] |
|
|
|
session_data = load_session_data(username) |
|
if not session_data: |
|
logger.error(f"No session data found for user: {username}") |
|
return redirect(url_for('index')) |
|
|
|
session_data['responses'].append({ |
|
'sample_id': session_data['current_index'], |
|
'user_prediction': prediction |
|
}) |
|
|
|
session_data['current_index'] += 1 |
|
save_session_data(username, session_data) |
|
logger.info(f"Prediction saved for user {username}, sample {session_data['current_index'] - 1}") |
|
|
|
if session_data['current_index'] >= len(session_data['selected_samples']): |
|
return redirect(url_for('completed', username=username)) |
|
|
|
return redirect(url_for('experiment', username=username)) |
|
except Exception as e: |
|
logger.exception(f"Error in feedback route: {e}") |
|
return "An error occurred", 500 |
|
|
|
@app.route('/completed/<username>') |
|
def completed(username): |
|
try: |
|
session_data = load_session_data(username) |
|
if not session_data: |
|
logger.error(f"No session data found for user: {username}") |
|
return redirect(url_for('index')) |
|
|
|
session_data['end_time'] = datetime.now().isoformat() |
|
responses = session_data['responses'] |
|
method = session_data['method'] |
|
|
|
if method == "Chain-of-Table": |
|
json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' |
|
elif method == "Plan-of-SQLs": |
|
json_file = 'Tabular_LLMs_human_study_vis_6_POS.json' |
|
elif method == "Dater": |
|
json_file = 'Tabular_LLMs_human_study_vis_6_DATER.json' |
|
elif method == "No-XAI": |
|
json_file = 'Tabular_LLMs_human_study_vis_6_NO_XAI.json' |
|
else: |
|
return "Invalid method", 400 |
|
|
|
with open(json_file, 'r') as f: |
|
ground_truth = json.load(f) |
|
|
|
correct_predictions = 0 |
|
true_predictions = 0 |
|
false_predictions = 0 |
|
|
|
for response in responses: |
|
sample_id = response['sample_id'] |
|
user_prediction = response['user_prediction'] |
|
visualization_file = session_data['selected_samples'][sample_id]['file'] |
|
index = visualization_file.split('-')[1].split('.')[0] |
|
|
|
ground_truth_key = f"{get_method_dir(method)}_test-{index}.html" |
|
logger.info(f"ground_truth_key: {ground_truth_key}") |
|
|
|
if ground_truth_key in ground_truth: |
|
model_prediction = ground_truth[ground_truth_key]['answer'].upper() |
|
if user_prediction.upper() == model_prediction: |
|
correct_predictions += 1 |
|
|
|
if user_prediction.upper() == "TRUE": |
|
true_predictions += 1 |
|
elif user_prediction.upper() == "FALSE": |
|
false_predictions += 1 |
|
else: |
|
logger.warning(f"Missing key in ground truth: {ground_truth_key}") |
|
|
|
accuracy = (correct_predictions / len(responses)) * 100 if responses else 0 |
|
accuracy = round(accuracy, 2) |
|
|
|
true_percentage = (true_predictions / len(responses)) * 100 if len(responses) else 0 |
|
false_percentage = (false_predictions / len(responses)) * 100 if len(responses) else 0 |
|
|
|
true_percentage = round(true_percentage, 2) |
|
false_percentage = round(false_percentage, 2) |
|
|
|
session_data['accuracy'] = accuracy |
|
session_data['true_percentage'] = true_percentage |
|
session_data['false_percentage'] = false_percentage |
|
|
|
save_session_data(username, session_data, is_update=True) |
|
|
|
return render_template('completed.html', |
|
accuracy=accuracy, |
|
true_percentage=true_percentage, |
|
false_percentage=false_percentage) |
|
except Exception as e: |
|
logger.exception(f"An error occurred in the completed route: {e}") |
|
return "An error occurred", 500 |
|
|
|
@app.route('/visualizations/<path:filename>') |
|
def send_visualization(filename): |
|
logger.info(f"Attempting to serve file: {filename}") |
|
base_dir = os.getcwd() |
|
file_path = os.path.normpath(os.path.join(base_dir, filename)) |
|
if not file_path.startswith(base_dir): |
|
return "Access denied", 403 |
|
|
|
if not os.path.exists(file_path): |
|
return "File not found", 404 |
|
|
|
directory = os.path.dirname(file_path) |
|
file_name = os.path.basename(file_path) |
|
logger.info(f"Serving file from directory: {directory}, filename: {file_name}") |
|
return send_from_directory(directory, file_name) |
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0", port=7860, debug=True) |