VARCO_Arena / app.py
sonsus's picture
app.py μ•ˆλ‚΄λ¬Έ 쒀더 ꡬ체적으둜
f3edb58 verified
raw
history blame
17 kB
# import shutil
import os
import select
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import *
import streamlit as st
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from varco_arena.varco_arena_core.prompts import load_prompt
from view_utils import (
default_page_setting,
escape_markdown,
set_nav_bar,
show_linebreak_in_md,
)
VA_ROOT = Path(os.environ.get("VARCO_ARENA_RESULT_PATH", "./user_submit"))
USR_SUB = VA_ROOT.parts[-1]
import shutil
import pandas as pd
import analysis_utils as au
from view_utils import visualization
class DataCache:
def __init__(self):
self.cache = {}
def store(self, key: str, data: dict):
self.cache[key] = data
def get(self, key: str) -> Optional[dict]:
return self.cache.get(key)
# Initialize the cache in session state if it doesn't exist
if "data_cache" not in st.session_state:
st.session_state.data_cache = DataCache()
def purge_user_sub_data(data_path_to_purge: Union[Path, str] = None):
if data_path_to_purge is None:
print("nothing to purge")
return
else:
shutil.rmtree(data_path_to_purge)
print(f"purged {str(data_path_to_purge)}")
return
@st.cache_data
def load_and_cache_data(result_file_path: Optional[str] = None) -> Tuple[Dict, Dict]:
"""
Load data from file, cache it in memory, then remove the file.
Returns cached data on subsequent calls.
Args:
result_file_path: Path to the result JSON file
Returns:
Tuple of (all_result_dict, df_dict)
"""
# Check if we already have cached data for this path
if result_file_path:
cache_key = str(Path(result_file_path))
cached_data = st.session_state.data_cache.get(cache_key)
if cached_data:
return cached_data["all_result_dict"], cached_data["df_dict"]
# Initialize empty dicts
all_result_dict = {}
df_dict = {}
if result_file_path is not None:
try:
result_file_path = Path(result_file_path)
# Read and process data
df = pd.read_json(result_file_path)
for col in ["tstamp", "logs"]:
if col in df.columns:
df.drop(columns=[col], inplace=True)
df = au.index_test_scenario(df)
fig_dict_per_task = {}
df_dict_per_task = {}
# Process overall data
fig_dict_per_task["Overall"] = visualization(df, is_overall=True)
df_dict_per_task["Overall"] = df
# Process per-task data
for task in df["task"].unique():
df_task = df[df["task"] == task]
fig_dict_per_task[task] = visualization(df_task, is_overall=False)
df_dict_per_task[task] = df_task
# Create key from path components
prm_name = result_file_path.parts[-2]
exp_name = result_file_path.parts[-3]
key = f"{exp_name}/{prm_name}"
all_result_dict[key] = fig_dict_per_task
df_dict[key] = df_dict_per_task
# Store in cache before removing file
cache_data = {"all_result_dict": all_result_dict, "df_dict": df_dict}
st.session_state.data_cache.store(str(result_file_path), cache_data)
# Remove user experiment directory
purge_user_sub_data(data_path_to_purge=VA_ROOT)
except Exception as e:
st.error(f"Error processing data: {str(e)}")
return {}, {}
return all_result_dict, df_dict
def upload_files(uploaded_files) -> Path:
# prep directory for user submission
user_sub_root = VA_ROOT
if user_sub_root.exists():
if not user_sub_root.is_dir():
raise ValueError(
f"{user_sub_root} file exists and is not a directory. Consider renaming it."
)
else:
user_sub_root.mkdir(parents=True)
KST = timezone(timedelta(hours=9))
tstamp = datetime.now(KST)
tstr = tstamp.strftime("%m-%d_%H:%M:%S")
files_dir_str = "./" + str(user_sub_root / tstr)
files_dir = Path(files_dir_str)
files_dir.mkdir(parents=True, exist_ok=True)
uploaded_files = list(uploaded_files)
if not uploaded_files:
st.warning("❌ No files to upload. Please drag/drop or browse files to upload.")
# purge_user_sub_data(data_path_to_purge=VA_ROOT)
elif len(uploaded_files) < 2:
st.error("❌ You need at least 2 jsonlines files to properly run VA.")
purge_user_sub_data(data_path_to_purge=VA_ROOT)
else: # properly uploaded
for file in uploaded_files:
# Create a path for the file in the server directory
file_path = files_dir / file.name
# Save the file to the server directory
with open(file_path, "wb") as f:
f.write(file.getbuffer())
jslfiles = list(files_dir.glob("*.jsonl"))
st.success(f"βœ… Successfully uploaded {len(jslfiles)} jsonl files.")
return files_dir.resolve()
def run_varco_arena(
price_estimation: bool = False,
# upload_dir: Union[str, Path] = None,
promptname: str = None,
exp_name: str = None,
api_key: Optional[str] = None,
evaluation_model: str = "gpt-4o-mini",
update_interval: float = 1.0,
):
# Use environment variable for API key
ptn = f"{str(st.session_state.upfiles_dir)}"
outdir = Path(ptn)
if exp_name:
outdir = outdir / exp_name
command = f"python varco_arena/main.py -i {ptn} -o {outdir} -k {api_key} -p {promptname} -e {evaluation_model} -j 64"
if price_estimation:
command = f"{command} -c"
else:
command = command.replace("python", "yes | python ")
print(command)
api_key = None # clear immediately
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
text=True,
bufsize=1,
shell=True,
)
# Set stdout and stdin to non-blocking mode
os.set_blocking(process.stdout.fileno(), False)
last_update_time = time.time()
terminal_output = st.empty()
full_output = f"{command}\n"
while True:
# Check if we have output to read
if select.select([process.stdout], [], [], 0)[0]:
output = process.stdout.readline()
if output:
full_output += output
if price_estimation:
to_show = full_output
terminal_output.code(to_show, language="bash")
else:
current_time = time.time()
if current_time - last_update_time > update_interval:
lines = full_output.split("\n")
if len(lines) < 5:
to_show = full_output
else:
to_show = "\n".join(["...\n..\n.\n"] + lines[-5:])
terminal_output.code(to_show, language="bash")
last_update_time = current_time
print(output)
time.sleep(0.1)
# Check if the process has finished
if process.poll() is not None:
# Read any remaining output
remaining_output = process.stdout.read()
if remaining_output:
lines = remaining_output.split("\n")
if len(lines) > 10:
to_show += "\n".join(["\n...\n..\n.\n"] + lines[-10:])
else:
to_show += remaining_output
terminal_output.code(to_show, language="bash")
print(remaining_output)
break
return_code = process.poll()
return outdir, return_code
def main():
# init lang
st.session_state["korean"] = st.session_state.get("korean", False)
sidebar_placeholder = default_page_setting()
set_nav_bar(
False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_init"
)
st.title("βš”οΈ VARCO ARENA βš”οΈ")
if st.session_state.korean:
st.write(
"**VARCO ArenaλŠ” 각 λͺ¨λΈμ˜ μƒμ„±λœ κ²°κ³Όλ₯Ό 비ꡐ ν‰κ°€ν•˜μ—¬ λͺ¨λΈμ˜ μ„±λŠ₯ μˆœμœ„λ₯Ό μ œκ³΅ν•˜λŠ” μ‹œμŠ€ν…œμž…λ‹ˆλ‹€. λͺ¨λ²”λ‹΅μ•ˆμ„ ν•„μš”λ‘œ ν•˜μ§€ μ•ŠμœΌλ―€λ‘œ μ»€μŠ€ν…€ ν…ŒμŠ€νŠΈμ…‹ (50+ ν–‰) 을 ν™œμš©ν•˜λŠ” 경우 νŽΈλ¦¬ν•œ λ²€μΉ˜λ§ˆν‚Ήμ΄ κ°€λŠ₯ν•©λ‹ˆλ‹€.**"
)
else:
st.write(
"**VARCO Arena is an LLM benchmarking system that compares model responses across customized test scenarios (recommend >50 prompts) without requiring reference answers.**"
)
st.divider()
# Set up the file uploader
if st.session_state.korean:
st.markdown("### 1. λͺ¨λΈ 좜λ ₯파일 μ—…λ‘œλ“œ")
else:
st.markdown("### 1. Upload LLM responses")
uploaded_files = st.file_uploader(
"Drag and Drop jsonlines files (.jsonl)", accept_multiple_files=True
)
if st.session_state.korean:
st.info("μ—…λ‘œλ“œ ν•˜μ‹  νŒŒμΌμ€ μžλ™μœΌλ‘œ μ‚­μ œλ˜λ©° μˆ˜μ§‘λ˜κ±°λ‚˜ μ‚¬μš©λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.\n- [μž…λ ₯ μ˜ˆμ‹œ 파일 (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)")
else:
st.info(
"Your uploads will be removed automatically, not being collected nor reused for any purpose.\n- [Example input files (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)"
)
# upload state
if "upfiles_dir" not in st.session_state:
st.session_state.upfiles_dir = None
if st.button("μ—…λ‘œλ“œν•˜κΈ°" if st.session_state.korean else "Upload Files"):
st.session_state.upfiles_dir = upload_files(uploaded_files)
if st.button("μ—…λ‘œλ“œν•œ 파일 μ§€μš°κΈ°" if st.session_state.korean else "Purge my uploads"):
st.session_state.upfiles_dir = None
if VA_ROOT.is_dir():
shutil.rmtree(VA_ROOT)
st.success(
"βœ… μ—…λ‘œλ“œν•œ νŒŒμΌμ„ μ„œλ²„μ—μ„œ μ§€μ› μŠ΅λ‹ˆλ‹€"
if st.session_state.korean
else "βœ… Removed your uploads from the server successfully"
)
else:
st.error(
"❌ μ§€μšΈ 파일이 μ—†μŠ΅λ‹ˆλ‹€"
if st.session_state.korean
else "❌ You have nothing uploaded"
)
if st.session_state.korean:
with st.expander("❓❔ 무엇을 μ—…λ‘œλ“œ ν•˜λ‚˜μš”β“β”"):
st.info(open("guide_mds/input_jsonls_kr.md", encoding="UTF8").read())
else:
with st.expander("❓❔ What should I upload ❓❔"):
st.info(open("guide_mds/input_jsonls_en.md", encoding="UTF8").read())
# Form for cost estimation
with st.form("cost_estimation_form"):
if st.session_state.korean:
st.write("### 2. 가격 μ‚°μ •")
else:
st.write("### 2. Cost Estimation")
eval_model = st.selectbox(
"Select Judge",
open("eval_models_list.txt", encoding="UTF8").read().split("\n"),
)
promptname = st.selectbox(
"Select Evalutaion Prompt",
open("eval_prompt_list.txt", encoding="UTF8").read().split("\n"),
)
if st.session_state.korean:
st.markdown("*`llmbar`μ™Έ λ‹€λ₯Έ ν”„λ‘¬ν”„νŠΈλŠ” μΆ©λΆ„νžˆ κ²€μ¦λœ ν”„λ‘¬ν”„νŠΈλŠ” μ•„λ‹™λ‹ˆλ‹€. (λ™μž‘μ€ 함)")
else:
st.markdown(
"*Eval prompts other than `llmbar` is working example, not the optimal ones."
)
if promptname == USR_SUB:
raise ValueError(
f"{USR_SUB=} is preserved name for the system. Consider another naming for the prompt or consider changing {VA_ROOT=} (USR_SUB == VA_ROOT.parts[-1])."
)
estimate_button = st.form_submit_button("Calculate Cost!")
with st.expander(
"LLM Judge에 ν™œμš©λ˜λŠ” ν”„λ‘¬ν”„νŠΈ (`Calculate Cost!` ν΄λ¦­μ‹œ κ°±μ‹ )"
if st.session_state.korean
else "**Evaluation Prompt for LLM Judge (will refresh after `Calculate Cost!` clicked)**"
):
prompt = load_prompt(promptname, task="-")
kwargs = dict(
inst="{inst}",
src="{src}",
out_a="{out_a}",
out_b="{out_b}",
task="-",
)
if promptname == "translation_pair":
kwargs["source_lang"] = "{source_lang}"
kwargs["target_lang"] = "{target_lang}"
prompt_cmpl = prompt.complete_prompt(**kwargs)
st.markdown(f"### Evaluation Prompt: {promptname}")
for msg in prompt_cmpl:
st.markdown(f"**{msg['role']}**")
st.info(show_linebreak_in_md(escape_markdown(msg["content"])))
if estimate_button:
if st.session_state.get("upfiles_dir") is None:
st.error(
"❌ Requirements: You have to upload jsonlines files first to proceed"
)
else:
st.markdown("##### Estimated Cost")
dummy_api_key = "dummy"
dummy_exp_name = "dummy"
result_file_path, return_code = run_varco_arena(
# upload_dir=st.session_state.upfiles_dir,
promptname=promptname,
api_key=dummy_api_key,
exp_name=dummy_exp_name,
price_estimation=True,
evaluation_model=eval_model,
)
if return_code:
st.error(
"❌ RuntimeError: An error occurred during cost estimation. **Restart from file upload!**"
)
purge_user_sub_data(data_path_to_purge=VA_ROOT)
else:
st.success("βœ… Cost estimation completed successfully")
st.session_state.cost_estimated = True
# Form for actual run
with st.form("run_arena_form"):
if st.session_state.korean:
st.write("### 3. Varco Arena κ΅¬λ™ν•˜κΈ°")
else:
st.write("### 3. Run Varco Arena")
api_key = st.text_input("Enter your OpenAI API Key", type="password")
# demo exp name fixated
KST = timezone(timedelta(hours=9))
tstamp = datetime.now(KST)
tstr = tstamp.strftime("%m-%d_%H:%M:%S")
exp_name = f"{tstr}_KST_submit"
if st.session_state.korean:
st.write("**주의**:`Ctrl+C` λ²„νŠΌμ€ κ΅¬ν˜„λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€. ꡬ동 μ „ μˆ™κ³ ν•΄μ£Όμ„Έμš”.")
else:
st.write("**Caution: `Ctrl+C` button hasn't been implemented.**")
run_button = st.form_submit_button(
"πŸ”₯ Run Arena!",
disabled=(not st.session_state.get("cost_estimated", False))
or "result_file_path"
in st.session_state.keys(), # run already performed once
)
if run_button:
set_nav_bar(
True,
sidebar_placeholder=sidebar_placeholder,
toggle_hashstr="app_during_run",
)
if st.session_state.get("upfiles_dir") is None:
st.error(
"❌ Requirements: You have to upload jsonlines files first to proceed"
)
elif not api_key:
st.error("❌ Requirements: OpenAI key required to run VA.")
else:
result_file_path, return_code = run_varco_arena(
# upload_dir=st.session_state.upfiles_dir,
promptname=promptname,
api_key=api_key,
exp_name=exp_name,
price_estimation=False,
evaluation_model=eval_model,
)
if return_code:
st.error(
"❌ RuntimeError: An error occurred during Varco Arena run. Check the file and **restart from file upload!**"
)
purge_user_sub_data(data_path_to_purge=VA_ROOT)
else:
st.success("βœ… Varco Arena run completed successfully")
st.session_state.result_file_path = list(
result_file_path.glob("**/result.json")
)[-1]
set_nav_bar(
False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_run_done"
)
if st.session_state.get("result_file_path", None) is not None:
print(f"{st.session_state.get('result_file_path', None)=}")
load_and_cache_data(result_file_path=str(st.session_state.result_file_path))
if __name__ == "__main__":
main()