# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx",
#     "huggingface_hub",
# ]
# ///
"""
Scheduled job: regenerate data.json and upload to the benchmark-race Space.

Run locally:
    uv run update_data.py

Schedule on HF Jobs (twice daily):
    hf jobs scheduled uv run "0 8,20 * * *" \
        --secrets HF_TOKEN \
        https://huggingface.co/spaces/davanstrien/benchmark-race/resolve/main/update_data.py
"""

import json
import math
import os
import re
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path

import httpx
from huggingface_hub import HfApi

SPACE_REPO = "davanstrien/benchmark-race"

# Benchmarks are auto-discovered from datasets tagged `benchmark:official` on
# the Hub. The originals get keys preserved so the UI's hardcoded default
# (`sweVerified` in index.html) keeps working; new benchmarks get
# slugified keys and a name from cardData.pretty_name (or basename).
OVERRIDES = {
    "SWE-bench/SWE-bench_Verified": ("sweVerified", "SWE-bench Verified"),
    "ScaleAI/SWE-bench_Pro": ("swePro", "SWE-bench Pro"),
    "TIGER-Lab/MMLU-Pro": ("mmluPro", "MMLU-Pro"),
    "Idavidrein/gpqa": ("gpqa", "GPQA Diamond"),
    "cais/hle": ("hle", "HLE"),
    "MathArena/aime_2026": ("aime2026", "AIME 2026"),
    "MathArena/hmmt_feb_2026": ("hmmt2026", "HMMT Feb 2026"),
    "allenai/olmOCR-bench": ("olmOcr", "olmOCR-bench"),
    "harborframework/terminal-bench-2.0": ("terminalBench", "Terminal-Bench 2.0"),
    "FutureMa/EvasionBench": ("evasionBench", "EvasionBench"),
}

# Minimum number of rows/models a benchmark needs before it is kept.
MIN_MODELS = 2

# Matches a parameter-count token such as "-7B" or "_1.5b" in a model id.
# The trailing separator is a lookahead (not consumed) so that two adjacent
# tokens sharing one separator, e.g. "-7B-70B", are both found by findall().
_SIZE_IN_NAME_RE = re.compile(r"[-_/](\d+\.?\d*)[Bb](?=[-_/]|$)")


def slugify(dataset_id: str) -> str:
    """Return a key for *dataset_id* built from its basename.

    Runs of non-alphanumeric characters in the part after the last ``/`` are
    collapsed to ``_`` and stripped from both ends. If nothing is left, the
    full id with ``/`` replaced by ``_`` is returned instead.
    """
    base = dataset_id.split("/")[-1]
    slug = re.sub(r"[^a-zA-Z0-9]+", "_", base).strip("_")
    return slug or dataset_id.replace("/", "_")


def discover_benchmarks(hf_token: str | None) -> list[dict]:
    """Fetch every benchmark:official dataset with a usable leaderboard.

    Args:
        hf_token: Hub token sent when fetching leaderboards of gated
            datasets; gated datasets are skipped when it is missing.

    Returns:
        One config dict per usable benchmark with the keys ``dataset``,
        ``key``, ``name``, ``gated`` and ``lower_is_better``.

    Raises:
        httpx.HTTPStatusError: If the initial dataset listing request fails.
            Failures for an individual dataset only skip that dataset.
    """
    print("Discovering official benchmarks...")
    resp = httpx.get(
        "https://huggingface.co/api/datasets",
        params={"filter": "benchmark:official", "limit": 500},
        timeout=30,
    )
    resp.raise_for_status()
    datasets = resp.json()
    print(f" found {len(datasets)} datasets with benchmark:official tag")

    configs = []
    for d in datasets:
        did = d["id"]
        try:
            info = httpx.get(f"https://huggingface.co/api/datasets/{did}", timeout=15).json()
        except Exception as e:
            print(f" {did}: skipped (info fetch failed: {e})")
            continue
        if not isinstance(info, dict):
            # Anything but a JSON object cannot be inspected for gating/card data.
            print(f" {did}: skipped (info fetch failed: unexpected payload)")
            continue

        gated = bool(info.get("gated"))
        if gated and not hf_token:
            print(f" {did}: skipped (gated, no token)")
            continue

        if did in OVERRIDES:
            key, pretty = OVERRIDES[did]
        else:
            card = info.get("cardData") or {}
            key = slugify(did)
            pretty = card.get("pretty_name") or did.split("/")[-1]

        headers = {"Authorization": f"Bearer {hf_token}"} if (gated and hf_token) else {}
        try:
            lb = httpx.get(
                f"https://huggingface.co/api/datasets/{did}/leaderboard",
                headers=headers,
                timeout=30,
            )
        except Exception as e:
            print(f" {did}: skipped (leaderboard fetch failed: {e})")
            continue
        if lb.status_code != 200:
            print(f" {did}: skipped (status {lb.status_code})")
            continue
        try:
            rows = lb.json()
        except ValueError as e:
            # A 200 response with a non-JSON body must not abort the whole job.
            print(f" {did}: skipped (leaderboard fetch failed: {e})")
            continue
        if not isinstance(rows, list) or len(rows) < MIN_MODELS:
            print(f" {did}: skipped (only {len(rows) if isinstance(rows, list) else '?'} rows)")
            continue

        # The first row that declares the flag decides the sort direction.
        lower_is_better = next(
            (
                bool(r["lower_is_better"])
                for r in rows
                if isinstance(r, dict) and "lower_is_better" in r
            ),
            False,
        )
        configs.append({
            "dataset": did,
            "key": key,
            "name": pretty,
            "gated": gated,
            "lower_is_better": lower_is_better,
        })
        print(f" {did} -> {key} ({len(rows)} rows, lower_is_better={lower_is_better})")
    return configs


# Colors assigned to providers in sorted order, cycling when exhausted.
PALETTE = [
    "#6366f1", "#0d9488", "#d97706", "#e11d48", "#7c3aed",
    "#16a34a", "#2563eb", "#ea580c", "#8b5cf6", "#0891b2",
    "#c026d3", "#65a30d", "#dc2626", "#0284c7", "#a21caf",
    "#059669", "#9333ea", "#ca8a04", "#be185d", "#0369a1",
]


def fetch_leaderboard(config: dict, hf_token: str | None) -> list[dict]:
    """Fetch one benchmark's leaderboard and keep the best score per model.

    Args:
        config: A config dict as produced by ``discover_benchmarks``.
        hf_token: Hub token used for gated datasets.

    Returns:
        A list of ``{"model_id", "score"}`` dicts, one per model, holding the
        lowest score when ``lower_is_better`` is set and the highest
        otherwise. Empty when the fetch fails or the payload is not a list.
    """
    url = f"https://huggingface.co/api/datasets/{config['dataset']}/leaderboard"
    headers = {}
    if config["gated"] and hf_token:
        headers["Authorization"] = f"Bearer {hf_token}"
    elif config["gated"]:
        print(f" {config['name']}: skipped (gated, no token)")
        return []

    print(f" {config['name']}: fetching scores...")
    try:
        resp = httpx.get(url, headers=headers, timeout=30)
        if resp.status_code != 200:
            print(f" skip (status {resp.status_code})")
            return []
        data = resp.json()
        if not isinstance(data, list):
            return []
    except Exception as e:
        print(f" error: {e}")
        return []

    lower = config.get("lower_is_better", False)
    best: dict[str, float] = {}
    for entry in data:
        if not isinstance(entry, dict):
            continue
        model_id = entry.get("modelId")
        score = entry.get("value")
        # Model ids are split on "/" downstream, so only strings are usable.
        if not model_id or not isinstance(model_id, str) or score is None:
            continue
        try:
            score = float(score)
        except (TypeError, ValueError):
            continue
        if not math.isfinite(score):
            # json.dumps would emit NaN/Infinity, which is not valid JSON.
            continue
        previous = best.get(model_id)
        if previous is None or (score < previous if lower else score > previous):
            best[model_id] = score

    print(f" {len(best)} models")
    return [{"model_id": mid, "score": s} for mid, s in best.items()]


def _params_from_name(model_id: str) -> float | None:
    """Return the largest "<number>B" size token found in *model_id*, if any."""
    sizes = _SIZE_IN_NAME_RE.findall(model_id)
    return max(float(x) for x in sizes) if sizes else None


def fetch_model_dates(model_ids: list[str], hf_token: str | None) -> dict[str, dict]:
    """Look up creation date, parameter count and quantization flag per model.

    Lookups run concurrently on a thread pool. Models whose lookup fails, or
    that have no creation date, are left out of the result.

    Args:
        model_ids: Hub model ids to look up.
        hf_token: Hub token passed to each ``model_info`` call.

    Returns:
        Mapping of model id to ``{"date", "parameters_b", "is_quantized"}``.
        ``parameters_b`` is in billions and may be ``None``.
    """
    api = HfApi()
    results: dict[str, dict] = {}

    def _get_info(mid):
        try:
            info = api.model_info(mid, token=hf_token)
            params_b = None
            total = getattr(info.safetensors, "total", None) if info.safetensors else None
            if total is not None:
                params_b = round(total / 1_000_000_000, 1)
            if params_b is None:
                # No safetensors metadata: fall back to a size token in the id.
                params_b = _params_from_name(mid)
            is_quantized = any(
                t.startswith("base_model:quantized:") for t in (info.tags or [])
            )
            return mid, info.created_at.strftime("%Y-%m-%d"), params_b, is_quantized
        except Exception:
            # Best effort: a model that cannot be resolved is simply dropped.
            return mid, None, None, False

    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = {pool.submit(_get_info, mid): mid for mid in model_ids}
        for f in as_completed(futures):
            mid, date, params, is_quantized = f.result()
            if date:
                results[mid] = {
                    "date": date,
                    "parameters_b": params,
                    "is_quantized": is_quantized,
                }
    return results


def fetch_logo(provider: str) -> str | None:
    """Return the ``avatarUrl`` reported for *provider*, or ``None``.

    Any request or parsing failure, and any non-200 status, yields ``None``.
    """
    try:
        resp = httpx.get(
            f"https://huggingface.co/api/organizations/{provider}/avatar",
            timeout=5,
        )
        if resp.status_code == 200:
            return resp.json().get("avatarUrl")
    except Exception:
        # Logos are optional decoration; a failed lookup must not stop the job.
        return None
    return None


def fetch_all_logos(providers: set[str]) -> dict[str, str]:
    """Fetch logos for *providers* concurrently.

    Returns:
        Mapping of provider to logo URL, containing only the providers for
        which ``fetch_logo`` returned a non-empty URL.
    """
    logos = {}
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = {pool.submit(fetch_logo, p): p for p in providers}
        for f in as_completed(futures):
            url = f.result()
            if url:
                logos[futures[f]] = url
    return logos


def main() -> None:
    """Build data.json from the Hub leaderboards and upload it to the Space.

    Raises:
        SystemExit: If no benchmark ends up with at least ``MIN_MODELS``
            models, so that the existing data.json is not overwritten.
    """
    hf_token = os.environ.get("HF_TOKEN")
    print("Generating data.json for bar chart race\n")

    benchmark_configs = discover_benchmarks(hf_token)
    print(f"\n{len(benchmark_configs)} usable benchmarks\n")

    all_scores: dict[str, dict] = {}
    all_model_ids: set[str] = set()
    for config in benchmark_configs:
        rows = fetch_leaderboard(config, hf_token)
        if rows:
            all_scores[config["key"]] = {
                "name": config["name"],
                "dataset": config["dataset"],
                "lower_is_better": config["lower_is_better"],
                "rows": rows,
            }
            all_model_ids.update(r["model_id"] for r in rows)

    print(f"\n{len(all_model_ids)} unique models across {len(all_scores)} benchmarks")
    print("Fetching model dates...")
    model_dates = fetch_model_dates(list(all_model_ids), hf_token)
    print(f" got dates for {len(model_dates)}/{len(all_model_ids)} models")

    all_providers: set[str] = set()
    benchmarks = {}
    for key, info in all_scores.items():
        models = []
        for row in info["rows"]:
            mid = row["model_id"]
            meta = model_dates.get(mid)
            # Models without a date, and quantized derivatives, are excluded.
            if meta is None or meta.get("is_quantized"):
                continue
            provider = mid.split("/")[0] if "/" in mid else mid
            all_providers.add(provider)
            models.append({
                "model_id": mid,
                "short_name": mid.split("/")[-1],
                "provider": provider,
                "score": round(row["score"], 2),
                "date": meta["date"],
            })
        if len(models) >= MIN_MODELS:
            benchmarks[key] = {
                "name": info["name"],
                "dataset": info["dataset"],
                "lower_is_better": info["lower_is_better"],
                "models": models,
            }

    if not benchmarks:
        # Every per-benchmark failure above is soft, so an outage can leave
        # nothing to publish; uploading that would replace the last good file.
        raise SystemExit("No usable benchmarks; refusing to upload an empty data.json")

    print(f"\nFetching logos for {len(all_providers)} providers...")
    logos = fetch_all_logos(all_providers)
    print(f" got {len(logos)} logos")

    # Sorted order keeps the provider -> color assignment stable between runs
    # as long as the provider set is unchanged.
    color_map = {
        provider: PALETTE[i % len(PALETTE)]
        for i, provider in enumerate(sorted(all_providers))
    }

    output = {
        "benchmarks": benchmarks,
        "logos": logos,
        "colors": color_map,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    data_json = json.dumps(output, indent=2)
    print(f"\nGenerated {len(data_json) / 1024:.1f} KB")
    for bm in benchmarks.values():
        print(f" {bm['name']}: {len(bm['models'])} models")

    # Upload to Space
    print(f"\nUploading data.json to {SPACE_REPO}...")
    api = HfApi()
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    ) as f:
        f.write(data_json)
        tmp_path = f.name
    try:
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo="data.json",
            repo_id=SPACE_REPO,
            repo_type="space",
            commit_message=f"Update data.json ({datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')})",
        )
        print("Done!")
    finally:
        Path(tmp_path).unlink(missing_ok=True)


if __name__ == "__main__":
    main()