VARCO_Arena / analysis_utils.py
sonsus's picture
others
c2ba4d5
from copy import deepcopy
from functools import partial
from typing import *
import pandas as pd
from fire import Fire
"""
This code assumes it is dealing with only one instruction.
"""
# from varco_arena.tournament
def log2_power_of_two(n):
    """Return the base-2 logarithm of ``n``, which must be a power of two.

    Args:
        n: Integer whose exponent is wanted.

    Returns:
        The integer ``e`` such that ``2 ** e == n``.

    Raises:
        ValueError: If ``n`` is not a positive power of two.
    """
    # A power of two has exactly one set bit, so clearing the lowest set bit
    # with ``n & (n - 1)`` must leave nothing behind.
    has_leftover_bits = (n & (n - 1)) != 0
    if has_leftover_bits or n == 0:
        raise ValueError("n must be a positive power of 2")

    # Count how many right shifts (halvings) bring n down to 1.
    shifts = 0
    while (n >> shifts) > 1:
        shifts += 1
    return shifts
def get_1st(df: pd.DataFrame, alpha2names: dict) -> Optional[str]:
    """Return the resolved winner of the final match, if there is exactly one.

    Args:
        df: Match table with a ``round`` column and a ``winner_resolved``
            column.
        alpha2names: Not used by this function; kept in the signature.

    Returns:
        ``winner_resolved`` of the single row whose ``round`` is ``"final"``,
        or ``None`` when there is no such row or more than one (the latter
        indicates a buggy result file).
    """
    final_rows = df[df["round"] == "final"]
    if len(final_rows) != 1:
        # error case: no finals match, or multiple finals
        return None
    return final_rows.iloc[0].winner_resolved
def get_unique_participants(df: pd.DataFrame) -> list:
    """Return the sorted, de-duplicated participant names of a match table.

    Names are collected from both the ``model_a`` and ``model_b`` columns.
    Placeholders for "no participant" are dropped: ``None``, NaN/NA and the
    empty string.

    Args:
        df: Match table with ``model_a`` and ``model_b`` columns.

    Returns:
        A sorted list of the distinct participant names.
    """
    candidates = pd.concat([df.model_a, df.model_b]).unique().tolist()
    # NaN is truthy, so a plain truthiness filter would let it through and
    # make sorted() fail when comparing a float with a str. Check for missing
    # values first (this also avoids calling bool() on pd.NA).
    return sorted(p for p in candidates if pd.notna(p) and p)
def _impute_byes(df):
    """Pad every round below the final with placeholder matches.

    For each depth ``d`` from the deepest round up to 1, two kinds of rows
    are appended:

    * a "bye" row (``model_a`` = participant, ``model_b`` = ``""``) for every
      participant that appears at depth ``d - 1`` but not at depth ``d``;
    * empty-vs-empty rows until the round holds ``2 ** d`` matches.

    Placeholder rows always have ``winner == "model_a"`` and
    ``match_order_in_round == "-"``.

    Args:
        df: Match table with ``depth``, ``model_a`` and ``model_b`` columns.

    Returns:
        A new frame with the padded rounds, sorted by ``depth`` and re-indexed
        from 0.
    """
    deepest = df.depth.max()

    # One independent copy per depth so each round can be padded on its own.
    rounds = {d: df[df.depth == d].copy() for d in range(deepest + 1)}

    # Walk from the deepest round towards the final. Depth 0 is skipped:
    # the original author assumes it always holds one proper match.
    for d in range(deepest, 0, -1):
        placeholder = {
            "model_a": "",
            "model_b": "",
            "winner": "model_a",
            "match_order_in_round": "-",
            "depth": d,
        }

        # Participants seen one round closer to the final but not in this
        # round must have skipped it: give each a "participant vs nobody" row.
        playing_here = get_unique_participants(rounds[d])
        playing_above = get_unique_participants(rounds[d - 1])
        byes = [
            {**placeholder, "model_a": name}
            for name in playing_above
            if name not in playing_here
        ]
        rounds[d] = pd.concat([rounds[d], pd.DataFrame(byes)], axis="index")

        # Top the round up with "nobody vs nobody" rows.
        n_missing = 2 ** d - len(rounds[d])
        if n_missing > 0:
            fillers = pd.DataFrame([placeholder] * n_missing)
            rounds[d] = pd.concat([rounds[d], fillers], axis="index")

    merged = pd.concat(rounds.values(), axis="index")
    return merged.sort_values(by="depth").reset_index(drop=True)
def index_test_scenario(df) -> pd.DataFrame:
    """Add scenario-identifying columns (and ``depth`` if absent) to ``df``.

    The frame is modified in place and also returned. Added columns:

    * ``inst_src``: ``"inst: <instruction>\\n\\nsrc: <source>"``
    * ``idx_inst_src``: ``"<tournament_idx>:\\n<inst_src>"``
    * ``depth`` (only when not already present): derived from ``round``,
      where ``final`` is 0, ``semi-final`` is 1, ``quarter-final`` is 2 and
      ``round-<N>`` is ``log2(N) - 1``.

    Args:
        df: Match table with ``instruction``, ``source``, ``tournament_idx``
            and ``round`` columns.

    Returns:
        The same frame, with the extra columns.

    Raises:
        ValueError: If a round name is not recognised, or if ``N`` in
            ``round-<N>`` is not a positive power of two.
    """
    df["inst_src"] = "inst: " + df.instruction + "\n\nsrc: " + df.source
    df["idx_inst_src"] = df.apply(
        lambda row: f"{row.tournament_idx}:\n{row.inst_src}", axis=1
    )

    # depth is later used for tournament bracket backtracking
    if "depth" not in df.columns:
        named_rounds = {
            "final": 0,
            "semi-final": 1,
            "quarter-final": 2,
        }

        def _convert_round_to_depth(rnd: str) -> Optional[int]:
            if rnd is None:
                return None
            if rnd in named_rounds:
                return named_rounds[rnd]
            if rnd.startswith("round-"):  # assume perfect power of two
                num = int(rnd.replace("round-", "").strip())
                return log2_power_of_two(num) - 1
            # Fail with a message that names the offending value instead of
            # falling through with no depth assigned.
            raise ValueError(
                f"unrecognized round name {rnd!r}; expected 'final', "
                "'semi-final', 'quarter-final' or 'round-<N>'"
            )

        df["depth"] = df["round"].apply(_convert_round_to_depth)

    return df
def init_tournament_dataframe(
    df, alpha2names: dict = None
) -> Tuple[pd.DataFrame, dict]:
    """Prepare one tournament's match table for bracket drawing.

    Steps: sort by ``depth``; rewrite ``winner`` from ``A``/``B`` to
    ``model_a``/``model_b``; build a one-character label per participant
    (unless ``alpha2names`` is supplied); pad missing matches with
    ``_impute_byes``; add the readable/label columns with
    ``_make_readables``.

    Args:
        df: Match table of a single tournament. Needs ``depth``, ``round``,
            ``winner``, ``model_a`` and ``model_b`` columns.
        alpha2names: Optional mapping from one-character label to full model
            name. When ``None`` it is generated from the sorted participant
            names.

    Returns:
        ``(processed_df, alpha2names)``.

    Raises:
        ValueError: If there are more participants than available labels, or
            if the table does not contain exactly one ``final`` match.
    """
    df = df.sort_values(by="depth").reset_index(drop=True)
    # make winner interpretable (A -> model_a, B -> model_b)
    df.winner = df.winner.apply(lambda txt: f"model_{txt.lower()}")

    # define alpha2names if not given (covers up to 168 participants)
    if alpha2names is None:
        # Adjacent literals are concatenated by the parser, so source
        # indentation can never end up inside the label pool.
        alphabets = (
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "abcdefghijklmnopqrstuvwxyz"
            "ⓐⓑⓒⓓⓔⓕⓖⓗⓘⓙⓚⓛⓜⓝⓞⓟⓠⓡⓢⓣⓤⓥⓦⓧⓨⓩ"
            "㉠㉡㉢㉣㉤㉥㉦㉧㉨㉩㉪㉫㉬㉭"
            "㉮㉯㉰㉱㉲㉳㉴㉵㉶㉷㉸㉹㉺㉻"
            "ㄱㄴㄷㄹㅁㅂㅅㅇㅈㅊㅋㅌㅍㅎ"
            "ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ"
            "αβγδεζηθικλμνξοπρστυφχψω"
        )
        model_full_names = get_unique_participants(df)
        # zip() stops at the shorter input, so a short mapping means the
        # label pool ran out.
        alpha2names = dict(zip(alphabets, model_full_names))
        if len(alpha2names) < len(model_full_names):
            raise ValueError(
                f"Tournament viewer cannot visualize more than {len(alphabets)=} participants. ({len(model_full_names)=} is given)\n\nOther features will not be affected but the tournament visualizer."
            )

    names2alpha = {name: alpha for alpha, name in alpha2names.items()}
    df = _impute_byes(df)

    # preserve readables for later
    df = _make_readables(df, names2alpha)

    if len(df[df["round"] == "final"]) != 1:
        raise ValueError("final match need to be one and only.")

    return df, alpha2names
def _make_readables(df, names2alpha):
df["human_readable_model_a"] = df.model_a.copy()
df["human_readable_model_b"] = df.model_b.copy()
df.model_a = df.model_a.apply(
lambda modelname: names2alpha[modelname] if modelname else "x"
)
df.model_b = df.model_b.apply(
lambda modelname: names2alpha[modelname] if modelname else "x"
)
df["human_readable_idx"] = df.apply(
lambda row: f"{row.name}: {row.human_readable_model_a} ({row.model_a}) vs. {row.human_readable_model_b} ({row.model_b if row.model_b else 'x'})",
axis=1,
)
df["winner_resolved"] = df.apply(lambda row: row[row.winner], axis=1)
df["winner_nodes"] = df.apply(
lambda row: f"{row.winner_resolved}:{row.name}".ljust(4, " "), axis=1
) # later for figure representation of winner as a "node"
return df
# draw
def draw(df: pd.DataFrame, alpha2names: dict = None) -> str:
    """Render a processed tournament table as a text bracket.

    The bracket is drawn top-down: the final (depth 0) first, then each
    deeper round. Every round contributes one line of winner nodes joined by
    ``─``/``┐`` connectors and one line of ``│`` stems. The last line lists
    the labels of the deepest round's participants.

    Args:
        df: Table with ``depth``, ``model_a``, ``model_b``,
            ``winner_resolved`` and ``winner_nodes`` columns.
        alpha2names: Mapping from label to full model name. It is indexed
            with the champion's label, so it must be supplied despite the
            ``None`` default.

    Returns:
        The multi-line bracket drawing.
    """
    deepest = df.depth.max()

    def _render_round(depth, nodes):
        """Draw one round; return (node line, stem line, nodes of the next round)."""
        matches = df[df.depth == depth]
        # Horizontal room per node doubles with every round closer to the final.
        width = 2 ** ((deepest - depth) + 2)
        horizontal = "─" * (width - 4)  # the node text takes the first 4 columns
        corner = "┐" + " " * (width - 1)

        if nodes is None:
            assert (
                depth == 0
            ), "winners_in_order=None is only allowed when drawing the top (=final match)"
            nodes = matches.winner_nodes

        pieces = []
        children = []
        for node in nodes:
            pieces.append(node + horizontal + corner)
            # Both contestants of this match, in the same left-to-right order
            # as the nodes, so the next round lines up underneath.
            match = matches.query(f"winner_nodes=='{node}'")
            children += [match.model_a.item(), match.model_b.item()]

        # Below the deepest round there are no matches to look up, so the
        # bare labels are kept; otherwise each label becomes the node of the
        # match it won one round deeper.
        if depth != deepest:
            below = df[df.depth == depth + 1]
            children = [
                below.query(f"winner_resolved=='{label}'").winner_nodes.item()
                for label in children
            ]

        stem = "│" + " " * (width - 1)
        return "".join(pieces), stem * (len(matches) * 2), children

    drawings = []
    nodes = None
    for depth in range(deepest + 1):
        node_line, stem_line, nodes = _render_round(depth, nodes)
        drawings.append((node_line, stem_line))

    # bracket top: the first node of the first line is "<label>:<index>"
    champion_alphabet = drawings[0][0].split()[0].split(":")[0]
    champion_readable = alpha2names[champion_alphabet]

    # bracket mid: node line and stem line of every round
    bracket_mid = "\n".join("\n".join(pair) for pair in drawings)

    # bracket bottom: whatever the deepest round handed down
    bracket_bot = (" " * 3).join(nodes)

    return "\n".join(
        [f"🥇winner: {champion_readable}", "│", bracket_mid, bracket_bot]
    )
def number_breakdown_from_df(result_df: pd.DataFrame) -> Tuple[str, int, int]:
    """Explain the match count as ``(n_models - 1) * size_testset``.

    Args:
        result_df: Match table with ``model_a`` and ``model_b`` columns, one
            row per match.

    Returns:
        ``(interpretation, n_models, size_testset)`` where ``interpretation``
        is a human-readable sentence, ``n_models`` is the number of distinct
        participants and ``size_testset`` is the number of rows divided by
        ``n_models - 1``, truncated to an int.

    Raises:
        ZeroDivisionError: If the table contains exactly one distinct
            participant.
    """
    n_matches = len(result_df)
    n_models = len(get_unique_participants(result_df))
    size_testset = int(n_matches / (n_models - 1))
    interpretation = f"total {n_matches} matches = (n_models-1) * size_testset = ({n_models}-1) * {size_testset}"
    return interpretation, n_models, size_testset
def make_legend_str(df, alpha2names) -> str:
    """Build the ``label<TAB>model name`` legend shown under a bracket.

    Entries are listed in sorted label order. Every entry whose name equals
    the champion's name is prefixed with a medal. Medals already present in
    ``alpha2names`` are stripped first. The passed-in mapping is not
    modified.

    Args:
        df: Processed match table, used to look up the champion via
            ``get_1st``.
        alpha2names: Mapping from label to full model name.

    Returns:
        A string starting with ``"\\n\\nlegend:"`` followed by one line per
        label. If no champion can be determined, or its label is not in
        ``alpha2names``, the legend is returned without any medal.
    """
    champion = get_1st(df, alpha2names)
    # Drop stale medals so a mapping that was decorated earlier is not
    # decorated twice.
    plain_names = {
        alpha: name.replace("🥇 ", "") for alpha, name in alpha2names.items()
    }
    # None when there is no unique final or the label is unknown: in that
    # case nothing gets a medal instead of failing on the lookup.
    champion_name = plain_names.get(champion)

    lines = ["\n\nlegend:"]
    for alpha in sorted(plain_names):
        name = plain_names[alpha]
        if champion_name is not None and name == champion_name:
            name = f"🥇 {name}"
        lines.append(f"{alpha}\t{name}")
    return "\n".join(lines)
def main(
    jslname: str = "result.json",
):
    """
    Test code: draw the bracket of every tournament in a result file.

    For the i-th distinct ``idx_inst_src`` value, the bracket and legend are
    printed and written to ``<i>.txt`` together with the scenario text. If
    anything fails for a tournament, the error and the scenario text go to
    ``<i>_err.txt`` and, when the tournament table had already been prepared,
    its key columns go to ``<i>_err.jsonl``; processing then continues with
    the next tournament. Text files are written as UTF-8.

    Args:
        jslname: Path of the JSON result file (records orientation).
    """
    df = pd.read_json(jslname, orient="records")
    df = df.drop(columns=["tstamp", "logs"])
    df = index_test_scenario(df)

    # The visualization(df) step in the middle is omitted here. If everything
    # except that step was followed and a problem still occurs, it is a viz
    # problem. That seems unlikely, though.
    selections = df.idx_inst_src.unique()

    # Labels are generated by the first tournament that initialises
    # successfully and reused for all later ones.
    alpha2names = None
    for i, sel in enumerate(selections):
        # Reset per tournament so the error dump can never pick up the table
        # of a previous tournament (or an unbound name on the first one).
        df_now_processed = None
        try:
            df_now = df[df.idx_inst_src == sel]
            df_now_processed, _alpha2names = init_tournament_dataframe(
                df_now, alpha2names=alpha2names
            )
            if alpha2names is None:
                alpha2names = _alpha2names

            bracket_drawing = draw(df_now_processed, alpha2names=alpha2names)
            legend = make_legend_str(df_now_processed, alpha2names)
            print(bracket_drawing + legend)
            with open(f"{i}.txt", "w", encoding="utf-8") as report:
                print(bracket_drawing + legend, file=report)
                print(f"\n\n{sel}", file=report)
        except Exception as e:
            # Deliberately broad: one broken tournament must not stop the
            # others; the failure is recorded on disk instead.
            with open(f"{i}_err.txt", "w", encoding="utf-8") as err_report:
                print(e, file=err_report)
                print("", file=err_report)
                print(sel, file=err_report)
            if df_now_processed is not None:
                df_now_processed[
                    [
                        "depth",
                        "round",
                        "winner_nodes",
                        "winner_resolved",
                        "winner",
                        "model_a",
                        "model_b",
                    ]
                ].to_json(f"{i}_err.jsonl", lines=True, orient="records")
# When run as a script, hand main() to Fire so that its parameter (jslname)
# can be given on the command line.
if __name__ == "__main__":
    Fire(main)