Spaces:
Running
Running
# import shutil | |
import os | |
import select | |
import subprocess | |
import sys | |
import time | |
from datetime import datetime, timedelta, timezone | |
from pathlib import Path | |
from typing import * | |
import streamlit as st | |
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) | |
from varco_arena.varco_arena_core.prompts import load_prompt | |
from view_utils import ( | |
default_page_setting, | |
escape_markdown, | |
set_nav_bar, | |
show_linebreak_in_md, | |
) | |
VA_ROOT = Path(os.environ.get("VARCO_ARENA_RESULT_PATH", "./user_submit")) | |
USR_SUB = VA_ROOT.parts[-1] | |
import shutil | |
import pandas as pd | |
import analysis_utils as au | |
from view_utils import visualization | |
class DataCache:
    """Minimal in-memory key/value store for processed result payloads."""

    def __init__(self):
        # Maps a cache key to the payload dict saved for it.
        self.cache = {}

    def store(self, key: str, data: dict):
        """Save *data* under *key*, replacing any earlier entry."""
        self.cache.update({key: data})

    def get(self, key: str) -> Optional[dict]:
        """Return the payload saved under *key*, or ``None`` when absent."""
        try:
            return self.cache[key]
        except KeyError:
            return None
# Create the session-scoped cache on the first run only; later script reruns
# find the key already present and keep the stored instance.
if "data_cache" not in st.session_state:
    st.session_state["data_cache"] = DataCache()
def purge_user_sub_data(data_path_to_purge: Optional[Union[Path, str]] = None):
    """Recursively delete a user-submission directory.

    Args:
        data_path_to_purge: Directory to remove. ``None`` means there is
            nothing to delete and the call is a no-op.

    A path that no longer exists is treated as already purged instead of
    raising, so the function can safely be called more than once for the
    same directory. Other errors from ``shutil.rmtree`` still propagate.
    """
    if data_path_to_purge is None:
        print("nothing to purge")
        return

    try:
        shutil.rmtree(data_path_to_purge)
    except FileNotFoundError:
        # Already removed (e.g. by an earlier purge); nothing left to do.
        print(f"nothing to purge at {str(data_path_to_purge)}")
        return

    print(f"purged {str(data_path_to_purge)}")
    return
def load_and_cache_data(result_file_path: Optional[str] = None) -> Tuple[Dict, Dict]:
    """Load a result file, cache the processed data, then purge the uploads.

    On the first call for a path the JSON file is read, figures and
    dataframes are built for the overall data and for every task, the pair
    is stored in ``st.session_state.data_cache`` and ``VA_ROOT`` is purged.
    Later calls with the same path return the cached pair without touching
    the disk.

    Args:
        result_file_path: Path to the result JSON file. ``None`` yields two
            empty dicts.

    Returns:
        Tuple of (all_result_dict, df_dict), each keyed by
        ``"<exp_name>/<prm_name>"`` taken from the two directories above the
        result file. Two empty dicts are returned when processing fails.
    """
    if result_file_path is None:
        return {}, {}

    result_path = Path(result_file_path)
    cache_key = str(result_path)

    # Serve from the session cache when this path was processed before.
    cached_data = st.session_state.data_cache.get(cache_key)
    if cached_data:
        return cached_data["all_result_dict"], cached_data["df_dict"]

    all_result_dict = {}
    df_dict = {}
    try:
        # Read and process data
        df = pd.read_json(result_path)
        df = df.drop(columns=["tstamp", "logs"], errors="ignore")
        df = au.index_test_scenario(df)

        # Overall view first, then one entry per task.
        fig_dict_per_task = {"Overall": visualization(df, is_overall=True)}
        df_dict_per_task = {"Overall": df}
        for task in df["task"].unique():
            df_task = df[df["task"] == task]
            fig_dict_per_task[task] = visualization(df_task, is_overall=False)
            df_dict_per_task[task] = df_task

        # Create key from path components
        prm_name = result_path.parts[-2]
        exp_name = result_path.parts[-3]
        key = f"{exp_name}/{prm_name}"
        all_result_dict[key] = fig_dict_per_task
        df_dict[key] = df_dict_per_task

        # Store in cache before removing file
        cache_data = {"all_result_dict": all_result_dict, "df_dict": df_dict}
        st.session_state.data_cache.store(cache_key, cache_data)
    except Exception as e:
        st.error(f"Error processing data: {str(e)}")
        return {}, {}

    # Cleanup is kept outside the processing ``try``: a failed purge must not
    # throw away results that were already computed and cached.
    try:
        purge_user_sub_data(data_path_to_purge=VA_ROOT)
    except OSError as e:
        st.warning(f"Could not remove uploaded data: {str(e)}")

    return all_result_dict, df_dict
def upload_files(uploaded_files) -> Path:
    """Save uploaded files into a fresh timestamped directory under VA_ROOT.

    Args:
        uploaded_files: Iterable of uploaded file objects exposing ``name``
            and ``getbuffer()``. ``None`` is treated as an empty upload.

    Returns:
        The resolved path of the timestamped submission directory. The path
        is returned even when the upload is rejected; with fewer than two
        files ``VA_ROOT`` is purged, so the returned directory may no longer
        exist.

    Raises:
        ValueError: If ``VA_ROOT`` exists but is not a directory.
    """
    # prep directory for user submission
    user_sub_root = VA_ROOT
    if user_sub_root.exists() and not user_sub_root.is_dir():
        raise ValueError(
            f"{user_sub_root} file exists and is not a directory. Consider renaming it."
        )
    user_sub_root.mkdir(parents=True, exist_ok=True)

    KST = timezone(timedelta(hours=9))
    tstr = datetime.now(KST).strftime("%m-%d_%H:%M:%S")
    # Join with pathlib only: prefixing "./" to an absolute VA_ROOT would
    # silently turn it into a path relative to the working directory.
    files_dir = user_sub_root / tstr
    files_dir.mkdir(parents=True, exist_ok=True)

    uploaded_files = list(uploaded_files or [])

    if not uploaded_files:
        st.warning("β No files to upload. Please drag/drop or browse files to upload.")
    elif len(uploaded_files) < 2:
        st.error("β You need at least 2 jsonlines files to properly run VA.")
        purge_user_sub_data(data_path_to_purge=VA_ROOT)
    else:  # properly uploaded
        for file in uploaded_files:
            # Keep only the final name component so a crafted file name
            # cannot write outside the submission directory.
            file_path = files_dir / Path(file.name).name
            # Save the file to the server directory
            with open(file_path, "wb") as f:
                f.write(file.getbuffer())

        jslfiles = list(files_dir.glob("*.jsonl"))
        st.success(f"β Successfully uploaded {len(jslfiles)} jsonl files.")

    return files_dir.resolve()
def run_varco_arena(
    price_estimation: bool = False,
    # upload_dir: Union[str, Path] = None,
    promptname: Optional[str] = None,
    exp_name: Optional[str] = None,
    api_key: Optional[str] = None,
    evaluation_model: str = "gpt-4o-mini",
    update_interval: float = 1.0,
):
    """Run ``varco_arena/main.py`` in a subprocess and stream its output.

    The input directory is read from ``st.session_state.upfiles_dir``.

    Args:
        price_estimation: When true, ``-c`` is appended to the command.
            Otherwise the command is fed by ``yes`` so interactive prompts
            are answered automatically.
        promptname: Value passed to ``-p``.
        exp_name: Optional sub-directory of the upload directory used as the
            output directory.
        api_key: Value passed to ``-k``. It is masked in everything that is
            printed or displayed.
        evaluation_model: Value passed to ``-e``.
        update_interval: Minimum number of seconds between UI refreshes when
            not estimating the price.

    Returns:
        Tuple of (output directory, process return code).
    """
    # Local import: only this function needs it.
    import shlex

    ptn = f"{str(st.session_state.upfiles_dir)}"
    outdir = Path(ptn)
    if exp_name:
        outdir = outdir / exp_name

    def _shell_command(key) -> str:
        """Build the shell command line with every argument quoted."""
        argv = [
            "python",
            "varco_arena/main.py",
            "-i",
            ptn,
            "-o",
            str(outdir),
            "-k",
            str(key),
            "-p",
            str(promptname),
            "-e",
            str(evaluation_model),
            "-j",
            "64",
        ]
        if price_estimation:
            argv.append("-c")
        quoted = " ".join(shlex.quote(arg) for arg in argv)
        # Prefix explicitly instead of str.replace("python", ...), which would
        # also rewrite "python" inside paths or other arguments.
        return quoted if price_estimation else f"yes | {quoted}"

    command = _shell_command(api_key)
    # Same command with the key masked; only this form is printed or shown.
    display_command = _shell_command("********" if api_key else api_key)
    api_key = None  # clear immediately
    print(display_command)

    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        stdin=subprocess.PIPE,
        text=True,
        bufsize=1,
        shell=True,
    )

    try:
        # Set stdout to non-blocking mode
        os.set_blocking(process.stdout.fileno(), False)

        last_update_time = time.time()
        terminal_output = st.empty()
        full_output = f"{display_command}\n"
        # Defined up front so the final flush below never hits an unbound name.
        to_show = full_output

        while True:
            # Wait briefly for output rather than spinning on an idle pipe.
            if select.select([process.stdout], [], [], 0.1)[0]:
                output = process.stdout.readline()
                if output:
                    full_output += output
                    if price_estimation:
                        to_show = full_output
                        terminal_output.code(to_show, language="bash")
                    else:
                        current_time = time.time()
                        if current_time - last_update_time > update_interval:
                            lines = full_output.split("\n")
                            if len(lines) < 5:
                                to_show = full_output
                            else:
                                to_show = "\n".join(["...\n..\n.\n"] + lines[-5:])
                            terminal_output.code(to_show, language="bash")
                            last_update_time = current_time
                    print(output)
                else:
                    # Readable but empty (e.g. end of stream): back off a bit.
                    time.sleep(0.1)

            # Check if the process has finished
            if process.poll() is not None:
                # Read any remaining output
                remaining_output = process.stdout.read()
                if remaining_output:
                    lines = remaining_output.split("\n")
                    if len(lines) > 10:
                        to_show += "\n".join(["\n...\n..\n.\n"] + lines[-10:])
                    else:
                        to_show += remaining_output
                    terminal_output.code(to_show, language="bash")
                    print(remaining_output)
                break
    finally:
        # Release our ends of the pipes deterministically.
        for stream in (process.stdin, process.stdout):
            if stream is not None:
                try:
                    stream.close()
                except OSError:
                    pass

    return_code = process.poll()
    return outdir, return_code
def _read_text(path: str) -> str:
    """Return the UTF-8 text of *path*, closing the file handle promptly."""
    with open(path, encoding="UTF8") as fp:
        return fp.read()


def _read_options(path: str) -> List[str]:
    """Return the non-blank lines of *path* for use as select-box choices."""
    return [line.strip() for line in _read_text(path).splitlines() if line.strip()]


def main():
    """Render the page: file upload, cost estimation, arena run, result load.

    State that must survive Streamlit reruns is kept in ``st.session_state``:
    ``korean``, ``upfiles_dir``, ``cost_estimated`` and ``result_file_path``.
    """
    # init lang
    st.session_state["korean"] = st.session_state.get("korean", False)

    sidebar_placeholder = default_page_setting()
    set_nav_bar(
        False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_init"
    )

    st.title("βοΈ VARCO ARENA βοΈ")
    if st.session_state.korean:
        st.write(
            """**λ°λ₯΄μ½ μλ λλ ν μ€νΈμ λͺ λ Ήμ΄λ³λ‘ λΉκ΅ν λͺ¨λΈ(μμ±λ¬Έ)μ ν λλ¨ΌνΈλ₯Ό μννκ³ κ²°κ³Όλ€μ μ’ ν©νμ¬ λͺ¨λΈλ€μ μμλ₯Ό 맀기λ λ²€μΉλ§νΉ μμ€ν μ λλ€. μ΄κ²μ reference μμνκ³Ό λΉκ΅νμ¬ μΉλ₯ μ 맀기λ λ°©λ²λ³΄λ€ μ ννλ©° λ μ λ ΄ν©λλ€.**
λͺ¨λ²λ΅μμ νμλ‘ νμ§ μμΌλ―λ‘ μ»€μ€ν ν μ€νΈμ (50+ ν) μ νμ©νλ κ²½μ° νΈλ¦¬ν λ²€μΉλ§νΉμ΄ κ°λ₯ν©λλ€."""
        )
    else:
        st.write(
            """**VARCO Arena is an LLM benchmarking system that compares model responses across customized test scenarios (recommend >50 prompts) without requiring reference answers.**
VARCO Arena conducts tournaments between models to be compared for each test set command, ranking models accurately at an affordable price. This is more accurate and cost-effective than rating win rates by comparing against reference outputs."""
        )

    st.divider()
    # Set up the file uploader
    if st.session_state.korean:
        st.markdown("### 1. λͺ¨λΈ μΆλ ₯νμΌ μ λ‘λ")
    else:
        st.markdown("### 1. Upload LLM responses")
    uploaded_files = st.file_uploader(
        "Drag and Drop jsonlines files (.jsonl)", accept_multiple_files=True
    )
    if st.session_state.korean:
        st.info("μ λ‘λ νμ νμΌμ μλμΌλ‘ μμ λλ©° μμ§λκ±°λ μ¬μ©λμ§ μμ΅λλ€.\n- [μ λ ₯ μμ νμΌ (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)")
    else:
        st.info(
            "Your uploads will be removed automatically, not being collected nor reused for any purpose.\n- [Example input files (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)"
        )

    # upload state
    if "upfiles_dir" not in st.session_state:
        st.session_state.upfiles_dir = None
    if st.button("μ λ‘λνκΈ°" if st.session_state.korean else "Upload Files"):
        st.session_state.upfiles_dir = upload_files(uploaded_files)
    if st.button("μ λ‘λν νμΌ μ§μ°κΈ°" if st.session_state.korean else "Purge my uploads"):
        st.session_state.upfiles_dir = None
        if VA_ROOT.is_dir():
            shutil.rmtree(VA_ROOT)
            st.success(
                "β μ λ‘λν νμΌμ μλ²μμ μ§μ μ΅λλ€"
                if st.session_state.korean
                else "β Removed your uploads from the server successfully"
            )
        else:
            st.error(
                "β μ§μΈ νμΌμ΄ μμ΅λλ€"
                if st.session_state.korean
                else "β You have nothing uploaded"
            )

    if st.session_state.korean:
        with st.expander("ββ 무μμ μ λ‘λ νλμββ"):
            st.info(_read_text("guide_mds/input_jsonls_kr.md"))
    else:
        with st.expander("ββ What should I upload ββ"):
            st.info(_read_text("guide_mds/input_jsonls_en.md"))

    # Form for cost estimation
    with st.form("cost_estimation_form"):
        if st.session_state.korean:
            st.write("### 2. κ°κ²© μ°μ ")
        else:
            st.write("### 2. Cost Estimation")
        # Blank lines in the list files are dropped so the select boxes never
        # offer an empty choice.
        eval_model = st.selectbox(
            "Select Judge",
            _read_options("eval_models_list.txt"),
        )
        promptname = st.selectbox(
            "Select Evalutaion Prompt",
            _read_options("eval_prompt_list.txt"),
        )
        if st.session_state.korean:
            st.markdown("*`llmbar`μΈ λ€λ₯Έ ν둬ννΈλ μΆ©λΆν κ²μ¦λ ν둬ννΈλ μλλλ€. (λμμ ν¨)")
        else:
            st.markdown(
                "*Eval prompts other than `llmbar` is working example, not the optimal ones."
            )
        if promptname == USR_SUB:
            raise ValueError(
                f"{USR_SUB=} is preserved name for the system. Consider another naming for the prompt or consider changing {VA_ROOT=} (USR_SUB == VA_ROOT.parts[-1])."
            )

        estimate_button = st.form_submit_button("Calculate Cost!")
        with st.expander(
            "LLM Judgeμ νμ©λλ ν둬ννΈ (`Calculate Cost!` ν΄λ¦μ κ°±μ )"
            if st.session_state.korean
            else "**Evaluation Prompt for LLM Judge (will refresh after `Calculate Cost!` clicked)**"
        ):
            prompt = load_prompt(promptname, task="-")
            # Placeholders are passed through literally so the rendered
            # prompt shows where each field will be substituted.
            kwargs = dict(
                inst="{inst}",
                src="{src}",
                out_a="{out_a}",
                out_b="{out_b}",
                task="-",
            )
            if promptname == "translation_pair":
                kwargs["source_lang"] = "{source_lang}"
                kwargs["target_lang"] = "{target_lang}"
            prompt_cmpl = prompt.complete_prompt(**kwargs)

            st.markdown(f"### Evaluation Prompt: {promptname}")
            for msg in prompt_cmpl:
                st.markdown(f"**{msg['role']}**")
                st.info(show_linebreak_in_md(escape_markdown(msg["content"])))

        if estimate_button:
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "β Requirements: You have to upload jsonlines files first to proceed"
                )
            else:
                st.markdown("##### Estimated Cost")
                dummy_api_key = "dummy"
                dummy_exp_name = "dummy"
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=dummy_api_key,
                    exp_name=dummy_exp_name,
                    price_estimation=True,
                    evaluation_model=eval_model,
                )
                if return_code:
                    st.error(
                        "β RuntimeError: An error occurred during cost estimation. **Restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                    # The uploads are gone; forget the stale directory so the
                    # next attempt asks for a fresh upload.
                    st.session_state.upfiles_dir = None
                else:
                    st.success("β Cost estimation completed successfully")
                    st.session_state.cost_estimated = True

    # Form for actual run
    with st.form("run_arena_form"):
        if st.session_state.korean:
            st.write("### 3. Varco Arena ꡬλνκΈ°")
        else:
            st.write("### 3. Run Varco Arena")
        api_key = st.text_input("Enter your OpenAI API Key", type="password")

        # demo exp name fixated
        KST = timezone(timedelta(hours=9))
        tstamp = datetime.now(KST)
        tstr = tstamp.strftime("%m-%d_%H:%M:%S")
        exp_name = f"{tstr}_KST_submit"

        if st.session_state.korean:
            st.write("**μ£Όμ**:`Ctrl+C` λ²νΌμ ꡬνλμ§ μμμ΅λλ€. ꡬλ μ μκ³ ν΄μ£ΌμΈμ.")
        else:
            st.write("**Caution: `Ctrl+C` button hasn't been implemented.**")
        run_button = st.form_submit_button(
            "π₯ Run Arena!",
            disabled=(not st.session_state.get("cost_estimated", False))
            or "result_file_path"
            in st.session_state.keys(),  # run already performed once
        )

        if run_button:
            set_nav_bar(
                True,
                sidebar_placeholder=sidebar_placeholder,
                toggle_hashstr="app_during_run",
            )
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "β Requirements: You have to upload jsonlines files first to proceed"
                )
            elif not api_key:
                st.error("β Requirements: OpenAI key required to run VA.")
            else:
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=api_key,
                    exp_name=exp_name,
                    price_estimation=False,
                    evaluation_model=eval_model,
                )
                if return_code:
                    st.error(
                        "β RuntimeError: An error occurred during Varco Arena run. Check the file and **restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                    # Same reasoning as for a failed cost estimation.
                    st.session_state.upfiles_dir = None
                else:
                    result_files = list(result_file_path.glob("**/result.json"))
                    if result_files:
                        st.success("β Varco Arena run completed successfully")
                        st.session_state.result_file_path = result_files[-1]
                    else:
                        # A zero exit code without a result file would
                        # otherwise surface as an IndexError.
                        st.error(
                            "RuntimeError: Varco Arena finished without producing result.json. Check the file and **restart from file upload!**"
                        )
            set_nav_bar(
                False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_run_done"
            )

    if st.session_state.get("result_file_path", None) is not None:
        print(f"{st.session_state.get('result_file_path', None)=}")
        load_and_cache_data(result_file_path=str(st.session_state.result_file_path))


if __name__ == "__main__":
    main()