hynky HF staff committed on
Commit
2ce0cc2
1 Parent(s): 66a9bee

migrate to fsspec

Browse files
Files changed (1) hide show
  1. app.py +45 -35
app.py CHANGED
@@ -9,12 +9,8 @@ import numpy as np
9
  from datetime import datetime
10
 
11
  import gradio as gr
12
- import huggingface_hub
13
  import pandas as pd
14
- import plotly.graph_objects as go
15
- from huggingface_hub.file_download import repo_folder_name
16
- from huggingface_hub.hf_api import RepoFile
17
- from huggingface_hub.utils import EntryNotFoundError
18
 
19
  FALLBACK_TOKEN_NAME = "HF_TOKEN"
20
 
@@ -41,20 +37,20 @@ def get_run_name_seed(run_name):
41
  run_name, seed = run_name.split("-seed-")
42
  return run_name, int(seed)
43
 
44
- def fetch_repo_structure(repo_name, oauth_token: gr.OAuthToken | None = None):
45
  token = os.environ.get(FALLBACK_TOKEN_NAME)
46
  if oauth_token:
47
  token = oauth_token.token
48
 
49
- files = list(huggingface_hub.list_repo_tree(repo_name, "details", recursive=False, token=token))
50
-
51
- runs = {file.path.split('/')[-1] for file in files if isinstance(file, huggingface_hub.hf_api.RepoFolder)}
52
  if not runs:
53
  return {}, gr.update(choices=[], value=None)
54
 
55
  def process_run(run):
56
- run_files = list(huggingface_hub.list_repo_tree(repo_name, f"details/{run}", recursive=False, token=token))
57
- return run, [file.path.split('/')[-1] for file in run_files if isinstance(file, huggingface_hub.hf_api.RepoFolder)]
58
 
59
  with ThreadPoolExecutor() as executor:
60
  results = list(executor.map(process_run, runs))
@@ -86,14 +82,16 @@ def select_runs_by_language(runs, current_selected, language):
86
  return select_runs_by_regex(runs, current_selected, f".*-{language}-.*")
87
  return current_selected
88
 
89
- def fetch_available_tasks(repo_name, runs_to_fetch, checkpoint) -> dict[str, dict[str, str]]:
90
  token = os.environ.get(FALLBACK_TOKEN_NAME)
91
 
 
92
  all_tasks = defaultdict(lambda: defaultdict(dict))
 
93
  for run in runs_to_fetch:
94
  try:
95
- files = huggingface_hub.list_repo_tree(repo_name, f"details/{run}/{checkpoint}", token=token)
96
- parquet_files = [f.path.split('/')[-1] for f in files if f.path.endswith('.parquet')]
97
 
98
  for full_filename in parquet_files:
99
  task_name, date_str = full_filename.replace('.parquet', '').rsplit('_', 1)
@@ -101,8 +99,10 @@ def fetch_available_tasks(repo_name, runs_to_fetch, checkpoint) -> dict[str, dic
101
 
102
  if run not in all_tasks[task_name] or date > all_tasks[task_name][run]['date']:
103
  all_tasks[task_name][run] = {'filename': full_filename, 'date': date}
104
- except EntryNotFoundError:
105
  print(f"Checkpoint not found for run: {run}")
 
 
106
 
107
  available_tasks = {
108
  task: {run: info['filename'] for run, info in runs.items()}
@@ -112,17 +112,17 @@ def fetch_available_tasks(repo_name, runs_to_fetch, checkpoint) -> dict[str, dic
112
 
113
  return available_tasks
114
 
115
- def fetch_run_results(repo_name, runs_to_fetch, checkpoint,
116
  oauth_token: gr.OAuthToken | None = None, progress=gr.Progress()):
117
 
118
- task_runs_dict = fetch_available_tasks(repo_name, runs_to_fetch, checkpoint)
119
  task_names = list(task_runs_dict.keys())
120
  return gr.update(choices=task_names, value=task_names[0] if task_names else None), task_runs_dict
121
 
122
 
123
  def render_table(df, selected_runs, metric_names):
124
  if df is None or not selected_runs or not metric_names:
125
- return None
126
  kept_metrics = [f"metric_{metric_name}_{run_name}" for run_name in selected_runs for metric_name in metric_names]
127
  other_metrics = [col for col in df.columns if col.startswith(f"metric_") and col not in kept_metrics]
128
  df = df.drop(columns=other_metrics)
@@ -130,8 +130,9 @@ def render_table(df, selected_runs, metric_names):
130
  df = shorten_column_names(df, selected_runs, metric_names)
131
 
132
  # Sample 100
 
133
  df = df.sample(n=min(100, len(df)), random_state=42)
134
- return df
135
 
136
  def get_column_widths(df):
137
  column_widths = []
@@ -170,19 +171,25 @@ def shorten_column_names(df, run_names: list[str], metric_names: list[str]):
170
  return df
171
 
172
 
173
- def load_task_data(repo_name, runs_to_fetch, checkpoint, task_name, tasks_files, progress=gr.Progress()):
174
  token = os.environ.get(FALLBACK_TOKEN_NAME)
175
  if not runs_to_fetch or not task_name:
176
  return None, None, None
177
 
 
 
 
 
 
 
178
  def fetch_run_file(run_to_fetch):
179
  file_path = f"details/{run_to_fetch}/{checkpoint}/{tasks_files[task_name][run_to_fetch]}"
180
  try:
181
- cached_path = huggingface_hub.hf_hub_download(repo_name, file_path, token=token)
182
- df = pd.read_parquet(cached_path)
183
  return df, run_to_fetch
184
- except EntryNotFoundError:
185
- print(f"File not found: {file_path}")
186
  return None, run_to_fetch
187
 
188
  with ThreadPoolExecutor() as pool:
@@ -245,7 +252,7 @@ def load_task_data(repo_name, runs_to_fetch, checkpoint, task_name, tasks_files,
245
  # Join all prepared DataFrames
246
  for df, run_name in zip(dfs, run_names):
247
  prepared_df = prepare_df(df, run_name, task_type)
248
- combined_df = combined_df.join(prepared_df, how='outer', )
249
 
250
 
251
  available_metrics = list(set("_".join(col.split('_')[1:-1]) for col in combined_df.columns if col.startswith("metric_")))
@@ -259,7 +266,7 @@ with gr.Blocks() as demo:
259
  results_df_full = gr.State(None)
260
  tasks_files = gr.State({})
261
  login_button = gr.LoginButton(visible=False)
262
- repo = gr.Textbox(label="HF Repo", value="HuggingFaceFW-Dev/multiligual-ablation-logs-dev", visible=True)
263
  with gr.Column():
264
  gr.Markdown("# FineWeb experiments results explorer")
265
  with gr.Row():
@@ -277,11 +284,14 @@ with gr.Blocks() as demo:
277
  task_name = gr.Dropdown(choices=[], interactive=True, label="Task name")
278
  metric_names = gr.Dropdown(choices=[], interactive=True, multiselect=True, label="Metric")
279
  results_df = gr.Dataframe(interactive=False, wrap=True)
 
 
 
280
 
281
  # Run selection
282
  gr.on(
283
- triggers=[repo.change],
284
- fn=fetch_repo_structure, inputs=[repo], outputs=[runs_checkpoints, selected_runs],
285
  )
286
  gr.on(
287
  triggers=[select_by_regex_button.click],
@@ -306,37 +316,37 @@ with gr.Blocks() as demo:
306
  gr.on(
307
  triggers=[fetch_res.click],
308
  fn=fetch_run_results,
309
- inputs=[repo, selected_runs, checkpoint],
310
  outputs=[task_name, tasks_files]
311
  ).then(
312
  fn=load_task_data,
313
- inputs=[repo, selected_runs, checkpoint, task_name, tasks_files],
314
  outputs=[results_df_full, metric_names]
315
  ).then(
316
  fn=render_table,
317
  inputs=[results_df_full, selected_runs, metric_names],
318
- outputs=[results_df]
319
  )
320
 
321
  # Update results when task name or metric changes
322
  gr.on(
323
  triggers=[task_name.input],
324
  fn=load_task_data,
325
- inputs=[repo, selected_runs, checkpoint, task_name, tasks_files],
326
  outputs=[results_df_full, metric_names]
327
  ).then(
328
  fn=render_table,
329
  inputs=[results_df_full, selected_runs, metric_names],
330
- outputs=[results_df]
331
  )
332
 
333
  gr.on(
334
  triggers=[metric_names.input],
335
  fn=render_table,
336
  inputs=[results_df_full, selected_runs, metric_names],
337
- outputs=[results_df]
338
  )
339
 
340
- demo.load(fn=fetch_repo_structure, inputs=[repo], outputs=[runs_checkpoints, selected_runs])
341
 
342
  demo.launch()
 
9
  from datetime import datetime
10
 
11
  import gradio as gr
 
12
  import pandas as pd
13
+ from datatrove.io import DataFolder
 
 
 
14
 
15
  FALLBACK_TOKEN_NAME = "HF_TOKEN"
16
 
 
37
  run_name, seed = run_name.split("-seed-")
38
  return run_name, int(seed)
39
 
40
+ def fetch_repo_structure(results_uri, oauth_token: gr.OAuthToken | None = None):
41
  token = os.environ.get(FALLBACK_TOKEN_NAME)
42
  if oauth_token:
43
  token = oauth_token.token
44
 
45
+ data_folder = DataFolder(results_uri, token=token)
46
+ runs = [f.removeprefix("details/") for f in data_folder.list_files("details", recursive=False, include_directories=True) if f != "details"]
47
+
48
  if not runs:
49
  return {}, gr.update(choices=[], value=None)
50
 
51
  def process_run(run):
52
+ run_files = [f.removeprefix(f"details/{run}/") for f in data_folder.list_files(f"details/{run}", recursive=False, include_directories=True) if f != f"details/{run}"]
53
+ return run, run_files
54
 
55
  with ThreadPoolExecutor() as executor:
56
  results = list(executor.map(process_run, runs))
 
82
  return select_runs_by_regex(runs, current_selected, f".*-{language}-.*")
83
  return current_selected
84
 
85
+ def fetch_available_tasks(results_uri, runs_to_fetch, checkpoint) -> dict[str, dict[str, str]]:
86
  token = os.environ.get(FALLBACK_TOKEN_NAME)
87
 
88
+ data_folder = DataFolder(results_uri, token=token)
89
  all_tasks = defaultdict(lambda: defaultdict(dict))
90
+
91
  for run in runs_to_fetch:
92
  try:
93
+ files = data_folder.list_files(f"details/{run}/{checkpoint}", recursive=False)
94
+ parquet_files = [f.split("/")[-1] for f in files if f.endswith('.parquet')]
95
 
96
  for full_filename in parquet_files:
97
  task_name, date_str = full_filename.replace('.parquet', '').rsplit('_', 1)
 
99
 
100
  if run not in all_tasks[task_name] or date > all_tasks[task_name][run]['date']:
101
  all_tasks[task_name][run] = {'filename': full_filename, 'date': date}
102
+ except FileNotFoundError:
103
  print(f"Checkpoint not found for run: {run}")
104
+
105
+ print(all_tasks)
106
 
107
  available_tasks = {
108
  task: {run: info['filename'] for run, info in runs.items()}
 
112
 
113
  return available_tasks
114
 
115
+ def fetch_run_results(results_uri, runs_to_fetch, checkpoint,
116
  oauth_token: gr.OAuthToken | None = None, progress=gr.Progress()):
117
 
118
+ task_runs_dict = fetch_available_tasks(results_uri, runs_to_fetch, checkpoint)
119
  task_names = list(task_runs_dict.keys())
120
  return gr.update(choices=task_names, value=task_names[0] if task_names else None), task_runs_dict
121
 
122
 
123
  def render_table(df, selected_runs, metric_names):
124
  if df is None or not selected_runs or not metric_names:
125
+ return None, "0"
126
  kept_metrics = [f"metric_{metric_name}_{run_name}" for run_name in selected_runs for metric_name in metric_names]
127
  other_metrics = [col for col in df.columns if col.startswith(f"metric_") and col not in kept_metrics]
128
  df = df.drop(columns=other_metrics)
 
130
  df = shorten_column_names(df, selected_runs, metric_names)
131
 
132
  # Sample 100
133
+ n_samples = len(df)
134
  df = df.sample(n=min(100, len(df)), random_state=42)
135
+ return df, n_samples
136
 
137
  def get_column_widths(df):
138
  column_widths = []
 
171
  return df
172
 
173
 
174
+ def load_task_data(results_uri, runs_to_fetch, checkpoint, task_name, tasks_files, progress=gr.Progress()):
175
  token = os.environ.get(FALLBACK_TOKEN_NAME)
176
  if not runs_to_fetch or not task_name:
177
  return None, None, None
178
 
179
+
180
+ print(runs_to_fetch)
181
+
182
+ data_folder = DataFolder(f"filecache::{results_uri}", token=token, cache_storage="./results-cache")
183
+ print(tasks_files)
184
+
185
  def fetch_run_file(run_to_fetch):
186
  file_path = f"details/{run_to_fetch}/{checkpoint}/{tasks_files[task_name][run_to_fetch]}"
187
  try:
188
+ with data_folder.open(file_path, "rb") as f:
189
+ df = pd.read_parquet(f)
190
  return df, run_to_fetch
191
+ except FileNotFoundError:
192
+ print(f"File not found: {tasks_files[task_name][run_to_fetch]}")
193
  return None, run_to_fetch
194
 
195
  with ThreadPoolExecutor() as pool:
 
252
  # Join all prepared DataFrames
253
  for df, run_name in zip(dfs, run_names):
254
  prepared_df = prepare_df(df, run_name, task_type)
255
+ combined_df = combined_df.join(prepared_df, how='outer')
256
 
257
 
258
  available_metrics = list(set("_".join(col.split('_')[1:-1]) for col in combined_df.columns if col.startswith("metric_")))
 
266
  results_df_full = gr.State(None)
267
  tasks_files = gr.State({})
268
  login_button = gr.LoginButton(visible=False)
269
+ results_uri = gr.Textbox(label="Results URI", value="s3://fineweb-multilingual-v1/evals/test/", visible=True)
270
  with gr.Column():
271
  gr.Markdown("# FineWeb experiments results explorer")
272
  with gr.Row():
 
284
  task_name = gr.Dropdown(choices=[], interactive=True, label="Task name")
285
  metric_names = gr.Dropdown(choices=[], interactive=True, multiselect=True, label="Metric")
286
  results_df = gr.Dataframe(interactive=False, wrap=True)
287
+ with gr.Row():
288
+ with gr.Column():
289
+ num_samples = gr.Text(interactive=False, label="# Samples")
290
 
291
  # Run selection
292
  gr.on(
293
+ triggers=[results_uri.change],
294
+ fn=fetch_repo_structure, inputs=[results_uri], outputs=[runs_checkpoints, selected_runs],
295
  )
296
  gr.on(
297
  triggers=[select_by_regex_button.click],
 
316
  gr.on(
317
  triggers=[fetch_res.click],
318
  fn=fetch_run_results,
319
+ inputs=[results_uri, selected_runs, checkpoint],
320
  outputs=[task_name, tasks_files]
321
  ).then(
322
  fn=load_task_data,
323
+ inputs=[results_uri, selected_runs, checkpoint, task_name, tasks_files],
324
  outputs=[results_df_full, metric_names]
325
  ).then(
326
  fn=render_table,
327
  inputs=[results_df_full, selected_runs, metric_names],
328
+ outputs=[results_df, num_samples]
329
  )
330
 
331
  # Update results when task name or metric changes
332
  gr.on(
333
  triggers=[task_name.input],
334
  fn=load_task_data,
335
+ inputs=[results_uri, selected_runs, checkpoint, task_name, tasks_files],
336
  outputs=[results_df_full, metric_names]
337
  ).then(
338
  fn=render_table,
339
  inputs=[results_df_full, selected_runs, metric_names],
340
+ outputs=[results_df, num_samples]
341
  )
342
 
343
  gr.on(
344
  triggers=[metric_names.input],
345
  fn=render_table,
346
  inputs=[results_df_full, selected_runs, metric_names],
347
+ outputs=[results_df, num_samples]
348
  )
349
 
350
+ demo.load(fn=fetch_repo_structure, inputs=[results_uri], outputs=[runs_checkpoints, selected_runs])
351
 
352
  demo.launch()