"""Streamlit entry page for VARCO Arena.

The page walks the user through three steps:

1. upload model-output ``.jsonl`` files,
2. estimate the judging cost,
3. run the arena and cache the results in the Streamlit session.

Uploaded files live under ``VA_ROOT`` and are purged once the results have
been loaded into memory (or when a step fails).
"""
import os
import select
import shlex
import shutil
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import *

import pandas as pd
import streamlit as st

# Make the repository root importable before pulling in project modules.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from varco_arena.varco_arena_core.prompts import load_prompt
from view_utils import (
    default_page_setting,
    escape_markdown,
    set_nav_bar,
    show_linebreak_in_md,
    visualization,
)

import analysis_utils as au

# Root directory for user submissions; its last path component is reserved
# and must not be used as a prompt name (checked in ``main``).
VA_ROOT = Path(os.environ.get("VARCO_ARENA_RESULT_PATH", "./user_submit"))
USR_SUB = VA_ROOT.parts[-1]

# Timestamps for upload / experiment directories are expressed in KST (UTC+9).
_KST = timezone(timedelta(hours=9))


def _kst_timestamp() -> str:
    """Return the current KST time formatted as ``MM-DD_HH:MM:SS``."""
    return datetime.now(_KST).strftime("%m-%d_%H:%M:%S")


class DataCache:
    """Minimal in-memory key/value store kept in ``st.session_state``."""

    def __init__(self):
        self.cache = {}

    def store(self, key: str, data: dict):
        """Save ``data`` under ``key``, replacing any previous entry."""
        self.cache[key] = data

    def get(self, key: str) -> Optional[dict]:
        """Return the entry stored under ``key`` or ``None`` if absent."""
        return self.cache.get(key)


# Initialize the cache in session state if it doesn't exist
if "data_cache" not in st.session_state:
    st.session_state.data_cache = DataCache()


def purge_user_sub_data(data_path_to_purge: Optional[Union[Path, str]] = None):
    """Recursively delete a user-submission directory.

    Args:
        data_path_to_purge: Directory to remove. ``None`` is a no-op.

    A directory that has already been removed is treated as "nothing to
    purge" instead of raising, because several error paths in this page may
    purge the same root more than once.
    """
    if data_path_to_purge is None:
        print("nothing to purge")
        return
    try:
        shutil.rmtree(data_path_to_purge)
    except FileNotFoundError:
        print(f"nothing to purge at {str(data_path_to_purge)}")
        return
    print(f"purged {str(data_path_to_purge)}")


@st.cache_data
def load_and_cache_data(result_file_path: Optional[str] = None) -> Tuple[Dict, Dict]:
    """
    Load data from file, cache it in memory, then remove the uploaded data.
    Returns cached data on subsequent calls.

    Args:
        result_file_path: Path to the result JSON file

    Returns:
        Tuple of (all_result_dict, df_dict). Both are keyed by
        ``"<exp_name>/<prompt_name>"`` (taken from the path components) and
        map to per-task dicts that always contain an ``"Overall"`` entry.
        ``({}, {})`` is returned when no path is given or processing fails.
    """
    # Check if we already have cached data for this path
    if result_file_path:
        cache_key = str(Path(result_file_path))
        cached_data = st.session_state.data_cache.get(cache_key)
        if cached_data:
            return cached_data["all_result_dict"], cached_data["df_dict"]

    # Initialize empty dicts
    all_result_dict = {}
    df_dict = {}

    if result_file_path is not None:
        try:
            result_file_path = Path(result_file_path)

            # Read and process data
            df = pd.read_json(result_file_path)
            unused_cols = [col for col in ("tstamp", "logs") if col in df.columns]
            if unused_cols:
                df.drop(columns=unused_cols, inplace=True)
            df = au.index_test_scenario(df)

            # Process overall data
            fig_dict_per_task = {"Overall": visualization(df, is_overall=True)}
            df_dict_per_task = {"Overall": df}

            # Process per-task data
            for task in df["task"].unique():
                df_task = df[df["task"] == task]
                fig_dict_per_task[task] = visualization(df_task, is_overall=False)
                df_dict_per_task[task] = df_task

            # Create key from path components: .../<exp_name>/<prm_name>/result.json
            prm_name = result_file_path.parts[-2]
            exp_name = result_file_path.parts[-3]
            key = f"{exp_name}/{prm_name}"

            all_result_dict[key] = fig_dict_per_task
            df_dict[key] = df_dict_per_task

            # Store in cache before removing file
            cache_data = {"all_result_dict": all_result_dict, "df_dict": df_dict}
            st.session_state.data_cache.store(str(result_file_path), cache_data)

            # Remove user experiment directory
            purge_user_sub_data(data_path_to_purge=VA_ROOT)

        except Exception as e:
            # UI boundary: surface the problem to the user instead of crashing.
            st.error(f"Error processing data: {str(e)}")
            return {}, {}

    return all_result_dict, df_dict


def upload_files(uploaded_files) -> Path:
    """Save uploaded files into a fresh timestamped directory under ``VA_ROOT``.

    Args:
        uploaded_files: Iterable of uploaded file objects exposing ``.name``
            and ``.getbuffer()`` (as returned by ``st.file_uploader``).

    Returns:
        Absolute path of the timestamped upload directory. The directory is
        empty (no upload) or already purged (fewer than two files) when the
        upload was rejected; a message is shown to the user in those cases.

    Raises:
        ValueError: ``VA_ROOT`` exists but is not a directory.
    """
    # prep directory for user submission
    user_sub_root = VA_ROOT
    if user_sub_root.exists():
        if not user_sub_root.is_dir():
            raise ValueError(
                f"{user_sub_root} file exists and is not a directory. Consider renaming it."
            )
    else:
        user_sub_root.mkdir(parents=True)

    # Join with pathlib rather than prefixing "./": string-prefixing an
    # absolute VA_ROOT would silently turn it into a relative path.
    files_dir = user_sub_root / _kst_timestamp()
    files_dir.mkdir(parents=True, exist_ok=True)
    uploaded_files = list(uploaded_files)

    if not uploaded_files:
        st.warning("❌ No files to upload. Please drag/drop or browse files to upload.")
    elif len(uploaded_files) < 2:
        st.error("❌ You need at least 2 jsonlines files to properly run VA.")
        purge_user_sub_data(data_path_to_purge=VA_ROOT)
    else:  # properly uploaded
        for file in uploaded_files:
            # Keep only the final path component so a crafted file name
            # cannot escape the upload directory.
            safe_name = Path(file.name).name
            if not safe_name:
                continue

            # Save the file to the server directory
            with open(files_dir / safe_name, "wb") as f:
                f.write(file.getbuffer())

        jslfiles = list(files_dir.glob("*.jsonl"))
        st.success(f"✅ Successfully uploaded {len(jslfiles)} jsonl files.")
    return files_dir.resolve()


def _arena_command(
    input_dir: str,
    outdir: Path,
    api_key: Optional[str],
    promptname: Optional[str],
    evaluation_model: str,
    price_estimation: bool,
) -> List[str]:
    """Build the argv list that launches ``varco_arena/main.py``."""
    command = [
        sys.executable or "python",
        "varco_arena/main.py",
        "-i",
        str(input_dir),
        "-o",
        str(outdir),
        "-k",
        str(api_key),
        "-p",
        str(promptname),
        "-e",
        str(evaluation_model),
        "-j",
        "64",
    ]
    if price_estimation:
        command.append("-c")
    return command


def _tail_for_display(text: str, max_lines: int) -> str:
    """Return ``text`` unchanged if short, else an ellipsis plus its last lines."""
    lines = text.split("\n")
    if len(lines) < max_lines:
        return text
    return "\n".join(["...\n..\n.\n"] + lines[-max_lines:])


def run_varco_arena(
    price_estimation: bool = False,
    # upload_dir: Union[str, Path] = None,
    promptname: str = None,
    exp_name: str = None,
    api_key: Optional[str] = None,
    evaluation_model: str = "gpt-4o-mini",
    update_interval: float = 1.0,
):
    """Run ``varco_arena/main.py`` on the uploaded files and stream its output.

    The input directory is read from ``st.session_state.upfiles_dir``.

    Args:
        price_estimation: Pass ``-c`` so only the cost is estimated; the whole
            output is shown. Otherwise the run is fed ``y`` answers on stdin
            and only the tail of the output is shown.
        promptname: Evaluation prompt name (``-p``).
        exp_name: Optional sub-directory of the upload directory for results.
        api_key: API key forwarded with ``-k``. It is never logged or shown.
        evaluation_model: Judge model name (``-e``).
        update_interval: Minimum seconds between UI refreshes during a real run.

    Returns:
        ``(outdir, return_code)`` where ``outdir`` is the result directory and
        ``return_code`` is the exit status of the arena process.
    """
    ptn = f"{str(st.session_state.upfiles_dir)}"
    outdir = Path(ptn)
    if exp_name:
        outdir = outdir / exp_name

    # Arguments are passed as a list (no shell), so user-supplied values such
    # as the API key cannot be interpreted as shell syntax.
    command = _arena_command(
        ptn, outdir, api_key, promptname, evaluation_model, price_estimation
    )
    # Same command with the key masked: this is what gets logged / displayed.
    display_command = shlex.join(
        _arena_command(ptn, outdir, "***", promptname, evaluation_model, price_estimation)
    )
    if not price_estimation:
        display_command = f"yes | {display_command}"
    print(display_command)

    process = None
    yes_proc = None
    try:
        if price_estimation:
            child_stdin = subprocess.PIPE
        else:
            # Equivalent of the shell pipeline `yes | python ...`: auto-confirm
            # any interactive prompt of the arena script.
            yes_proc = subprocess.Popen(
                ["yes"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
            )
            child_stdin = yes_proc.stdout

        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            stdin=child_stdin,
            text=True,
            bufsize=1,
        )
        # clear the key from this frame immediately
        del command
        api_key = None

        if yes_proc is not None:
            # Drop our copy of the pipe so `yes` gets SIGPIPE when the arena exits.
            yes_proc.stdout.close()

        # Set stdout to non-blocking mode so readline() never stalls the UI loop
        os.set_blocking(process.stdout.fileno(), False)

        last_update_time = time.time()
        terminal_output = st.empty()
        full_output = f"{display_command}\n"
        to_show = full_output  # always defined, even if the process exits at once

        while True:
            got_output = False
            # Check if we have output to read
            if select.select([process.stdout], [], [], 0)[0]:
                # Drain everything currently available instead of one line per
                # tick, so a chatty child is not throttled by the idle sleep.
                while True:
                    output = process.stdout.readline()
                    if not output:
                        break
                    got_output = True
                    full_output += output
                    print(output)

            if got_output:
                if price_estimation:
                    to_show = full_output
                    terminal_output.code(to_show, language="bash")
                else:
                    current_time = time.time()
                    if current_time - last_update_time > update_interval:
                        to_show = _tail_for_display(full_output, 5)
                        terminal_output.code(to_show, language="bash")
                        last_update_time = current_time
            else:
                time.sleep(0.1)

            # Check if the process has finished
            if process.poll() is not None:
                # Read any remaining output
                remaining_output = process.stdout.read()
                if remaining_output:
                    full_output += remaining_output
                    print(remaining_output)
                # Final render, so lines skipped by the throttle are not lost.
                if price_estimation:
                    to_show = full_output
                else:
                    to_show = _tail_for_display(full_output, 10)
                terminal_output.code(to_show, language="bash")
                break

        return_code = process.poll()
    finally:
        if process is not None:
            for stream in (process.stdin, process.stdout):
                if stream is not None:
                    try:
                        stream.close()
                    except OSError:
                        pass
        if yes_proc is not None:
            if yes_proc.poll() is None:
                yes_proc.terminate()
            yes_proc.wait()

    return outdir, return_code


def _read_options(list_file: Union[str, Path]) -> List[str]:
    """Read a one-option-per-line text file, skipping blank lines."""
    text = Path(list_file).read_text(encoding="UTF8")
    return [line.strip() for line in text.splitlines() if line.strip()]


def _dir_has_files(path: Optional[Path]) -> bool:
    """Return True if ``path`` is an existing directory with at least one entry."""
    return path is not None and path.is_dir() and any(path.iterdir())


def main():
    """Render the VARCO Arena page: upload, cost estimation and arena run."""
    # init lang
    st.session_state["korean"] = st.session_state.get("korean", False)

    sidebar_placeholder = default_page_setting()
    set_nav_bar(
        False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_init"
    )

    st.title("⚔️ VARCO ARENA ⚔️")
    if st.session_state.korean:
        st.write(
            """**바르코 아레나는 테스트셋 명령어별로 비교할 모델(생성문)의 토너먼트를 수행하고 결과들을 종합하여 모델들의 순위를 매기는 벤치마킹 시스템입니다. 이것은 reference 아웃풋과 비교하여 승률을 매기는 방법보다 정확하며 더 저렴합니다.**

모범답안을 필요로 하지 않으므로 커스텀 테스트셋 (50+ 행) 을 활용하는 경우 편리한 벤치마킹이 가능합니다."""
        )
    else:
        st.write(
            """**VARCO Arena is an LLM benchmarking system that compares model responses across customized test scenarios (recommend >50 prompts) without requiring reference answers.**

VARCO Arena conducts tournaments between models to be compared for each test set command, ranking models accurately at an affordable price. This is more accurate and cost-effective than rating win rates by comparing against reference outputs."""
        )

    st.divider()

    # Set up the file uploader
    if st.session_state.korean:
        st.markdown("### 1. 모델 출력파일 업로드")
    else:
        st.markdown("### 1. Upload LLM responses")
    uploaded_files = st.file_uploader(
        "Drag and Drop jsonlines files (.jsonl)", accept_multiple_files=True
    )
    if st.session_state.korean:
        st.info(
            "업로드 하신 파일은 자동으로 삭제되며 수집되거나 사용되지 않습니다.\n- [입력 예시 파일 (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)"
        )
    else:
        st.info(
            "Your uploads will be removed automatically, not being collected nor reused for any purpose.\n- [Example input files (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)"
        )

    # upload state
    if "upfiles_dir" not in st.session_state:
        st.session_state.upfiles_dir = None
    if st.button("업로드하기" if st.session_state.korean else "Upload Files"):
        upfiles_dir = upload_files(uploaded_files)
        # Only remember the directory when files actually landed in it, so the
        # later steps correctly ask the user to upload first.
        st.session_state.upfiles_dir = (
            upfiles_dir if _dir_has_files(upfiles_dir) else None
        )
    if st.button("업로드한 파일 지우기" if st.session_state.korean else "Purge my uploads"):
        st.session_state.upfiles_dir = None
        if VA_ROOT.is_dir():
            shutil.rmtree(VA_ROOT)
            st.success(
                "✅ 업로드한 파일을 서버에서 지웠습니다"
                if st.session_state.korean
                else "✅ Removed your uploads from the server successfully"
            )
        else:
            st.error(
                "❌ 지울 파일이 없습니다"
                if st.session_state.korean
                else "❌ You have nothing uploaded"
            )

    if st.session_state.korean:
        with st.expander("❓❔ 무엇을 업로드 하나요❓❔"):
            st.info(Path("guide_mds/input_jsonls_kr.md").read_text(encoding="UTF8"))
    else:
        with st.expander("❓❔ What should I upload ❓❔"):
            st.info(Path("guide_mds/input_jsonls_en.md").read_text(encoding="UTF8"))

    # Form for cost estimation
    with st.form("cost_estimation_form"):
        if st.session_state.korean:
            st.write("### 2. 가격 산정")
        else:
            st.write("### 2. Cost Estimation")
        eval_model = st.selectbox(
            "Select Judge",
            _read_options("eval_models_list.txt"),
        )
        promptname = st.selectbox(
            "Select Evalutaion Prompt",
            _read_options("eval_prompt_list.txt"),
        )
        if st.session_state.korean:
            st.markdown("*`llmbar`외 다른 프롬프트는 충분히 검증된 프롬프트는 아닙니다. (동작은 함)")
        else:
            st.markdown(
                "*Eval prompts other than `llmbar` is working example, not the optimal ones."
            )
        if promptname == USR_SUB:
            raise ValueError(
                f"{USR_SUB=} is preserved name for the system. Consider another naming for the prompt or consider changing {VA_ROOT=} (USR_SUB == VA_ROOT.parts[-1])."
            )

        estimate_button = st.form_submit_button("Calculate Cost!")
        with st.expander(
            "LLM Judge에 활용되는 프롬프트 (`Calculate Cost!` 클릭시 갱신)"
            if st.session_state.korean
            else "**Evaluation Prompt for LLM Judge (will refresh after `Calculate Cost!` clicked)**"
        ):
            prompt = load_prompt(promptname, task="-")
            # Fill the template with literal placeholders so the raw prompt
            # structure is what gets displayed.
            kwargs = dict(
                inst="{inst}",
                src="{src}",
                out_a="{out_a}",
                out_b="{out_b}",
                task="-",
            )
            if promptname == "translation_pair":
                kwargs["source_lang"] = "{source_lang}"
                kwargs["target_lang"] = "{target_lang}"
            prompt_cmpl = prompt.complete_prompt(**kwargs)

            st.markdown(f"### Evaluation Prompt: {promptname}")
            for msg in prompt_cmpl:
                st.markdown(f"**{msg['role']}**")
                st.info(show_linebreak_in_md(escape_markdown(msg["content"])))

        if estimate_button:
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "❌ Requirements: You have to upload jsonlines files first to proceed"
                )
            else:
                st.markdown("##### Estimated Cost")
                dummy_api_key = "dummy"
                dummy_exp_name = "dummy"
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=dummy_api_key,
                    exp_name=dummy_exp_name,
                    price_estimation=True,
                    evaluation_model=eval_model,
                )
                if return_code:
                    st.error(
                        "❌ RuntimeError: An error occurred during cost estimation. **Restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                    # The uploads are gone; force a fresh upload.
                    st.session_state.upfiles_dir = None
                else:
                    st.success("✅ Cost estimation completed successfully")
                    st.session_state.cost_estimated = True

    # Form for actual run
    with st.form("run_arena_form"):
        if st.session_state.korean:
            st.write("### 3. Varco Arena 구동하기")
        else:
            st.write("### 3. Run Varco Arena")
        api_key = st.text_input("Enter your OpenAI API Key", type="password")

        # demo exp name fixated
        exp_name = f"{_kst_timestamp()}_KST_submit"

        if st.session_state.korean:
            st.write("**주의**:`Ctrl+C` 버튼은 구현되지 않았습니다. 구동 전 숙고해주세요.")
        else:
            st.write("**Caution: `Ctrl+C` button hasn't been implemented.**")
        run_button = st.form_submit_button(
            "🔥 Run Arena!",
            disabled=(not st.session_state.get("cost_estimated", False))
            or "result_file_path"
            in st.session_state.keys(),  # run already performed once
        )

        if run_button:
            # Lock the navigation bar while the arena is running.
            set_nav_bar(
                True,
                sidebar_placeholder=sidebar_placeholder,
                toggle_hashstr="app_during_run",
            )
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "❌ Requirements: You have to upload jsonlines files first to proceed"
                )
            elif not api_key:
                st.error("❌ Requirements: OpenAI key required to run VA.")
            else:
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=api_key,
                    exp_name=exp_name,
                    price_estimation=False,
                    evaluation_model=eval_model,
                )
                result_files = (
                    [] if return_code else list(result_file_path.glob("**/result.json"))
                )
                if return_code or not result_files:
                    # A zero exit status without any result.json is treated
                    # as a failed run rather than crashing on an empty list.
                    st.error(
                        "❌ RuntimeError: An error occurred during Varco Arena run. Check the file and **restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                    st.session_state.upfiles_dir = None
                else:
                    st.success("✅ Varco Arena run completed successfully")
                    # Pick the most recently written result deterministically.
                    st.session_state.result_file_path = max(
                        result_files, key=lambda p: p.stat().st_mtime
                    )
            set_nav_bar(
                False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_run_done"
            )

    if st.session_state.get("result_file_path", None) is not None:
        print(f"{st.session_state.get('result_file_path', None)=}")
        load_and_cache_data(result_file_path=str(st.session_state.result_file_path))


if __name__ == "__main__":
    main()