from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session import json import random import os import string import logging # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("app.log"), logging.StreamHandler() ]) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key # Directories for visualizations VISUALIZATION_DIRS_PLAN_OF_SQLS = { "TP": "htmls_POS/TP", "TN": "htmls_POS/TN", "FP": "htmls_POS/FP", "FN": "htmls_POS/FN" } VISUALIZATION_DIRS_CHAIN_OF_TABLE = { "TP": "htmls_COT/TP", "TN": "htmls_COT/TN", "FP": "htmls_COT/FP", "FN": "htmls_COT/FN" } # Function to save session data to a file def save_session_data(username, data): os.makedirs('session_data', exist_ok=True) with open(f'session_data/{username}_session.json', 'w') as f: json.dump(data, f) # Function to load session data from a file def load_session_data(username): try: with open(f'session_data/{username}_session.json', 'r') as f: return json.load(f) except FileNotFoundError: return None # Load all sample files from the directories based on the selected method def load_samples(method): logger.info(f"Loading samples for method: {method}") if method == "Chain-of-Table": visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE else: visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS samples = {"TP": [], "TN": [], "FP": [], "FN": []} for category, dir_path in visualization_dirs.items(): try: for filename in os.listdir(dir_path): if filename.endswith(".html"): samples[category].append(filename) logger.info(f"Loaded {len(samples[category])} samples for category {category}") except Exception as e: logger.exception(f"Error loading samples from {dir_path}: {e}") return samples # Randomly select balanced samples def select_balanced_samples(samples): try: tp_fp_samples = random.sample(samples["TP"] + samples["FP"], 5) tn_fn_samples = random.sample(samples["TN"] + samples["FN"], 5) logger.info(f"Selected balanced samples: {len(tp_fp_samples + tn_fn_samples)}") return tp_fp_samples + tn_fn_samples except Exception as e: logger.exception("Error selecting balanced samples") return [] def generate_random_string(length=8): return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) @app.route('/', methods=['GET', 'POST']) def index(): logger.info("Rendering index page.") if request.method == 'POST': username = request.form.get('username') seed = request.form.get('seed') method = request.form.get('method') if not username or not seed or not method: logger.error("Missing username, seed, or method.") return "Missing username, seed, or method", 400 try: seed = int(seed) random.seed(seed) all_samples = load_samples(method) selected_samples = select_balanced_samples(all_samples) logger.info(f"Number of selected samples: {len(selected_samples)}") if len(selected_samples) == 0: logger.error("No samples were selected.") return "No samples were selected", 500 filename = f'{username}_{seed}_{method}_{generate_random_string()}.json' logger.info(f"Generated filename: {filename}") # Save session data session_data = { 'responses': [], 'username': username, 'selected_samples': selected_samples, 'method': method, 'filename': filename, 'current_index': 0 } save_session_data(username, session_data) logger.info(f"Session data saved for user: {username}") return redirect(url_for('experiment', username=username)) except Exception as e: logger.exception(f"Error in index route: {e}") return "An error occurred", 500 return render_template('index.html') @app.route('/experiment/', methods=['GET', 'POST']) def experiment(username): try: session_data = load_session_data(username) if not session_data: logger.error(f"No session data found for user: {username}") return redirect(url_for('index')) selected_samples = session_data['selected_samples'] method = session_data['method'] current_index = session_data['current_index'] if current_index >= len(selected_samples): return redirect(url_for('completed', username=username)) visualization_file = selected_samples[current_index] if method == "Chain-of-Table": vis_dir = 'htmls_COT' else: vis_dir = 'htmls_POS' # Determine the correct visualization directory based on the category for category, dir_path in (VISUALIZATION_DIRS_CHAIN_OF_TABLE if method == "Chain-of-Table" else VISUALIZATION_DIRS_PLAN_OF_SQLS).items(): if visualization_file in os.listdir(dir_path): visualization_path = os.path.join(vis_dir, category, visualization_file) break else: logger.error(f"Visualization file {visualization_file} not found.") return "Visualization file not found", 404 logger.info(f"Rendering experiment page with visualization: {visualization_path}") statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation." return render_template('experiment.html', sample_id=current_index, statement=statement, visualization=url_for('send_visualization', filename=visualization_path), username=username) except Exception as e: logger.exception(f"An error occurred in the experiment route: {e}") return "An error occurred", 500 @app.route('/feedback', methods=['POST']) def feedback(): try: username = request.form['username'] feedback = request.form['feedback'] session_data = load_session_data(username) if not session_data: logger.error(f"No session data found for user: {username}") return redirect(url_for('index')) # Store the feedback session_data['responses'].append({ 'sample_id': session_data['current_index'], 'feedback': feedback }) # Move to the next sample session_data['current_index'] += 1 # Save updated session data save_session_data(username, session_data) if session_data['current_index'] >= len(session_data['selected_samples']): return redirect(url_for('completed', username=username)) return redirect(url_for('experiment', username=username)) except Exception as e: logger.exception(f"Error in feedback route: {e}") return "An error occurred", 500 @app.route('/completed/') def completed(username): try: session_data = load_session_data(username) if not session_data: logger.error(f"No session data found for user: {username}") return redirect(url_for('index')) responses = session_data['responses'] method = session_data['method'] if method == "Chain-of-Table": json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' else: # Default to Plan-of-SQLs json_file = 'Tabular_LLMs_human_study_vis_6.json' with open(json_file, 'r') as f: ground_truth = json.load(f) correct_responses = 0 accept_count = 0 reject_count = 0 for response in responses: sample_id = response['sample_id'] feedback = response['feedback'] visualization_file = session_data['selected_samples'][sample_id] index = visualization_file.split('-')[1].split('.')[0] # Extract index from filename if feedback.upper() == "TRUE": accept_count += 1 elif feedback.upper() == "FALSE": reject_count += 1 if method == "Chain-of-Table": ground_truth_key = f"COT_test-{index}.html" else: ground_truth_key = f"POS_test-{index}.html" if ground_truth_key in ground_truth and ground_truth[ground_truth_key]['answer'].upper() == feedback.upper(): correct_responses += 1 else: logger.warning(f"Missing or mismatched key: {ground_truth_key}") accuracy = (correct_responses / len(responses)) * 100 if responses else 0 accuracy = round(accuracy, 2) accept_percentage = (accept_count / len(responses)) * 100 if len(responses) else 0 reject_percentage = (reject_count / len(responses)) * 100 if len(responses) else 0 accept_percentage = round(accept_percentage, 2) reject_percentage = round(reject_percentage, 2) return render_template('completed.html', accuracy=accuracy, accept_percentage=accept_percentage, reject_percentage=reject_percentage) except Exception as e: logger.exception(f"An error occurred in the completed route: {e}") return "An error occurred", 500 @app.route('/visualizations/') def send_visualization(filename): logger.info(f"Attempting to serve file: {filename}") # Ensure the path is safe and doesn't allow access to files outside the intended directory base_dir = os.getcwd() file_path = os.path.normpath(os.path.join(base_dir, filename)) if not file_path.startswith(base_dir): return "Access denied", 403 if not os.path.exists(file_path): return "File not found", 404 directory = os.path.dirname(file_path) file_name = os.path.basename(file_path) logger.info(f"Serving file from directory: {directory}, filename: {file_name}") return send_from_directory(directory, file_name) if __name__ == "__main__": os.makedirs('session_data', exist_ok=True) # Ensure the directory for session files exists app.run(host="0.0.0.0", port=7860, debug=True)