"""Render a single-elimination tournament result as a text bracket.

This code assumes dealing with only one instruction
"""
from copy import deepcopy
from functools import partial
from typing import *

import pandas as pd


# from varco_arena.tournament
def log2_power_of_two(n):
    """Return the exponent ``e`` such that ``2 ** e == n``.

    Raises:
        ValueError: if ``n`` is not a positive power of two.
    """
    # A power of two has exactly one bit set, so n & (n - 1) clears it to 0.
    if n <= 0 or n & (n - 1) != 0:
        raise ValueError("n must be a positive power of 2")

    exponent = 0
    while n > 1:
        n >>= 1  # halve n; each halving is one power of two
        exponent += 1
    return exponent


def get_1st(df: pd.DataFrame, alpha2names: dict) -> Optional[str]:
    """Return ``winner_resolved`` of the single ``round == "final"`` row.

    Returns ``None`` when there is no final row or more than one (a buggy
    result file). ``alpha2names`` is accepted for interface compatibility and
    is not used.
    """
    finals = df[df["round"] == "final"]
    if len(finals) == 1:
        return finals.iloc[0].winner_resolved
    return None  # error case: no finals match or multiple finals


def get_unique_participants(df: pd.DataFrame) -> list:
    """Return the sorted unique truthy values of ``model_a`` and ``model_b``."""
    participants = pd.concat([df.model_a, df.model_b]).unique().tolist()
    # Drop falsy placeholders (None / "" used for byes) before sorting.
    return sorted(p for p in participants if p)


def _impute_byes(df):
    """Pad every depth ``d >= 1`` with bye rows up to ``2 ** d`` rows.

    Two kinds of rows are added, deepest round first:

    * ``<model> vs ""`` for every participant that appears at ``depth - 1``
      (as recorded in the input) but not at ``depth``;
    * ``"" vs ""`` rows until the depth holds ``2 ** depth`` rows.

    Added rows only carry ``model_a``, ``model_b``, ``winner``,
    ``match_order_in_round`` and ``depth``; other columns are left missing.

    Returns:
        A new frame sorted by ``depth`` with a fresh 0..n-1 index.
    """
    max_depth = int(df.depth.max())

    imputed_parts = {
        depth: df[df.depth == depth].copy() for depth in range(max_depth + 1)
    }

    # Walk from the deepest round upwards; depth 0 always has one real match.
    for depth in range(max_depth, 0, -1):
        null_v_null = {
            "model_a": "",
            "model_b": "",
            "winner": "model_a",
            "match_order_in_round": "-",
            "depth": depth,
        }

        # fill some_model vs null byes
        players = set(get_unique_participants(imputed_parts[depth]))
        proceeded = get_unique_participants(imputed_parts[depth - 1])
        byes = []
        for p in proceeded:
            if p not in players:
                p_v_null = deepcopy(null_v_null)
                p_v_null["model_a"] = p
                byes.append(p_v_null)
        # Skip the concat when nothing was imputed: concatenating an empty
        # frame is deprecated pandas behaviour and adds nothing.
        if byes:
            imputed_parts[depth] = pd.concat(
                [imputed_parts[depth], pd.DataFrame(byes)],
                axis="index",
            )

        # fill null vs null
        n_null_v_null = 2**depth - len(imputed_parts[depth])
        if n_null_v_null > 0:
            imputed_parts[depth] = pd.concat(
                [
                    imputed_parts[depth],
                    pd.DataFrame([null_v_null] * n_null_v_null),
                ],
                axis="index",
            )

    df_imputed = pd.concat(list(imputed_parts.values()), axis="index")
    return df_imputed.sort_values(by="depth").reset_index(drop=True)


def index_test_scenario(df) -> pd.DataFrame:
    """Add ``inst_src``, ``idx_inst_src`` and (if absent) ``depth`` columns.

    The frame is modified in place and also returned. ``depth`` is derived
    from ``round``: ``final`` -> 0, ``semi-final`` -> 1, ``quarter-final`` -> 2
    and ``round-N`` -> ``log2(N) - 1`` (``N`` must be a power of two). A
    ``None`` round yields a ``None`` depth.

    Raises:
        ValueError: if a round label is none of the above, or ``N`` is not a
            positive power of two.
    """
    df["inst_src"] = "inst: " + df.instruction + "\n\nsrc: " + df.source
    df["idx_inst_src"] = df.apply(
        lambda row: f"{row.tournament_idx}:\n{row.inst_src}", axis=1
    )

    # later used for tournament bracket backtracking
    if "depth" not in df.columns:
        mappings = {
            "final": 0,
            "semi-final": 1,
            "quarter-final": 2,
        }

        def _convert_round_to_depth(rnd: str, mappings=None) -> int:
            if rnd is None:
                return None
            if rnd in mappings:
                return mappings[rnd]
            if rnd.startswith("round-"):
                # assume perfect power of two
                num = int(rnd.replace("round-", "").strip())
                return log2_power_of_two(num) - 1
            # Previously this fell through with `depth` unbound.
            raise ValueError(f"cannot convert round label to depth: {rnd!r}")

        conv = partial(_convert_round_to_depth, mappings=mappings)
        df["depth"] = df["round"].apply(conv)

    return df


def init_tournament_dataframe(
    df, alpha2names: dict = None
) -> Tuple[pd.DataFrame, dict]:
    """Prepare one tournament's match rows for :func:`draw`.

    Steps: sort by depth, rewrite ``winner`` from ``A``/``B`` to
    ``model_a``/``model_b``, build a one-character alias per participant when
    ``alpha2names`` is not supplied, impute byes, and add the readable /
    node-label columns.

    Args:
        df: match rows with at least ``depth``, ``round``, ``winner``,
            ``model_a`` and ``model_b`` columns.
        alpha2names: optional ``{alias: model name}`` mapping to reuse.

    Returns:
        ``(processed_df, alpha2names)``.

    Raises:
        ValueError: if there are more participants than available aliases, or
            the data does not contain exactly one ``final`` match.
    """
    df = df.sort_values(by="depth").reset_index(drop=True)
    # make winner interpretable (A -> model_a, B -> model_b)
    df.winner = df.winner.apply(lambda txt: f"model_{txt.lower()}")

    # define alpha2names if not given (covers upto 168 participants)
    if alpha2names is None:
        # Adjacent literals are concatenated by the parser. A backslash
        # continuation *inside* one literal would pull the next line's leading
        # whitespace into the alphabet and hand out " " as an alias.
        alphabets = (
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "abcdefghijklmnopqrstuvwxyz"
            "ⓐⓑⓒⓓⓔⓕⓖⓗⓘⓙⓚⓛⓜⓝⓞⓟⓠⓡⓢⓣⓤⓥⓦⓧⓨⓩ"
            "㉠㉡㉢㉣㉤㉥㉦㉧㉨㉩㉪㉫㉬㉭"
            "㉮㉯㉰㉱㉲㉳㉴㉵㉶㉷㉸㉹㉺㉻"
            "ㄱㄴㄷㄹㅁㅂㅅㅇㅈㅊㅋㅌㅍㅎ"
            "ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ"
            "αβγδεζηθικλμνξοπρστυφχψω"
        )
        model_full_names = get_unique_participants(df)
        alpha2names = dict(zip(alphabets, model_full_names))
        if len(alpha2names) < len(model_full_names):
            raise ValueError(
                f"Tournament viewer cannot visualize more than {len(alphabets)=} participants. ({len(model_full_names)=} is given)\n\nOther features will not be affected but the tournament visualizer."
            )
    names2alpha = dict(zip(alpha2names.values(), alpha2names.keys()))
    df = _impute_byes(df)

    # preserve readables for later
    df = _make_readables(df, names2alpha)

    if len(df[df["round"] == "final"]) != 1:
        raise ValueError("final match need to be one and only.")

    return df, alpha2names


def _make_readables(df, names2alpha):
    """Replace model names by aliases and add display columns (in place).

    Adds ``human_readable_model_a/b`` (the original names),
    ``human_readable_idx``, ``winner_resolved`` (alias of the winning side)
    and ``winner_nodes`` (``"<alias>:<row index>"`` padded to 4 characters).
    Falsy model names (bye slots) become ``"x"``.
    """

    def _to_alpha(modelname):
        return names2alpha[modelname] if modelname else "x"

    df["human_readable_model_a"] = df.model_a.copy()
    df["human_readable_model_b"] = df.model_b.copy()

    df.model_a = df.model_a.apply(_to_alpha)
    df.model_b = df.model_b.apply(_to_alpha)

    df["human_readable_idx"] = df.apply(
        lambda row: f"{row.name}: {row.human_readable_model_a} ({row.model_a}) vs. {row.human_readable_model_b} ({row.model_b if row.model_b else 'x'})",
        axis=1,
    )

    df["winner_resolved"] = df.apply(lambda row: row[row.winner], axis=1)
    df["winner_nodes"] = df.apply(
        lambda row: f"{row.winner_resolved}:{row.name}".ljust(4, " "), axis=1
    )  # later for figure representation of winner as a "node"
    return df


# draw
def draw(df: pd.DataFrame, alpha2names: dict = None) -> str:
    """Return the text bracket for a frame prepared by
    :func:`init_tournament_dataframe`.

    The figure is drawn top-down: the champion line, then one pair of lines
    (winner nodes + descending connectors) per depth, and finally the aliases
    of the participants of the deepest round.

    Args:
        df: processed match rows (needs ``depth``, ``model_a``, ``model_b``,
            ``winner_resolved`` and ``winner_nodes``).
        alpha2names: ``{alias: model name}``; when ``None`` the champion is
            shown by its alias.

    Raises:
        ValueError: (from ``Series.item``) when a node or a descending winner
            does not match exactly one row.
    """
    max_depth = df.depth.max()  # invariant for the whole figure

    def _draw_round(
        df: pd.DataFrame,
        depth: int = None,
        winners_in_order: list = None,
    ) -> Tuple:
        df_now = df[df.depth == depth]

        # Cell width doubles for every level above the deepest round.
        width = 2 ** ((max_depth - depth) + 2)

        connect_left = ("─" * width)[4:]  # node label occupies 4 columns
        connect_right = "┐" + (" " * width)[1:]

        if winners_in_order is None:
            assert (
                depth == 0
            ), f"{winners_in_order=} is only allowed when drawing the top (=final match)"
            winners_in_order = df_now.winner_nodes.tolist()

        round_drawing_parts = []
        descending_round_winners = []
        for node in winners_in_order:
            round_drawing_parts.append("".join([node, connect_left, connect_right]))
            # next round winners in sync with winner order
            # (boolean mask instead of a string-built query: safe for any label)
            row_now = df_now[df_now.winner_nodes == node]
            descending_round_winners.append(row_now.model_a.item())
            descending_round_winners.append(row_now.model_b.item())

        # find descending_round_winners within winner_nodes format (num:alpha)
        if depth != max_depth:
            df_descend = df[df.depth == depth + 1]
            descending_round_winners = [
                df_descend[
                    df_descend.winner_resolved == winner_alpha
                ].winner_nodes.item()
                for winner_alpha in descending_round_winners
            ]
        # at the deepest round the plain aliases are kept intact

        round_drawing = "".join(round_drawing_parts)

        descending_unit = "│" + (" " * width)[1:]
        descending_lines = "".join([descending_unit] * len(df_now) * 2)

        return round_drawing, descending_lines, descending_round_winners

    drawings = []
    winners_in_order = None
    for depth in range(max_depth + 1):
        winner_drw, lines_desc, winners_in_order = _draw_round(
            df,
            depth=depth,
            winners_in_order=winners_in_order,
        )
        drawings.append((winner_drw, lines_desc))

    # prepare bracket top
    champion_alphabet = drawings[0][0].split()[0].split(":")[0]
    if alpha2names is None:
        champion_readable = champion_alphabet
    else:
        champion_readable = alpha2names[champion_alphabet]
    bracket_top = [f"🥇winner: {champion_readable}", "│"]
    # prepare mid
    bracket_mid = "\n".join(["\n".join(tup) for tup in drawings])

    # prepare bot
    initial_participants = winners_in_order
    bracket_bot = (" " * 3).join(initial_participants)

    full_figure = "\n".join(bracket_top + [bracket_mid, bracket_bot])

    return full_figure


def number_breakdown_from_df(result_df: pd.DataFrame) -> Tuple[str, int, int]:
    """Explain the match count as ``(n_models - 1) * size_testset``.

    Returns:
        ``(interpretation, n_models, size_testset)``.

    Raises:
        ZeroDivisionError: if the frame contains exactly one participant.
    """
    n_models = len(get_unique_participants(result_df))
    size_testset = len(result_df) // (n_models - 1)
    interpretation = f"total {len(result_df)} matches = (n_models-1) * size_testset = ({n_models}-1) * {size_testset}"
    return interpretation, n_models, size_testset


def make_legend_str(df, alpha2names) -> str:
    """Return the ``alias<TAB>model name`` legend, sorted by alias.

    The champion (as reported by :func:`get_1st`) is prefixed with a medal.
    When no unique final exists, the legend is produced without a medal
    instead of failing. ``alpha2names`` itself is not modified.
    """
    first = get_1st(df, alpha2names)
    # Strip medals left over from an earlier call before re-deciding.
    legend = {k: v.replace("🥇 ", "") for k, v in alpha2names.items()}
    champion_name = legend.get(first) if first is not None else None

    lines = ["\n\nlegend:"]
    for alpha in sorted(legend):
        name = legend[alpha]
        if champion_name is not None and name == champion_name:
            name = f"🥇 {name}"
        lines.append(f"{alpha}\t{name}")
    return "\n".join(lines)


def main(
    jslname: str = "result.json",
):
    """Test driver: draw the bracket of every tournament in ``jslname``.

    For the i-th distinct ``idx_inst_src`` the bracket and legend are printed
    and written to ``{i}.txt``. On failure the error goes to ``{i}_err.txt``
    and the rows being processed to ``{i}_err.jsonl``. The alias mapping of
    the first successfully initialised tournament is reused for the rest.
    """
    df = pd.read_json(jslname, orient="records")
    df = df.drop(columns=["tstamp", "logs"])
    df = index_test_scenario(df)

    # The intermediate visualization(df) step is omitted here. If everything
    # else was followed and a problem still shows up, it is a viz problem --
    # though that seems unlikely.
    selections = df.idx_inst_src.unique()
    dump_columns = [
        "depth",
        "round",
        "winner_nodes",
        "winner_resolved",
        "winner",
        "model_a",
        "model_b",
    ]
    alpha2names = None
    for i, sel in enumerate(selections):
        df_now = df[df.idx_inst_src == sel]
        df_now_processed = None  # reset so a stale frame is never dumped
        try:
            df_now_processed, _alpha2names = init_tournament_dataframe(
                df_now, alpha2names=alpha2names
            )
            if alpha2names is None:
                alpha2names = _alpha2names

            bracket_drawing = draw(df_now_processed, alpha2names=alpha2names)
            legend = make_legend_str(df_now_processed, alpha2names)
            print(bracket_drawing + legend)
            # The bracket uses non-ASCII glyphs, so pin the encoding.
            with open(f"{i}.txt", "w", encoding="utf-8") as out:
                print(bracket_drawing + legend, file=out)
                print(f"\n\n{sel}", file=out)
        except Exception as e:
            with open(f"{i}_err.txt", "w", encoding="utf-8") as err:
                print(e, file=err)
                print("", file=err)
                print(sel, file=err)
            # Fall back to the raw rows when initialisation itself failed, and
            # only dump the columns that exist at that stage.
            dump_src = df_now if df_now_processed is None else df_now_processed
            available = [c for c in dump_columns if c in dump_src.columns]
            dump_src[available].to_json(
                f"{i}_err.jsonl", lines=True, orient="records"
            )


if __name__ == "__main__":
    # Imported here so the helpers above stay importable without the CLI
    # dependency.
    from fire import Fire

    Fire(main)