File size: 12,074 Bytes
c2ba4d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
from copy import deepcopy
from functools import partial
from typing import *

import pandas as pd
from fire import Fire

"""
This code assumes dealing with only one instruction
"""


# from varco_arena.tournament
def log2_power_of_two(n):
    """Return the exponent ``k`` such that ``2 ** k == n``.

    Raises:
        ValueError: if ``n`` is not a positive power of two.
    """
    # A power of two has a single set bit, so clearing the lowest set bit
    # with ``n & (n - 1)`` must leave nothing behind; zero is ruled out
    # separately because it also satisfies that bit test.
    single_bit = (n & (n - 1)) == 0 and n != 0
    if not single_bit:
        raise ValueError("n must be a positive power of 2")

    # Count how many halvings it takes to get down to 1.
    shifts = 0
    remaining = n
    while remaining > 1:
        remaining >>= 1
        shifts += 1
    return shifts


def get_1st(df: pd.DataFrame, alpha2names: dict) -> Optional[str]:
    """Return ``winner_resolved`` of the one row whose ``round`` is ``"final"``.

    ``alpha2names`` is accepted but not consulted by this function.

    Returns:
        The ``winner_resolved`` value of the final match, or ``None`` when the
        frame holds no final or more than one final (a malformed result file).
    """
    final_rows = df.loc[df["round"] == "final"]
    if len(final_rows) != 1:
        # error case: no finals match, or multiple finals (buggy result file)
        return None
    return final_rows.iloc[0].winner_resolved


def get_unique_participants(df: pd.DataFrame) -> list:
    """Return the sorted, de-duplicated names found in ``model_a`` and ``model_b``.

    Missing values (``None``/NaN) and empty strings are treated as "no
    participant" and left out. Dropping NaN matters: a NaN is truthy, so a
    plain truthiness filter would keep it and the sort below would then fail
    when comparing a float against a string.

    Args:
        df: frame with ``model_a`` and ``model_b`` columns.

    Returns:
        Sorted list of distinct, non-empty participant names.
    """
    names = pd.concat([df.model_a, df.model_b]).dropna().unique().tolist()
    # Empty strings mark the vacant side of a bye; they are not participants.
    return sorted(name for name in names if name)


def _impute_byes(df):
    """Pad every round below the final with placeholder matches.

    For each depth from the deepest one up to 1, two kinds of rows are added:

    * a "participant vs. nobody" row for every participant that shows up at
      ``depth - 1`` without having played at ``depth`` (a bye), and
    * "nobody vs. nobody" rows until the round holds ``2 ** depth`` rows.

    Placeholder rows use an empty string for a vacant side and always declare
    ``model_a`` the winner. Depth 0 is left as is.

    Returns:
        A new frame with all rounds concatenated, sorted by ``depth`` and
        re-indexed from 0.
    """
    deepest = df.depth.max()

    # Split the matches per round so each round can be padded independently.
    by_depth = {d: df[df.depth == d].copy() for d in range(deepest + 1)}

    # Walk from the deepest round towards the final; depth 0 always holds
    # one proper match and needs no padding.
    for d in reversed(range(1, deepest + 1)):
        placeholder = {
            "model_a": "",
            "model_b": "",
            "winner": "model_a",
            "match_order_in_round": "-",
            "depth": d,
        }

        # Participants seen one round closer to the final but absent from
        # this round must have advanced through a bye.
        present = get_unique_participants(by_depth[d])
        advanced = get_unique_participants(by_depth[d - 1])
        bye_rows = [
            dict(placeholder, model_a=name)
            for name in advanced
            if name not in present
        ]
        by_depth[d] = pd.concat(
            [by_depth[d], pd.DataFrame(bye_rows)],
            axis="index",
        )

        # Fill whatever is still missing with empty-vs-empty matches.
        shortfall = 2**d - len(by_depth[d])
        if shortfall > 0:
            filler = pd.DataFrame([placeholder] * shortfall)
            by_depth[d] = pd.concat([by_depth[d], filler], axis="index")

    merged = pd.concat(by_depth.values(), axis="index")
    return merged.sort_values(by="depth").reset_index(drop=True)


def index_test_scenario(df) -> pd.DataFrame:
    """Add scenario-index columns and, when absent, a numeric ``depth`` column.

    The frame is modified in place and also returned. Columns written:

    * ``inst_src``: ``instruction`` and ``source`` joined into one text.
    * ``idx_inst_src``: ``tournament_idx`` prefixed to ``inst_src``.
    * ``depth`` (only if missing): derived from ``round``, where
      ``"final"`` is 0, ``"semi-final"`` is 1, ``"quarter-final"`` is 2 and
      ``"round-N"`` is ``log2(N) - 1`` (``N`` must be a power of two).
      A ``None`` round yields a ``None`` depth.

    Raises:
        ValueError: if a ``round`` value is not one of the recognised labels,
            or if the ``N`` of ``"round-N"`` is not a positive power of two.
    """
    df["inst_src"] = "inst: " + df.instruction + "\n\nsrc: " + df.source
    df["idx_inst_src"] = df.apply(
        lambda row: f"{row.tournament_idx}:\n{row.inst_src}", axis=1
    )

    # later used for tournament bracket backtracking
    if "depth" not in df.columns:
        named_rounds = {
            "final": 0,
            "semi-final": 1,
            "quarter-final": 2,
        }

        def _convert_round_to_depth(rnd):
            if rnd is None:
                return None
            if rnd in named_rounds:
                return named_rounds[rnd]
            if isinstance(rnd, str) and rnd.startswith("round-"):
                # assume perfect power of two
                num = int(rnd.replace("round-", "").strip())
                return log2_power_of_two(num) - 1
            # Fail loudly instead of falling through with no depth assigned.
            raise ValueError(
                f"unrecognized round label {rnd!r}: expected one of "
                f"{sorted(named_rounds)} or 'round-<power of two>'"
            )

        df["depth"] = df["round"].apply(_convert_round_to_depth)

    return df


def init_tournament_dataframe(
    df, alpha2names: dict = None
) -> Tuple[pd.DataFrame, dict]:
    """Prepare one tournament's matches for bracket drawing.

    Steps: sort by ``depth``, rewrite ``winner`` from ``A``/``B`` to
    ``model_a``/``model_b``, build (or reuse) the one-character label mapping,
    pad byes, and add the readable/label columns.

    Args:
        df: matches of a single tournament; needs ``depth``, ``winner``,
            ``model_a``, ``model_b`` and ``round`` columns.
        alpha2names: optional mapping of one-character label -> model name.
            When omitted, labels are assigned to the sorted participant names
            from a built-in pool of 168 distinct characters.

    Returns:
        ``(processed_df, alpha2names)``.

    Raises:
        ValueError: if there are more participants than built-in labels, or
            if the processed frame does not contain exactly one final match.
    """
    df = df.sort_values(by="depth").reset_index(drop=True)
    # make winner interpretable (A -> model_a, B -> model_b)
    df["winner"] = df["winner"].apply(lambda txt: f"model_{txt.lower()}")

    # define alpha2names if not given (covers up to 168 participants)
    if alpha2names is None:
        # Adjacent literals are concatenated with nothing in between. A
        # backslash continuation inside one literal would instead pull the
        # next line's indentation into the string and produce blank labels.
        alphabets = (
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "abcdefghijklmnopqrstuvwxyz"
            "ⓐⓑⓒⓓⓔⓕⓖⓗⓘⓙⓚⓛⓜⓝⓞⓟⓠⓡⓢⓣⓤⓥⓦⓧⓨⓩ"
            "㉠㉡㉢㉣㉤㉥㉦㉧㉨㉩㉪㉫㉬㉭"
            "㉮㉯㉰㉱㉲㉳㉴㉵㉶㉷㉸㉹㉺㉻"
            "ㄱㄴㄷㄹㅁㅂㅅㅇㅈㅊㅋㅌㅍㅎ"
            "ΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ"
            "αβγδεζηθικλμνξοπρστυφχψω"
        )
        # NOTE(review): the lowercase "x" label is also the marker that
        # _make_readables writes for a vacant slot -- confirm whether a
        # tournament with 50+ participants needs a different marker.
        model_full_names = get_unique_participants(df)
        if len(model_full_names) > len(alphabets):
            raise ValueError(
                f"Tournament viewer cannot visualize more than {len(alphabets)=} participants. ({len(model_full_names)=} is given)\n\nOther features will not be affected but the tournament visualizer."
            )
        alpha2names = dict(zip(alphabets, model_full_names))

    names2alpha = {name: alpha for alpha, name in alpha2names.items()}
    df = _impute_byes(df)

    # preserve readables for later
    df = _make_readables(df, names2alpha)

    if len(df[df["round"] == "final"]) != 1:
        raise ValueError("final match need to be one and only.")

    return df, alpha2names


def _make_readables(df, names2alpha):
    df["human_readable_model_a"] = df.model_a.copy()
    df["human_readable_model_b"] = df.model_b.copy()

    df.model_a = df.model_a.apply(
        lambda modelname: names2alpha[modelname] if modelname else "x"
    )
    df.model_b = df.model_b.apply(
        lambda modelname: names2alpha[modelname] if modelname else "x"
    )

    df["human_readable_idx"] = df.apply(
        lambda row: f"{row.name}: {row.human_readable_model_a} ({row.model_a}) vs. {row.human_readable_model_b} ({row.model_b if row.model_b else 'x'})",
        axis=1,
    )

    df["winner_resolved"] = df.apply(lambda row: row[row.winner], axis=1)
    df["winner_nodes"] = df.apply(
        lambda row: f"{row.winner_resolved}:{row.name}".ljust(4, " "), axis=1
    )  # later for figure representation of winner as a "node"
    return df


# draw
def draw(df: pd.DataFrame, alpha2names: dict = None) -> str:
    """Render the bracket as text, from the final (top) down to the first round.

    Args:
        df: processed tournament frame with ``depth``, ``model_a``,
            ``model_b``, ``winner_resolved`` and ``winner_nodes`` columns.
        alpha2names: mapping of label -> model name, used only to spell out
            the champion on the first line. When ``None`` the champion's
            label is printed instead.

    Returns:
        The multi-line figure: a winner line, one node line plus one
        connector line per round, and the first-round participants.

    Raises:
        ValueError: (from ``Series.item``) if a node or winner label does not
            match exactly one row in the round being looked up.
        KeyError: if ``alpha2names`` is given but lacks the champion's label.
    """

    def _draw_round(
        df: pd.DataFrame,
        depth: int = None,
        winners_in_order: list = None,
    ) -> Tuple:
        """Draw one round; return (node line, connector line, child nodes)."""
        df_now = df[df.depth == depth]
        max_depth = int(df.depth.max())

        # Each node's column doubles in width for every round closer to the
        # final; the deepest round uses 4 characters per node.
        width = 2 ** ((max_depth - depth) + 2)

        # The node text itself takes the first 4 characters of the column.
        connect_left = "─" * (width - 4)
        connect_right = "┐" + " " * (width - 1)

        if winners_in_order is None:
            assert (
                depth == 0
            ), f"{winners_in_order=} is only allowed when drawing the top (=final match)"
            winners_in_order = df_now.winner_nodes.tolist()

        round_drawing_parts = []
        descending_round_winners = []
        for node in winners_in_order:
            round_drawing_parts.append(node + connect_left + connect_right)
            # next round winners in sync with winner order
            row_now = df_now[df_now.winner_nodes == node]
            descending_round_winners.append(row_now.model_a.item())
            descending_round_winners.append(row_now.model_b.item())

        # Translate the children's labels into the node text ("label:index")
        # of the match they won one round deeper. The deepest round has no
        # earlier matches, so its children stay as plain labels.
        if depth != max_depth:
            df_descend = df[df.depth == depth + 1]
            descending_round_winners = [
                df_descend[df_descend.winner_resolved == alpha].winner_nodes.item()
                for alpha in descending_round_winners
            ]

        round_drawing = "".join(round_drawing_parts)
        descending_unit = "│" + " " * (width - 1)
        descending_lines = descending_unit * (len(df_now) * 2)

        return round_drawing, descending_lines, descending_round_winners

    drawings = []
    winners_in_order = None
    max_depth = int(df.depth.max())
    for depth in range(max_depth + 1):
        winner_drw, lines_desc, winners_in_order = _draw_round(
            df,
            depth=depth,
            winners_in_order=winners_in_order,
        )
        drawings.append((winner_drw, lines_desc))

    # prepare bracket top
    champion_alphabet = drawings[0][0].split()[0].split(":")[0]
    if alpha2names is None:
        # No mapping supplied: show the label rather than failing.
        champion_readable = champion_alphabet
    else:
        champion_readable = alpha2names[champion_alphabet]
    bracket_top = [f"🥇winner: {champion_readable}", "│"]

    # prepare mid
    bracket_mid = "\n".join("\n".join(tup) for tup in drawings)

    # prepare bot
    initial_participants = winners_in_order
    bracket_bot = (" " * 3).join(initial_participants)

    return "\n".join(bracket_top + [bracket_mid, bracket_bot])


def number_breakdown_from_df(result_df: pd.DataFrame) -> Tuple[str, int, int]:
    """Explain the match count as ``(n_models - 1) * size_testset``.

    Args:
        result_df: match results with ``model_a`` and ``model_b`` columns.

    Returns:
        ``(interpretation, n_models, size_testset)`` where ``interpretation``
        is the human-readable breakdown, ``n_models`` the number of distinct
        participants and ``size_testset`` the match count divided by
        ``n_models - 1``, truncated to an integer.

    Raises:
        ZeroDivisionError: if exactly one participant is found.
    """
    n_models = len(get_unique_participants(result_df))
    n_matches = len(result_df)
    # Integer division avoids the round trip through float.
    size_testset = n_matches // (n_models - 1)
    interpretation = f"total {n_matches} matches = (n_models-1) * size_testset = ({n_models}-1) * {size_testset}"
    return interpretation, n_models, size_testset


def make_legend_str(df, alpha2names) -> str:
    """Build the "label<TAB>model name" legend, marking the champion.

    Args:
        df: processed tournament frame, passed to ``get_1st`` to find the
            champion's label.
        alpha2names: mapping of label -> model name. It is not modified; any
            existing ``"🥇 "`` marker in a name is stripped in a copy first.

    Returns:
        A string starting with two newlines and ``legend:``, followed by one
        line per label in sorted label order. The champion's name is
        prefixed with ``"🥇 "``; if no single final exists, or the champion's
        label is not in the mapping, no entry is marked.
    """
    first = get_1st(df, alpha2names)
    legend = {alpha: name.replace("🥇 ", "") for alpha, name in alpha2names.items()}

    # ``first`` is None when the final is missing or duplicated; a membership
    # test covers that case as well as a label absent from the mapping.
    if first in legend:
        legend[first] = f"🥇 {legend[first]}"

    lines = ["\n\nlegend:"]
    lines.extend(f"{alpha}\t{legend[alpha]}" for alpha in sorted(legend))
    return "\n".join(lines)


def main(
    jslname: str = "result.json",
):
    """
    Test code: draw the bracket of every tournament in a result file.

    Reads ``jslname`` as JSON records, then for the i-th distinct
    ``idx_inst_src`` value prints the bracket with its legend and writes the
    same text, followed by the scenario text, to ``{i}.txt``. If anything
    fails for a tournament, the error and the scenario go to ``{i}_err.txt``
    and the available debugging columns to ``{i}_err.jsonl``; processing then
    continues with the next tournament.

    The label mapping produced by the first successfully processed tournament
    is reused for all later ones.
    """

    df = pd.read_json(jslname, orient="records")
    df = df.drop(columns=["tstamp", "logs"])
    df = index_test_scenario(df)

    # The intermediate visualization(df) step is skipped here. If everything
    # except that step was followed and something still goes wrong, the
    # problem is in the viz -- though that seems unlikely.
    debug_columns = [
        "depth",
        "round",
        "winner_nodes",
        "winner_resolved",
        "winner",
        "model_a",
        "model_b",
    ]
    alpha2names = None
    selections = df.idx_inst_src.unique()
    for i, sel in enumerate(selections):
        df_now = df[df.idx_inst_src == sel]
        # Reset per tournament so the error dump never shows a frame left
        # over from an earlier iteration.
        df_now_processed = None
        try:
            df_now_processed, _alpha2names = init_tournament_dataframe(
                df_now, alpha2names=alpha2names
            )
            if alpha2names is None:
                alpha2names = _alpha2names
            bracket_drawing = draw(df_now_processed, alpha2names=alpha2names)
            legend = make_legend_str(df_now_processed, alpha2names)
            print(bracket_drawing + legend)
            # Explicit UTF-8: the figure contains box-drawing characters and
            # an emoji that a platform default encoding may not support.
            with open(f"{i}.txt", "w", encoding="utf-8") as fout:
                print(bracket_drawing + legend, file=fout)
                print(f"\n\n{sel}", file=fout)
        except Exception as e:
            with open(f"{i}_err.txt", "w", encoding="utf-8") as ferr:
                print(e, file=ferr)
                print("", file=ferr)
                print(sel, file=ferr)
            # Dump the processed frame if there is one, otherwise the raw
            # matches, keeping only the debugging columns that exist.
            frame = df_now_processed if df_now_processed is not None else df_now
            available = [col for col in debug_columns if col in frame.columns]
            frame[available].to_json(f"{i}_err.jsonl", lines=True, orient="records")


# When run as a script, hand ``main`` to ``Fire`` (imported from the ``fire``
# package) instead of calling it directly.
if __name__ == "__main__":
    Fire(main)