|
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session |
|
import json |
|
import random |
|
import os |
|
import string |
|
import logging |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.FileHandler("app.log"), |
|
logging.StreamHandler() |
|
]) |
|
logger = logging.getLogger(__name__) |
|
|
|
app = Flask(__name__) |
|
app.config['SECRET_KEY'] = 'supersecretkey' |
|
|
|
|
|
VISUALIZATION_DIRS_PLAN_OF_SQLS = { |
|
"TP": "visualizations/TP", |
|
"TN": "visualizations/TN", |
|
"FP": "visualizations/FP", |
|
"FN": "visualizations/FN" |
|
} |
|
|
|
VISUALIZATION_DIRS_CHAIN_OF_TABLE = { |
|
"TP": "htmls_COT/TP", |
|
"TN": "htmls_COT/TN", |
|
"FP": "htmls_COT/FP", |
|
"FN": "htmls_COT/FN" |
|
} |
|
|
|
|
|
def load_samples(method): |
|
logger.info(f"Loading samples for method: {method}") |
|
if method == "Chain-of-Table": |
|
visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE |
|
else: |
|
visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS |
|
|
|
samples = {"TP": [], "TN": [], "FP": [], "FN": []} |
|
for category, dir_path in visualization_dirs.items(): |
|
try: |
|
for filename in os.listdir(dir_path): |
|
if filename.endswith(".html"): |
|
samples[category].append(filename) |
|
logger.info(f"Loaded {len(samples[category])} samples for category {category}") |
|
except Exception as e: |
|
logger.exception(f"Error loading samples from {dir_path}: {e}") |
|
return samples |
|
|
|
|
|
def select_balanced_samples(samples): |
|
try: |
|
tp_fp_samples = random.sample(samples["TP"] + samples["FP"], 5) |
|
tn_fn_samples = random.sample(samples["TN"] + samples["FN"], 5) |
|
logger.info(f"Selected balanced samples: {len(tp_fp_samples + tn_fn_samples)}") |
|
return tp_fp_samples + tn_fn_samples |
|
except Exception as e: |
|
logger.exception("Error selecting balanced samples") |
|
return [] |
|
|
|
def generate_random_string(length=8): |
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) |
|
|
|
@app.route('/', methods=['GET', 'POST']) |
|
def index(): |
|
logger.info("Rendering index page.") |
|
if request.method == 'POST': |
|
username = request.form.get('username') |
|
seed = request.form.get('seed') |
|
method = request.form.get('method') |
|
|
|
if not username or not seed or not method: |
|
logger.error("Missing username, seed, or method.") |
|
return "Missing username, seed, or method", 400 |
|
|
|
try: |
|
|
|
with open(f'session_data/method_{username}.txt', 'w') as f: |
|
f.write(method) |
|
|
|
seed = int(seed) |
|
random.seed(seed) |
|
all_samples = load_samples(method) |
|
selected_samples = select_balanced_samples(all_samples) |
|
logger.info(f"Number of selected samples: {len(selected_samples)}") |
|
if len(selected_samples) == 0: |
|
logger.error("No samples were selected.") |
|
return "No samples were selected", 500 |
|
|
|
random_string = generate_random_string() |
|
filename = f'{username}_{seed}_{method}_{random_string}.json' |
|
|
|
logger.info(f"Generated filename: {filename}") |
|
|
|
|
|
os.makedirs('session_data', exist_ok=True) |
|
with open(f'session_data/{filename}', 'w') as f: |
|
json.dump(selected_samples, f) |
|
|
|
session['responses'] = [] |
|
session['username'] = username |
|
|
|
return redirect(url_for('experiment', username=username, sample_index=0, seed=seed, filename=filename)) |
|
except Exception as e: |
|
logger.exception(f"Error in index route: {e}") |
|
return "An error occurred", 500 |
|
return render_template('index.html') |
|
|
|
@app.route('/experiment/<username>/<sample_index>/<seed>/<filename>', methods=['GET']) |
|
def experiment(username, sample_index, seed, filename): |
|
try: |
|
sample_index = int(sample_index) |
|
|
|
|
|
with open(f'session_data/{filename}', 'r') as f: |
|
selected_samples = json.load(f) |
|
|
|
|
|
method_file_path = f'session_data/method_{username}.txt' |
|
if os.path.exists(method_file_path): |
|
with open(method_file_path, 'r') as f: |
|
method = f.read().strip() |
|
else: |
|
logger.error(f"Method file not found for user {username}.") |
|
return "Method file not found", 500 |
|
|
|
if sample_index >= len(selected_samples): |
|
logger.error(f"Sample index {sample_index} exceeds the number of selected samples {len(selected_samples)}.") |
|
return redirect(url_for('completed', filename=filename)) |
|
|
|
visualization_file = selected_samples[sample_index] |
|
visualization_path = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if method == "Chain-of-Table": |
|
visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE |
|
vis_dir = 'htmls_COT' |
|
else: |
|
visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS |
|
vis_dir = 'visualizations' |
|
|
|
logger.info(f"Checking directories for method: {method}") |
|
|
|
|
|
for category, dir_path in visualization_dirs.items(): |
|
absolute_dir_path = os.path.abspath(dir_path) |
|
logger.info(f"Checking directory: {absolute_dir_path} for file: {visualization_file}") |
|
if visualization_file.strip() in os.listdir(absolute_dir_path): |
|
visualization_path = os.path.join(vis_dir, category, visualization_file) |
|
break |
|
|
|
if not visualization_path: |
|
logger.error( |
|
f"Visualization file {visualization_file} not found. Searched path: {absolute_dir_path}/{visualization_file}") |
|
return "Visualization file not found", 404 |
|
|
|
logger.info(f"Rendering experiment page with visualization: {visualization_path}") |
|
|
|
statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation." |
|
return render_template('experiment.html', |
|
sample_id=sample_index, |
|
statement=statement, |
|
visualization=visualization_path, |
|
username=username, |
|
seed=seed, |
|
sample_index=sample_index, |
|
filename=filename) |
|
except Exception as e: |
|
logger.exception(f"An error occurred in the experiment route: {e}") |
|
return "An error occurred", 500 |
|
|
|
@app.route('/feedback', methods=['POST']) |
|
def feedback(): |
|
try: |
|
sample_id = request.form['sample_id'] |
|
feedback = request.form['feedback'] |
|
username = request.form['username'] |
|
seed = request.form['seed'] |
|
sample_index = int(request.form['sample_index']) |
|
filename = request.form['filename'] |
|
|
|
|
|
with open(f'session_data/{filename}', 'r') as f: |
|
selected_samples = json.load(f) |
|
|
|
responses = session.get('responses', []) |
|
|
|
responses.append({ |
|
'sample_id': sample_id, |
|
'feedback': feedback |
|
}) |
|
session['responses'] = responses |
|
|
|
result_dir = 'human_study' |
|
os.makedirs(result_dir, exist_ok=True) |
|
|
|
filepath = os.path.join(result_dir, filename) |
|
if os.path.exists(filepath): |
|
with open(filepath, 'r') as f: |
|
data = json.load(f) |
|
else: |
|
data = {} |
|
|
|
data[sample_index] = { |
|
'Username': username, |
|
'Seed': seed, |
|
'Sample ID': sample_id, |
|
'Task': "Please make a decision to Accept/Reject the AI prediction based on the explanation.", |
|
'User Feedback': feedback |
|
} |
|
|
|
with open(filepath, 'w') as f: |
|
json.dump(data, f, indent=4) |
|
|
|
logger.info(f"Feedback saved for sample {sample_id}") |
|
|
|
next_sample_index = sample_index + 1 |
|
if next_sample_index >= len(selected_samples): |
|
return redirect(url_for('completed', filename=filename)) |
|
|
|
return redirect( |
|
url_for('experiment', username=username, sample_index=next_sample_index, seed=seed, filename=filename)) |
|
except Exception as e: |
|
logger.exception(f"Error in feedback route: {e}") |
|
return "An error occurred", 500 |
|
|
|
@app.route('/completed/<filename>') |
|
def completed(filename): |
|
try: |
|
responses = session.get('responses', []) |
|
username = session.get('username') |
|
|
|
|
|
method_file_path = f'session_data/method_{username}.txt' |
|
if os.path.exists(method_file_path): |
|
with open(method_file_path, 'r') as f: |
|
method = f.read().strip() |
|
os.remove(method_file_path) |
|
else: |
|
logger.error("Method file not found.") |
|
return "Method file not found", 500 |
|
|
|
if method == "Chain-of-Table": |
|
json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' |
|
else: |
|
json_file = 'Tabular_LLMs_human_study_vis_6.json' |
|
|
|
with open(json_file, 'r') as f: |
|
ground_truth = json.load(f) |
|
|
|
correct_responses = 0 |
|
accept_count = 0 |
|
reject_count = 0 |
|
|
|
for response in responses: |
|
sample_id = response['sample_id'] |
|
feedback = response['feedback'] |
|
index = sample_id.split('-')[1].split('.')[0] |
|
|
|
if feedback.upper() == "TRUE": |
|
accept_count += 1 |
|
elif feedback.upper() == "FALSE": |
|
reject_count += 1 |
|
|
|
if method == "Chain-of-Table": |
|
ground_truth_key = f"COT_test-{index}.html" |
|
else: |
|
ground_truth_key = f"POS_test-{index}.html" |
|
|
|
if ground_truth_key in ground_truth and ground_truth[ground_truth_key]['answer'].upper() == feedback.upper(): |
|
correct_responses += 1 |
|
else: |
|
logger.warning(f"Missing or mismatched key: {ground_truth_key}") |
|
|
|
accuracy = (correct_responses / len(responses)) * 100 if responses else 0 |
|
accuracy = round(accuracy, 2) |
|
|
|
accept_percentage = (accept_count / len(responses)) * 100 if len(responses) else 0 |
|
reject_percentage = (reject_count / len(responses)) * 100 if len(responses) else 0 |
|
|
|
accept_percentage = round(accept_percentage, 2) |
|
reject_percentage = round(reject_percentage, 2) |
|
|
|
return render_template('completed.html', |
|
accuracy=accuracy, |
|
accept_percentage=accept_percentage, |
|
reject_percentage=reject_percentage) |
|
except Exception as e: |
|
logger.exception(f"Error in completed route: {e}") |
|
return "An error occurred", 500 |
|
|
|
if __name__ == "__main__": |
|
os.makedirs('session_data', exist_ok=True) |
|
app.run(host="0.0.0.0", port=7860) |
|
|