|
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session |
|
import json |
|
import random |
|
import os |
|
import string |
|
import logging |
|
from datetime import datetime |
|
|
|
|
|
# Root logging setup: records at INFO and above go to both the file
# "app.log" and the console stream, using one shared line format.
logging.basicConfig(
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
app = Flask(__name__)

# Do not ship a signing key in the source: read it from the SECRET_KEY
# environment variable and fall back to a random per-process value.
# NOTE(review): with the random fallback, anything signed with the key does
# not survive a restart and is not shared between worker processes -- set
# SECRET_KEY explicitly in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
|
|
|
|
|
# Directory holding the Plan-of-SQLs HTML explanations for each category
# label (TP / TN / FP / FN). Paths are relative to the working directory.
VISUALIZATION_DIRS_PLAN_OF_SQLS = {
    label: f"htmls_POS/{label}" for label in ("TP", "TN", "FP", "FN")
}

# Same layout for the Chain-of-Table HTML explanations.
VISUALIZATION_DIRS_CHAIN_OF_TABLE = {
    label: f"htmls_COT/{label}" for label in ("TP", "TN", "FP", "FN")
}
|
|
|
|
|
def save_session_data(username, data):
    """Write *data* as JSON to ``session_data/<username>_session.json``.

    The ``session_data`` directory sits next to this source file and is
    created on demand. Any failure is logged and swallowed, so the function
    returns ``None`` whether or not the file was written.

    Args:
        username: Participant identifier used to build the file name. It
            originates from the request, so it is validated before use.
        data: JSON-serialisable session state.
    """
    try:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        session_dir = os.path.join(base_dir, 'session_data')
        file_name = f'{username}_session.json'

        # A name containing path separators (e.g. "../x") would resolve
        # outside session_data; refuse it instead of writing there.
        if os.path.basename(file_name) != file_name:
            logger.error(f"Refusing to save session data for unsafe username: {username!r}")
            return

        os.makedirs(session_dir, exist_ok=True)
        file_path = os.path.join(session_dir, file_name)

        with open(file_path, 'w') as f:
            json.dump(data, f, indent=4)

        logger.info(f"Session data saved for user {username} at {file_path}")
    except Exception as e:
        # Best effort by design: callers do not expect an exception here.
        logger.exception(f"Error saving session data for user {username}: {e}")
|
|
|
|
|
|
|
def load_session_data(username):
    """Read the JSON session file for *username*.

    Looks for ``session_data/<username>_session.json`` next to this source
    file.

    Args:
        username: Participant identifier used to build the file name. It
            originates from the request, so it is validated before use.

    Returns:
        The decoded JSON content, or ``None`` when the user name is unsafe,
        the file does not exist, or reading/decoding fails (all are logged).
    """
    try:
        file_name = f'{username}_session.json'

        # A name containing path separators (e.g. "../x") would resolve
        # outside session_data; treat it as "no session" rather than read it.
        if os.path.basename(file_name) != file_name:
            logger.warning(f"Refusing to load session data for unsafe username: {username!r}")
            return None

        base_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(base_dir, 'session_data', file_name)

        with open(file_path, 'r') as f:
            data = json.load(f)

        logger.info(f"Session data loaded for user {username} from {file_path}")
        return data
    except FileNotFoundError:
        logger.warning(f"No session data found for user {username}")
        return None
    except Exception as e:
        logger.exception(f"Error loading session data for user {username}: {e}")
        return None
|
|
|
|
|
def load_samples(method):
    """Collect the ``.html`` file names available for each category.

    Args:
        method: ``"Chain-of-Table"`` selects the Chain-of-Table directories;
            any other value selects the Plan-of-SQLs directories.

    Returns:
        A dict with the keys ``"TP"``, ``"TN"``, ``"FP"`` and ``"FN"``, each
        mapping to a sorted list of file names. A category whose directory
        cannot be read is logged and left empty.
    """
    logger.info(f"Loading samples for method: {method}")
    if method == "Chain-of-Table":
        visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE
    else:
        visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS

    samples = {"TP": [], "TN": [], "FP": [], "FN": []}
    for category, dir_path in visualization_dirs.items():
        try:
            # os.listdir returns entries in arbitrary order. Sort them so
            # that a given random seed selects the same files everywhere.
            samples[category] = sorted(
                filename
                for filename in os.listdir(dir_path)
                if filename.endswith(".html")
            )
            logger.info(f"Loaded {len(samples[category])} samples for category {category}")
        except Exception as e:
            logger.exception(f"Error loading samples from {dir_path}: {e}")
    return samples
|
|
|
|
|
def select_balanced_samples(samples, per_group=5):
    """Draw the same number of samples from the TP+FP pool and the TN+FN pool.

    Uses the module-level ``random`` generator, so the draw follows whatever
    seed the caller set beforehand.

    Args:
        samples: Dict with the keys ``"TP"``, ``"FP"``, ``"TN"`` and ``"FN"``,
            each mapping to a list of file names.
        per_group: How many items to draw from each pool (default 5).

    Returns:
        The TP+FP picks followed by the TN+FN picks, or ``[]`` when the draw
        fails (for example when a pool holds fewer than ``per_group`` items).
    """
    try:
        tp_fp_samples = random.sample(samples["TP"] + samples["FP"], per_group)
        tn_fn_samples = random.sample(samples["TN"] + samples["FN"], per_group)
        selected = tp_fp_samples + tn_fn_samples
        logger.info(f"Selected balanced samples: {len(selected)}")
        return selected
    except Exception:
        # Best effort by design: the caller treats an empty list as failure.
        logger.exception("Error selecting balanced samples")
        return []
|
|
|
def generate_random_string(length=8):
    """Return *length* characters drawn from ASCII letters and digits.

    Characters are picked with replacement by the module-level ``random``
    generator, so the result is reproducible after ``random.seed``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)
|
|
|
@app.route('/', methods=['GET', 'POST'])
def index():
    """Landing page: show the start form (GET) or start a session (POST).

    On POST the form fields ``username``, ``seed`` and ``method`` are
    required. The seed is applied to the module-level ``random`` generator,
    samples are selected, a fresh session file is stored for the user, and
    the browser is redirected to the experiment page.

    Returns:
        The rendered ``index.html`` on GET; on POST a redirect, or a
        ``(message, status)`` pair: 400 for missing or invalid form input,
        500 when no samples could be selected or an unexpected error occurs.
    """
    logger.info("Rendering index page.")
    if request.method != 'POST':
        return render_template('index.html')

    username = request.form.get('username')
    seed = request.form.get('seed')
    method = request.form.get('method')

    if not username or not seed or not method:
        logger.error("Missing username, seed, or method.")
        return "Missing username, seed, or method", 400

    # A non-numeric seed is a client mistake, so answer 400 rather than
    # letting it fall into the generic 500 handler below.
    try:
        seed = int(seed)
    except ValueError:
        logger.error(f"Invalid seed: {seed!r}")
        return "Seed must be an integer", 400

    try:
        random.seed(seed)
        all_samples = load_samples(method)
        selected_samples = select_balanced_samples(all_samples)
        logger.info(f"Number of selected samples: {len(selected_samples)}")

        if not selected_samples:
            logger.error("No samples were selected.")
            return "No samples were selected", 500

        filename = f'{username}_{seed}_{method}_{generate_random_string()}.json'
        logger.info(f"Generated filename: {filename}")

        # Initial session state; current_index is the position of the next
        # sample to show.
        session_data = {
            'responses': [],
            'username': username,
            'selected_samples': selected_samples,
            'method': method,
            'filename': filename,
            'current_index': 0,
        }
        save_session_data(username, session_data)
        logger.info(f"Session data saved for user: {username}")

        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in index route: {e}")
        return "An error occurred", 500
|
|
|
@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
    """Show the current sample of the user's session.

    Loads the stored session, finds the category directory that contains
    the current sample file, and renders ``experiment.html`` with a URL to
    that file.

    Returns:
        The rendered page; a redirect to the index page when no session
        exists, or to the completion page when all samples have been shown;
        ``404`` when the sample file is in none of the category directories;
        ``500`` on any unexpected error.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        selected_samples = session_data['selected_samples']
        method = session_data['method']
        current_index = session_data['current_index']

        if current_index >= len(selected_samples):
            return redirect(url_for('completed', username=username))

        visualization_file = selected_samples[current_index]

        if method == "Chain-of-Table":
            visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE
        else:
            visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS

        # Probe each category directory for the file. isfile() returns False
        # for a missing directory, so one absent category does not prevent
        # finding the file in another.
        for dir_path in visualization_dirs.values():
            candidate = os.path.join(dir_path, visualization_file)
            if os.path.isfile(candidate):
                visualization_path = candidate
                break
        else:
            logger.error(f"Visualization file {visualization_file} not found.")
            return "Visualization file not found", 404

        logger.info(f"Rendering experiment page with visualization: {visualization_path}")

        # Instruction text handed to the template as ``statement``.
        statement = """
A Table Question Answering model is working on Table Fact Verification task (TabFact dataset), verifying if a given Statement is TRUE or FALSE on a given input Table.
You are given an explanation that describes the reasoning process of the Table QA model.
Please carefully analyze the explanation and determine whether you should Accept or Reject the AI prediction.
"""

        return render_template('experiment.html',
                               sample_id=current_index,
                               statement=statement,
                               visualization=url_for('send_visualization', filename=visualization_path),
                               username=username)
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500
|
|
|
@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the user's answer for the current sample and advance.

    Reads the form fields ``username`` and ``feedback``, appends the answer
    to the stored session under the current sample index, increments that
    index, and saves the session.

    Returns:
        A redirect to the next sample, to the completion page once every
        sample has an answer, or to the index page when no session exists;
        ``400`` when a form field is missing; ``500`` on unexpected errors.
    """
    try:
        username = request.form.get('username')
        answer = request.form.get('feedback')
        # A missing field is a client error, not a server failure.
        if username is None or answer is None:
            logger.error("Missing username or feedback.")
            return "Missing username or feedback", 400

        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        total_samples = len(session_data['selected_samples'])

        # A repeated or late submission after the last sample must not be
        # stored: its sample_id would point past the end of selected_samples
        # and break the results page.
        if session_data['current_index'] >= total_samples:
            logger.warning(f"Ignoring feedback for user {username}: all samples already answered")
            return redirect(url_for('completed', username=username))

        session_data['responses'].append({
            'sample_id': session_data['current_index'],
            'feedback': answer
        })
        session_data['current_index'] += 1

        save_session_data(username, session_data)
        logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}")

        if session_data['current_index'] >= total_samples:
            return redirect(url_for('completed', username=username))

        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500
|
|
|
@app.route('/completed/<username>')
def completed(username):
    """Score the user's answers and render the results page.

    Each stored answer is compared, case-insensitively, with the ``answer``
    field of the matching ground-truth entry. The ground-truth file and the
    key prefix (``COT`` or ``POS``) depend on the session's method. Answers
    equal to ``TRUE`` count as accepts and ``FALSE`` as rejects.

    Returns:
        ``completed.html`` rendered with ``accuracy``, ``accept_percentage``
        and ``reject_percentage`` (percentages rounded to two decimals); a
        redirect to the index page when no session exists; ``500`` on
        unexpected errors.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        responses = session_data['responses']
        method = session_data['method']
        selected_samples = session_data['selected_samples']

        if method == "Chain-of-Table":
            json_file = 'Tabular_LLMs_human_study_vis_6_COT.json'
            key_prefix = 'COT'
        else:
            json_file = 'Tabular_LLMs_human_study_vis_6_POS.json'
            key_prefix = 'POS'

        with open(json_file, 'r') as f:
            ground_truth = json.load(f)

        correct_responses = 0
        accept_count = 0
        reject_count = 0
        evaluated = 0  # responses that refer to an existing sample

        for response in responses:
            sample_id = response['sample_id']
            answer = response['feedback'].upper()

            # A response pointing past the selected samples cannot be scored.
            # Skip it so one bad record does not fail the whole page.
            if not 0 <= sample_id < len(selected_samples):
                logger.warning(f"Ignoring response with out-of-range sample id: {sample_id}")
                continue
            evaluated += 1

            if answer == "TRUE":
                accept_count += 1
            elif answer == "FALSE":
                reject_count += 1

            # The ground-truth key is built from the text between the first
            # '-' and the following '.' in the file name.
            visualization_file = selected_samples[sample_id]
            try:
                sample_number = visualization_file.split('-')[1].split('.')[0]
            except IndexError:
                logger.warning(f"Cannot derive ground-truth key from file name: {visualization_file}")
                continue
            ground_truth_key = f"{key_prefix}_test-{sample_number}.html"

            entry = ground_truth.get(ground_truth_key)
            if entry is None:
                logger.warning(f"Missing ground-truth key: {ground_truth_key}")
            elif entry['answer'].upper() == answer:
                correct_responses += 1

        if evaluated:
            accuracy = round((correct_responses / evaluated) * 100, 2)
            accept_percentage = round((accept_count / evaluated) * 100, 2)
            reject_percentage = round((reject_count / evaluated) * 100, 2)
        else:
            accuracy = accept_percentage = reject_percentage = 0

        return render_template('completed.html',
                               accuracy=accuracy,
                               accept_percentage=accept_percentage,
                               reject_percentage=reject_percentage)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500
|
|
|
|
|
@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
    """Serve a file addressed relative to the current working directory.

    Args:
        filename: Path taken from the URL, resolved against ``os.getcwd()``.

    Returns:
        The file response; ``403`` when the resolved path is not inside the
        working directory; ``404`` when it is not an existing file.
    """
    logger.info(f"Attempting to serve file: {filename}")

    base_dir = os.getcwd()
    file_path = os.path.normpath(os.path.join(base_dir, filename))

    # Compare against the base directory *with* a trailing separator. A bare
    # startswith(base_dir) also accepts sibling paths such as
    # "<base_dir>_backup/secret".
    base_prefix = os.path.join(base_dir, '')
    if not file_path.startswith(base_prefix):
        return "Access denied", 403

    if not os.path.isfile(file_path):
        return "File not found", 404

    # NOTE(review): any file below the working directory can still be
    # requested through this route, not only the visualization folders --
    # consider restricting it to those folders.
    directory = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)
|
|
|
|
|
if __name__ == "__main__":
    # Create the session directory next to this file, which is where
    # save_session_data writes.
    os.makedirs(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), 'session_data'),
        exist_ok=True,
    )

    # The server listens on all interfaces, and the interactive debugger
    # lets a browser run arbitrary code. Keep debug mode off unless it is
    # requested explicitly with FLASK_DEBUG=1 (or "true" / "yes").
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)