from typing import List, Dict import httpx import gradio as gr import pandas as pd from huggingface_hub import HfApi, ModelCard, snapshot_download import base64 import io import zipfile import asyncio import aiohttp from pathlib import Path import emoji import tempfile import shutil import os def search_hub(query: str, search_type: str) -> pd.DataFrame: api = HfApi() if search_type == "Models": results = api.list_models(search=query) data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads, "link": f"https://huggingface.co/{model.modelId}"} for model in results] elif search_type == "Datasets": results = api.list_datasets(search=query) data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads, "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results] elif search_type == "Spaces": results = api.list_spaces(search=query) data = [{"id": space.id, "author": space.author, "link": f"https://huggingface.co/spaces/{space.id}"} for space in results] else: data = [] for i, item in enumerate(data, 1): item['number'] = i item['formatted_link'] = format_link(item, i, search_type) return pd.DataFrame(data) def format_link(item: Dict, number: int, search_type: str) -> str: link = item['link'] readme_link = f"{link}/blob/main/README.md" title = f"{number}. {item['id']}" metadata = f"Author: {item['author']}" if 'downloads' in item: metadata += f", Downloads: {item['downloads']}" html = f"""
""" return html async def download_readme(session: aiohttp.ClientSession, item: Dict) -> tuple[str, str]: """Download README.md file for a given item.""" item_id = item['id'] raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md" try: async with session.get(raw_url) as response: if response.status == 200: content = await response.text() return item_id.replace('/', '_'), content return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}" except Exception as e: return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}" async def download_all_readmes(data: List[Dict]) -> tuple[str, str]: """Download all README files and create a zip archive.""" if not data: return "", "No results to download" zip_buffer = io.BytesIO() status_message = "Downloading READMEs..." async with aiohttp.ClientSession() as session: tasks = [download_readme(session, item) for item in data] results = await asyncio.gather(*tasks) with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for filename, content in results: zip_file.writestr(f"{filename}.md", content) zip_buffer.seek(0) base64_zip = base64.b64encode(zip_buffer.getvalue()).decode() download_link = f""" """ return download_link, "READMEs ready for download!" def download_repository(repo_id: str, repo_type: str, temp_dir: str) -> str: """Download a single repository.""" try: repo_path = snapshot_download( repo_id=repo_id, repo_type=repo_type.lower()[:-1], # Remove 's' from 'Models'/'Datasets'/'Spaces' local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')), ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"] # Ignore large binary files ) return repo_path except Exception as e: print(f"Error downloading {repo_id}: {str(e)}") return None def create_repo_zip(data: List[Dict], search_type: str) -> tuple[str, str]: """Download repositories and create a zip archive.""" if not data: return "", "No repositories to download" # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: successful_downloads = [] # Download each repository for item in data: repo_path = download_repository(item['id'], search_type, temp_dir) if repo_path: successful_downloads.append(repo_path) if not successful_downloads: return "", "No repositories were successfully downloaded" # Create zip file in memory zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for repo_path in successful_downloads: repo_name = os.path.basename(repo_path) for root, _, files in os.walk(repo_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path)) zip_file.write(file_path, arcname) # Convert to base64 zip_buffer.seek(0) base64_zip = base64.b64encode(zip_buffer.getvalue()).decode() download_link = f""" """ return download_link, f"Successfully downloaded {len(successful_downloads)} repositories" def display_results(df: pd.DataFrame): if df is not None and not df.empty: html = "No results found.
" def SwarmyTime(data: List[Dict]) -> Dict: """Aggregates all content from the given data.""" aggregated = { "total_items": len(data), "unique_authors": set(), "total_downloads": 0, "item_types": {"Models": 0, "Datasets": 0, "Spaces": 0} } for item in data: aggregated["unique_authors"].add(item.get("author", "Unknown")) aggregated["total_downloads"] += item.get("downloads", 0) if "modelId" in item: aggregated["item_types"]["Models"] += 1 elif "dataset" in item.get("id", ""): aggregated["item_types"]["Datasets"] += 1 else: aggregated["item_types"]["Spaces"] += 1 aggregated["unique_authors"] = len(aggregated["unique_authors"]) return aggregated with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(""" # Search the Hugging Face Hub Search and download models, datasets, and spaces from Hugging Face. """) with gr.Row(): with gr.Column(scale=3): search_query = gr.Textbox( label="Search Query", value="awacke1", placeholder="Enter search term..." ) with gr.Column(scale=2): search_type = gr.Radio( ["Models", "Datasets", "Spaces"], label="Search Type", value="Models", container=True ) with gr.Column(scale=1): search_button = gr.Button("🔍 Search", variant="primary", scale=1) with gr.Row(variant="panel"): with gr.Column(scale=1): gr.Markdown("### Download Options") with gr.Row(): download_readme_button = gr.Button( "📚 Download READMEs", variant="secondary", ) download_repo_button = gr.Button( "📦 Download Repositories", variant="secondary", ) download_status = gr.Markdown("Status: Ready to download", label="Status") download_area = gr.HTML("", label="Download Link") with gr.Row(): with gr.Column(scale=2): results_html = gr.HTML(label="Search Results") with gr.Column(scale=1): aggregated_output = gr.JSON(label="Search Statistics") search_type_state = gr.State("") current_results = gr.State([]) def search_and_aggregate(query, search_type): df = search_hub(query, search_type) data = df.to_dict('records') aggregated = SwarmyTime(data) html_results = display_results(df) return [ html_results, # results_html "Status: Ready to download", # download_status "", # download_area aggregated, # aggregated_output search_type, # search_type_state data # current_results ] async def handle_readme_download(data): if not data: return ["Status: No results to download", ""] download_link, status = await download_all_readmes(data) return [f"Status: {status}", download_link] def handle_repo_download(data, search_type): if not data: return ["Status: No results to download", ""] download_link, status = create_repo_zip(data, search_type) return [f"Status: {status}", download_link] search_button.click( search_and_aggregate, inputs=[search_query, search_type], outputs=[ results_html, download_status, download_area, aggregated_output, search_type_state, current_results ] ) download_readme_button.click( handle_readme_download, inputs=[current_results], outputs=[download_status, download_area] ) download_repo_button.click( handle_repo_download, inputs=[current_results, search_type_state], outputs=[download_status, download_area] ) demo.launch(debug=True)