from typing import List, Dict import httpx import gradio as gr import pandas as pd from huggingface_hub import HfApi, ModelCard, snapshot_download import base64 import io import zipfile import asyncio import aiohttp from pathlib import Path import emoji import tempfile import shutil import os def search_hub(query: str, search_type: str) -> pd.DataFrame: api = HfApi() if search_type == "Models": results = api.list_models(search=query) data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads, "link": f"https://huggingface.co/{model.modelId}"} for model in results] elif search_type == "Datasets": results = api.list_datasets(search=query) data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads, "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results] elif search_type == "Spaces": results = api.list_spaces(search=query) data = [{"id": space.id, "author": space.author, "link": f"https://huggingface.co/spaces/{space.id}"} for space in results] else: data = [] for i, item in enumerate(data, 1): item['number'] = i item['formatted_link'] = format_link(item, i, search_type) return pd.DataFrame(data) def format_link(item: Dict, number: int, search_type: str) -> str: link = item['link'] readme_link = f"{link}/blob/main/README.md" title = f"{number}. {item['id']}" metadata = f"Author: {item['author']}" if 'downloads' in item: metadata += f", Downloads: {item['downloads']}" html = f"""
""" return html async def download_readme(session: aiohttp.ClientSession, item: Dict) -> tuple[str, str]: """Download README.md file for a given item.""" item_id = item['id'] raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md" try: async with session.get(raw_url) as response: if response.status == 200: content = await response.text() return item_id.replace('/', '_'), content return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}" except Exception as e: return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}" async def download_all_readmes(data: List[Dict]) -> tuple[str, str]: """Download all README files and create a zip archive.""" if not data: return "", "No results to download" zip_buffer = io.BytesIO() status_message = "Downloading READMEs..." async with aiohttp.ClientSession() as session: tasks = [download_readme(session, item) for item in data] results = await asyncio.gather(*tasks) with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for filename, content in results: zip_file.writestr(f"{filename}.md", content) zip_buffer.seek(0) base64_zip = base64.b64encode(zip_buffer.getvalue()).decode() download_link = f""" """ return download_link, "READMEs ready for download!" def download_repository(repo_id: str, repo_type: str, temp_dir: str) -> str: """Download a single repository.""" try: repo_path = snapshot_download( repo_id=repo_id, repo_type=repo_type.lower()[:-1], # Remove 's' from 'Models'/'Datasets'/'Spaces' local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')), ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"] # Ignore large binary files ) return repo_path except Exception as e: print(f"Error downloading {repo_id}: {str(e)}") return None def create_repo_zip(data: List[Dict], search_type: str) -> tuple[str, str]: """Download repositories and create a zip archive.""" if not data: return "", "No repositories to download" # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: successful_downloads = [] # Download each repository for item in data: repo_path = download_repository(item['id'], search_type, temp_dir) if repo_path: successful_downloads.append(repo_path) if not successful_downloads: return "", "No repositories were successfully downloaded" # Create zip file in memory zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for repo_path in successful_downloads: repo_name = os.path.basename(repo_path) for root, _, files in os.walk(repo_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path)) zip_file.write(file_path, arcname) # Convert to base64 zip_buffer.seek(0) base64_zip = base64.b64encode(zip_buffer.getvalue()).decode() download_link = f""" """ return download_link, f"Successfully downloaded {len(successful_downloads)} repositories" def display_results(df: pd.DataFrame): if df is not None and not df.empty: html = "No results found.
" def SwarmyTime(data: List[Dict]) -> Dict: """Aggregates all content from the given data.""" aggregated = { "total_items": len(data), "unique_authors": set(), "total_downloads": 0, "item_types": {"Models": 0, "Datasets": 0, "Spaces": 0} } for item in data: aggregated["unique_authors"].add(item.get("author", "Unknown")) aggregated["total_downloads"] += item.get("downloads", 0) if "modelId" in item: aggregated["item_types"]["Models"] += 1 elif "dataset" in item.get("id", ""): aggregated["item_types"]["Datasets"] += 1 else: aggregated["item_types"]["Spaces"] += 1 aggregated["unique_authors"] = len(aggregated["unique_authors"]) return aggregated with gr.Blocks() as demo: gr.Markdown("## Search the Hugging Face Hub") with gr.Row(): search_query = gr.Textbox(label="Search Query", value="awacke1") search_type = gr.Radio(["Models", "Datasets", "Spaces"], label="Search Type", value="Models") search_button = gr.Button("Search") results_html = gr.HTML(label="Search Results") with gr.Row(): download_readme_button = gr.Button("📚 Download READMEs", visible=False) download_repo_button = gr.Button("📦 Download Repositories", visible=False) download_status = gr.Markdown("", label="Download Status") download_area = gr.HTML("", label="Download Link") metadata_output = gr.Textbox(label="Metadata", lines=10) aggregated_output = gr.JSON(label="Aggregated Content") search_type_state = gr.State("") current_results = gr.State([]) def search_and_aggregate(query, search_type): df = search_hub(query, search_type) data = df.to_dict('records') aggregated = SwarmyTime(data) html_results = display_results(df) show_download = len(data) > 0 return [ html_results, # results_html show_download, # download_readme_button visible show_download, # download_repo_button visible "", # download_status "", # download_area aggregated, # aggregated_output search_type, # search_type_state data # current_results ] async def handle_readme_download(data): if not data: return ["No results to download", ""] download_link, status = await download_all_readmes(data) return [status, download_link] def handle_repo_download(data, search_type): if not data: return ["No results to download", ""] download_link, status = create_repo_zip(data, search_type) return [status, download_link] search_button.click( search_and_aggregate, inputs=[search_query, search_type], outputs=[ results_html, download_readme_button, download_repo_button, download_status, download_area, aggregated_output, search_type_state, current_results ] ) download_readme_button.click( handle_readme_download, inputs=[current_results], outputs=[download_status, download_area] ) download_repo_button.click( handle_repo_download, inputs=[current_results, search_type_state], outputs=[download_status, download_area] ) demo.launch(debug=True)