Spaces:
Running
Running
from copy import deepcopy | |
from functools import partial | |
from typing import * | |
import pandas as pd | |
from fire import Fire | |
""" | |
This code assumes that it is dealing with only one instruction at a time.
""" | |
# from varco_arena.tournament | |
def log2_power_of_two(n):
    """Return the exponent ``e`` such that ``2 ** e == n``.

    Args:
        n: an integer expected to be a positive power of two.

    Returns:
        The number of right shifts needed to reduce ``n`` to 1.

    Raises:
        ValueError: if ``n`` is zero or is not an exact power of two.
    """
    # A power of two has exactly one set bit, so clearing the lowest set bit
    # via n & (n - 1) must leave nothing behind.
    has_extra_bits = (n & (n - 1)) != 0
    if has_extra_bits or n == 0:
        raise ValueError("n must be a positive power of 2")

    shifts = 0
    remaining = n
    while remaining > 1:
        remaining >>= 1  # halve the value one bit at a time
        shifts += 1
    return shifts
def get_1st(df: pd.DataFrame, alpha2names: dict) -> Optional[str]:
    """Return the resolved winner of the final match.

    Args:
        df: match table with ``round`` and ``winner_resolved`` columns.
        alpha2names: accepted for signature compatibility; not used here.

    Returns:
        The ``winner_resolved`` value of the single row whose ``round`` is
        ``"final"``, or ``None`` when there is no such row or more than one
        (which indicates a broken result file).
    """
    final_rows = df[df["round"] == "final"]
    if len(final_rows) != 1:
        # error case: no finals match, or multiple finals (buggy result file)
        return None
    return final_rows.iloc[0].winner_resolved
def get_unique_participants(df: pd.DataFrame) -> list:
    """Return the sorted, de-duplicated participant names of a match table.

    Names are collected from both the ``model_a`` and ``model_b`` columns.
    Placeholder entries are dropped: ``None``, empty strings, and missing
    values such as NaN. NaN must be filtered explicitly because it is truthy
    and would otherwise make ``sorted`` fail when compared against strings.

    Args:
        df: match table with ``model_a`` and ``model_b`` columns.

    Returns:
        A sorted list of the distinct, non-empty participant names.
    """
    candidates = pd.concat([df.model_a, df.model_b]).unique().tolist()
    # check for missing values first: truth-testing pd.NA raises TypeError
    return sorted(name for name in candidates if pd.notna(name) and name)
def _impute_byes(df):
    """Insert placeholder "bye" rows so lower bracket levels are filled in.

    Working from the deepest level up to depth 1, two kinds of rows are
    appended to each depth ``d``:

    * one ``<participant> vs ""`` row for every participant that appears at
      depth ``d - 1`` but not at depth ``d``;
    * ``"" vs ""`` rows until depth ``d`` holds at least ``2 ** d`` rows.

    Depth 0 is left as is. The result is the concatenation of all depths,
    sorted by ``depth`` with a fresh index.

    Args:
        df: match table with ``depth``, ``model_a`` and ``model_b`` columns.

    Returns:
        A new DataFrame including the imputed rows.
    """
    deepest = df.depth.max()

    # split the table per depth so each level can be extended independently
    by_depth = {d: df[df.depth == d].copy() for d in range(deepest + 1)}

    # walk upward from the leaves; depth 0 always has one proper match
    for depth in range(deepest, 0, -1):
        bye_template = {
            "model_a": "",
            "model_b": "",
            "winner": "model_a",
            "match_order_in_round": "-",
            "depth": depth,
        }

        # participants seen one level up that never played at this level
        present_here = get_unique_participants(by_depth[depth])
        advanced = get_unique_participants(by_depth[depth - 1])
        solo_byes = []
        for name in advanced:
            if name in present_here:
                continue
            bye_row = deepcopy(bye_template)
            bye_row["model_a"] = name
            solo_byes.append(bye_row)
        by_depth[depth] = pd.concat(
            [by_depth[depth], pd.DataFrame(solo_byes)],
            axis="index",
        )

        # pad the level with empty-vs-empty rows
        n_missing = 2 ** (depth) - len(by_depth[depth])
        if n_missing > 0:
            padding = pd.DataFrame([bye_template] * n_missing)
            by_depth[depth] = pd.concat(
                [by_depth[depth], padding],
                axis="index",
            )

    merged = pd.concat(by_depth.values(), axis="index")
    return merged.sort_values(by="depth").reset_index(drop=True)
def index_test_scenario(df) -> pd.DataFrame:
    """Add scenario-identifying columns and a numeric ``depth`` column.

    The frame is modified in place and also returned.

    Columns added:
        * ``inst_src``: ``"inst: <instruction>\\n\\nsrc: <source>"``.
        * ``idx_inst_src``: ``"<tournament_idx>:\\n<inst_src>"``.
        * ``depth`` (only when absent): derived from ``round``, where
          ``final`` -> 0, ``semi-final`` -> 1, ``quarter-final`` -> 2 and
          ``round-N`` -> ``log2(N) - 1``. A missing round yields ``None``.

    Args:
        df: match table with ``instruction``, ``source``, ``tournament_idx``
            and (unless ``depth`` already exists) ``round`` columns.

    Returns:
        The same DataFrame with the extra columns.

    Raises:
        ValueError: if a round name is not recognized, or ``N`` in
            ``round-N`` is not a positive power of two.
    """
    df["inst_src"] = "inst: " + df.instruction + "\n\nsrc: " + df.source
    df["idx_inst_src"] = df.apply(
        lambda row: f"{row.tournament_idx}:\n{row.inst_src}", axis=1
    )

    # later used for tournament bracket backtracking
    if "depth" not in df.columns:
        mappings = {
            "final": 0,
            "semi-final": 1,
            "quarter-final": 2,
        }

        def _convert_round_to_depth(rnd: str, mappings=None) -> Optional[int]:
            # missing round (None / NaN): there is no depth to assign
            if rnd is None or (not isinstance(rnd, str) and pd.isna(rnd)):
                return None
            if rnd in mappings:
                return mappings[rnd]
            if isinstance(rnd, str) and rnd.startswith("round-"):
                # assume perfect power of two
                num = int(rnd.replace("round-", "").strip())
                return log2_power_of_two(num) - 1
            # previously this fell through and hit an unbound local variable
            raise ValueError(f"unrecognized round name: {rnd!r}")

        conv = partial(_convert_round_to_depth, mappings=mappings)
        df["depth"] = df["round"].apply(conv)
    return df
def init_tournament_dataframe(
    df, alpha2names: Optional[dict] = None
) -> Tuple[pd.DataFrame, dict]:
    """Prepare one tournament's match table for bracket drawing.

    Steps: sort by ``depth``; rewrite ``winner`` from ``A``/``B`` style to
    ``model_a``/``model_b``; build a symbol legend if none is given; impute
    bye rows; replace model names with their one-character symbols while
    keeping readable copies.

    Args:
        df: match table containing at least ``depth``, ``winner``, ``round``,
            ``model_a`` and ``model_b`` columns.
        alpha2names: optional mapping from display symbol to model name.
            When ``None``, symbols are assigned to the sorted participant
            names from a built-in 168-symbol alphabet.

    Returns:
        A ``(processed_df, alpha2names)`` tuple.

    Raises:
        ValueError: if there are more participants than available symbols,
            or if the table does not contain exactly one ``final`` match.
    """
    df = df.sort_values(by="depth").reset_index(drop=True)

    # make winner interpretable (A -> model_a, B -> model_b)
    df.winner = df.winner.apply(lambda txt: f"model_{txt.lower()}")

    # define alpha2names if not given (covers up to 168 participants)
    if alpha2names is None:
        # Adjacent literals are concatenated by the parser, which avoids the
        # indentation hazard of backslash continuations inside one string.
        alphabets = (
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "abcdefghijklmnopqrstuvwxyz"
            "ⓐⓑⓒⓓⓔⓕⓖⓗⓘⓙⓚⓛⓜⓝⓞⓟⓠⓡⓢⓣⓤⓥⓦⓧⓨⓩ"
            "㉠㉡㉢㉣㉤㉥㉦㉧㉨㉩㉪㉫㉬㉭"
            "㉮㉯㉰㉱㉲㉳㉴㉵㉶㉷㉸㉹㉺㉻"
            "ㄱㄴㄷㄹㅁㅂㅅㅇㅈㅊㅋㅌㅍㅎ"
            "ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ"
            "αβγδεζηθικλμνξοπρστυφχψω"
        )
        model_full_names = get_unique_participants(df)
        alpha2names = dict(zip(alphabets, model_full_names))
        # zip stops at the shorter input, so a short mapping means overflow
        if len(alpha2names) < len(model_full_names):
            raise ValueError(
                f"Tournament viewer cannot visualize more than {len(alphabets)=} participants. ({len(model_full_names)=} is given)\n\nOther features will not be affected but the tournament visualizer."
            )

    names2alpha = dict(zip(alpha2names.values(), alpha2names.keys()))
    df = _impute_byes(df)

    # preserve readables for later
    df = _make_readables(df, names2alpha)

    if len(df[df["round"] == "final"]) != 1:
        raise ValueError("final match need to be one and only.")
    return df, alpha2names
def _make_readables(df, names2alpha): | |
df["human_readable_model_a"] = df.model_a.copy() | |
df["human_readable_model_b"] = df.model_b.copy() | |
df.model_a = df.model_a.apply( | |
lambda modelname: names2alpha[modelname] if modelname else "x" | |
) | |
df.model_b = df.model_b.apply( | |
lambda modelname: names2alpha[modelname] if modelname else "x" | |
) | |
df["human_readable_idx"] = df.apply( | |
lambda row: f"{row.name}: {row.human_readable_model_a} ({row.model_a}) vs. {row.human_readable_model_b} ({row.model_b if row.model_b else 'x'})", | |
axis=1, | |
) | |
df["winner_resolved"] = df.apply(lambda row: row[row.winner], axis=1) | |
df["winner_nodes"] = df.apply( | |
lambda row: f"{row.winner_resolved}:{row.name}".ljust(4, " "), axis=1 | |
) # later for figure representation of winner as a "node" | |
return df | |
# draw
def draw(df: pd.DataFrame, alpha2names: dict = None) -> str:
    """Render the tournament bracket as a multi-line text figure.

    The figure is built top-down: a champion header, then for every depth
    (0 = final) one line of winner nodes with connectors and one line of
    vertical bars, and finally a line listing the deepest level's
    participants.

    Args:
        df: processed match table with ``depth``, ``model_a``, ``model_b``,
            ``winner_resolved`` and ``winner_nodes`` columns.
        alpha2names: mapping from display symbol to model name, used to
            print the champion's name.

    Returns:
        The bracket drawing as a single string.
    """

    def _draw_round(
        df: pd.DataFrame,
        depth: int = None,
        winners_in_order: list = None,
    ) -> Tuple:
        """Draw one depth level and report the nodes expected one level below."""
        matches = df[df.depth == depth]
        deepest = df.depth.max()

        # the horizontal span of one match doubles with every level upward
        width = 2 ** ((deepest - depth) + 2)
        # the first 4 columns are dropped from the bar, presumably to leave
        # room for the node label that precedes it
        left_bar = ("─" * width)[4:]
        right_corner = "┐" + (" " * width)[1:]

        if winners_in_order is None:
            assert (
                depth == 0
            ), f"{winners_in_order=} is only allowed when drawing the top (=final match)"
            winners_in_order = matches.winner_nodes

        node_segments = []
        children = []
        for node in winners_in_order:
            node_segments.append("".join([node, left_bar, right_corner]))
            # next round winners in sync with winner order
            match = matches.query(f"winner_nodes=='{node}'")
            children.extend([match.model_a.item(), match.model_b.item()])

        # translate child symbols into winner_nodes format, except at the
        # deepest level where the plain symbols are kept
        if depth != deepest:
            lower_level = df[df.depth == depth + 1]
            children = [
                lower_level.query(f"winner_resolved=='{symbol}'").winner_nodes.item()
                for symbol in children
            ]

        node_line = "".join(node_segments)
        bar_unit = "│" + (" " * width)[1:]
        bar_line = "".join([bar_unit] * len(matches) * 2)
        return node_line, bar_line, children

    level_drawings = []
    winners_in_order = None
    for depth in range(df.depth.max() + 1):
        node_line, bar_line, winners_in_order = _draw_round(
            df,
            depth=depth,
            winners_in_order=winners_in_order,
        )
        level_drawings.append((node_line, bar_line))

    # prepare bracket top
    final_node = level_drawings[0][0].split()[0]
    champion_alphabet = final_node.split(":")[0]
    champion_readable = alpha2names[champion_alphabet]
    bracket_top = [f"🥇winner: {champion_readable}", "│"]

    # prepare mid
    bracket_mid = "\n".join("\n".join(pair) for pair in level_drawings)

    # prepare bot: what remains after the last level are the participants
    bracket_bot = (" " * 3).join(winners_in_order)

    return "\n".join(bracket_top + [bracket_mid, bracket_bot])
def number_breakdown_from_df(result_df: pd.DataFrame) -> Tuple[str, int, int]:
    """Explain the total match count as ``(n_models - 1) * size_testset``.

    Args:
        result_df: match table with ``model_a`` and ``model_b`` columns, one
            row per match.

    Returns:
        A tuple ``(interpretation, n_models, size_testset)``: the
        human-readable breakdown sentence, the number of distinct
        participants, and the match count divided by ``n_models - 1``
        (rounded down).

    Raises:
        ZeroDivisionError: if there is exactly one distinct participant.
    """
    n_matches = len(result_df)
    n_models = len(get_unique_participants(result_df))
    # integer floor division: same result as int(a / b) for these
    # non-negative counts, without a float round-trip
    size_testset = n_matches // (n_models - 1)
    interpretation = f"total {n_matches} matches = (n_models-1) * size_testset = ({n_models}-1) * {size_testset}"
    return interpretation, n_models, size_testset
def make_legend_str(df, alpha2names) -> str:
    """Build the legend block that maps display symbols to model names.

    Any existing medal prefix is stripped from the names first; the caller's
    mapping is not modified. The name belonging to the winner of the final
    match is then prefixed with a medal. If the final winner cannot be
    determined (``get_1st`` returns ``None``) or its symbol is not in
    ``alpha2names``, the legend is produced without a medal instead of
    failing.

    Args:
        df: processed match table, passed on to ``get_1st``.
        alpha2names: mapping from display symbol to model name.

    Returns:
        A string starting with ``"\\n\\nlegend:"`` followed by one
        ``"<symbol>\\t<name>"`` line per symbol, in sorted symbol order.
    """
    first = get_1st(df, alpha2names)
    legend = {alpha: name.replace("🥇 ", "") for alpha, name in alpha2names.items()}

    # resolved once up front so the marking does not depend on dict order
    champion_name = legend.get(first)

    lines = ["\n\nlegend:"]
    for alpha in sorted(legend):
        name = legend[alpha]
        if champion_name is not None and name == champion_name:
            name = f"🥇 {name}"
        lines.append(f"{alpha}\t{name}")
    return "\n".join(lines)
def main(
    jslname: str = "result.json",
):
    """Test driver: draw a bracket for every test scenario in a result file.

    For the ``i``-th distinct ``idx_inst_src`` value, the bracket and legend
    are printed and written to ``{i}.txt`` together with the scenario text.
    If anything fails for a scenario, the error and scenario text go to
    ``{i}_err.txt``. When the processed table for that scenario was already
    built, its key columns are also dumped to ``{i}_err.jsonl``.

    Args:
        jslname: path to the JSON result file (records orientation).
    """
    df = pd.read_json(jslname, orient="records")
    df = df.drop(columns=["tstamp", "logs"])
    df = index_test_scenario(df)

    # The intermediate visualization(df) step is omitted here. If everything
    # else was followed and a problem still shows up, the viz step is the
    # culprit -- though that seems unlikely.
    selections = df.idx_inst_src.unique()

    # the legend from the first successfully processed scenario is reused
    alpha2names = None
    for i, sel in enumerate(selections):
        # reset per scenario so the error dump never shows a stale table
        df_now_processed = None
        try:
            df_now = df[df.idx_inst_src == sel]
            df_now_processed, _alpha2names = init_tournament_dataframe(
                df_now, alpha2names=alpha2names
            )
            if alpha2names is None:
                alpha2names = _alpha2names

            bracket_drawing = draw(df_now_processed, alpha2names=alpha2names)
            legend = make_legend_str(df_now_processed, alpha2names)
            print(bracket_drawing + legend)
            # the figure contains non-ASCII symbols, so pin the encoding
            with open(f"{i}.txt", "w", encoding="utf-8") as fout:
                print(bracket_drawing + legend, file=fout)
                print(f"\n\n{sel}", file=fout)
        except Exception as e:
            # best effort: record the failure and continue with the next one
            with open(f"{i}_err.txt", "w", encoding="utf-8") as ferr:
                print(e, file=ferr)
                print("", file=ferr)
                print(sel, file=ferr)
            if df_now_processed is not None:
                df_now_processed[
                    [
                        "depth",
                        "round",
                        "winner_nodes",
                        "winner_resolved",
                        "winner",
                        "model_a",
                        "model_b",
                    ]
                ].to_json(f"{i}_err.jsonl", lines=True, orient="records")
# Script entry point: expose main() as a command-line interface through Fire.
if __name__ == "__main__":
    Fire(main)