import asyncio
import base64
import html
import io
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import Dict, List

import aiohttp
import emoji
import gradio as gr
import httpx
import pandas as pd
from huggingface_hub import HfApi, ModelCard, snapshot_download, login

# Base URL used for every Hub page / raw-file link built in this module.
# NOTE(review): this is not the official huggingface.co host. download_readme()
# sends the user's access token to this host in an Authorization header --
# confirm the host is trusted before deploying.
HUB_BASE_URL = "https://huggingface.co"


# Initialize HuggingFace with access token
def init_huggingface(token: str):
    """Log in to the Hugging Face Hub with ``token``.

    Returns:
        True when ``login`` succeeds, False when it raises (the error is
        printed rather than propagated).
    """
    try:
        login(token=token)
        return True
    except Exception as e:
        print(f"Error logging in: {str(e)}")
        return False


def format_link(item: Dict, number: int, search_type: str) -> str:
    """Build the HTML snippet that shows one search result in the UI.

    Args:
        item: Result record; must contain ``link``, ``id`` and ``author``,
            and may contain ``downloads``.
        number: 1-based position of the result in the list.
        search_type: "Models", "Datasets" or "Spaces"; the trailing "s" is
            dropped to label the link ("View Model", ...).

    Returns:
        An HTML fragment with the title, links to the repository and its
        README, and a metadata line.
    """
    link = item['link']
    readme_link = f"{link}/blob/main/README.md"
    title = f"{number}. {item['id']}"
    metadata = f"Author: {item['author']}"
    if 'downloads' in item:
        metadata += f", Downloads: {item['downloads']}"

    # Ids and author names come from the Hub (untrusted input), so escape
    # everything that is interpolated into the markup.
    safe_link = html.escape(str(link), quote=True)
    safe_readme = html.escape(readme_link, quote=True)
    label = html.escape(search_type[:-1])
    return f"""
    <div style="margin-bottom: 10px;">
        <strong>{html.escape(title)}</strong><br>
        <a href="{safe_link}" target="_blank">View {label}</a> |
        <a href="{safe_readme}" target="_blank">View README</a><br>
        <small>{html.escape(metadata)}</small>
    </div>
    """


def display_results(df: pd.DataFrame):
    """Render the search results DataFrame as a single HTML string.

    Concatenates the pre-rendered ``formatted_link`` column inside a
    scrollable container; returns a "No results found." paragraph when
    ``df`` is None or empty.
    """
    if df is None or df.empty:
        return "<p>No results found.</p>"

    rows = "".join(df['formatted_link'])
    return (
        "<h2>Search Results:</h2>"
        "<div style='max-height: 400px; overflow-y: auto;'>"
        f"{rows}"
        "</div>"
    )


def _item_type(item: Dict) -> str:
    """Classify a result record as "Models", "Datasets" or "Spaces"."""
    if "modelId" in item:
        return "Models"
    link = item.get("link")
    if isinstance(link, str) and link:
        # Records built by search_hub() carry the repo kind in their link.
        if "/datasets/" in link:
            return "Datasets"
        if "/spaces/" in link:
            return "Spaces"
        return "Models"
    # No link available: fall back to the id-based heuristic.
    if "dataset" in item.get("id", ""):
        return "Datasets"
    return "Spaces"


def SwarmyTime(data: List[Dict]) -> Dict:
    """Aggregate summary statistics over a list of result records.

    Returns:
        A dict with ``total_items``, ``unique_authors`` (a count),
        ``total_downloads`` and ``item_types`` (per-kind counts).
        Missing, ``None`` or NaN download values count as 0.
    """
    authors = set()
    total_downloads = 0
    item_types = {"Models": 0, "Datasets": 0, "Spaces": 0}

    for item in data:
        authors.add(item.get("author") or "Unknown")
        try:
            total_downloads += int(item.get("downloads", 0))
        except (TypeError, ValueError, OverflowError):
            # None / NaN / inf / non-numeric: nothing to add.
            pass
        item_types[_item_type(item)] += 1

    return {
        "total_items": len(data),
        "unique_authors": len(authors),
        "total_downloads": total_downloads,
        "item_types": item_types,
    }


def search_hub(query: str, search_type: str, token: str = None) -> pd.DataFrame:
    """Search the Hugging Face Hub for models, datasets, or spaces.

    Args:
        query: Search term passed to the Hub API.
        search_type: "Models", "Datasets" or "Spaces"; any other value
            yields an empty DataFrame.
        token: Optional access token; an empty string is treated as absent.

    Returns:
        A DataFrame with one row per result, including a 1-based ``number``
        and a pre-rendered ``formatted_link`` HTML column.
    """
    # The UI textbox yields "" when left blank; pass None in that case.
    api = HfApi(token=token or None)

    if search_type == "Models":
        results = api.list_models(search=query)
        data = [
            {
                "id": model.modelId,
                "author": model.author,
                "downloads": model.downloads,
                "link": f"{HUB_BASE_URL}/{model.modelId}",
            }
            for model in results
        ]
    elif search_type == "Datasets":
        results = api.list_datasets(search=query)
        data = [
            {
                "id": dataset.id,
                "author": dataset.author,
                "downloads": dataset.downloads,
                "link": f"{HUB_BASE_URL}/datasets/{dataset.id}",
            }
            for dataset in results
        ]
    elif search_type == "Spaces":
        results = api.list_spaces(search=query)
        data = [
            {
                "id": space.id,
                "author": space.author,
                "link": f"{HUB_BASE_URL}/spaces/{space.id}",
            }
            for space in results
        ]
    else:
        data = []

    for i, item in enumerate(data, 1):
        item['number'] = i
        item['formatted_link'] = format_link(item, i, search_type)

    return pd.DataFrame(data)


async def download_readme(session: aiohttp.ClientSession, item: Dict, token: str) -> tuple[str, str]:
    """Download the README.md of one result.

    Args:
        session: Open aiohttp session used for the request.
        item: Result record; ``id`` is required, ``link`` is used when
            present so dataset/space URLs keep their path prefix.
        token: Optional access token sent as a Bearer header.

    Returns:
        ``(file_stem, content)`` where ``file_stem`` is the id with "/"
        replaced by "_". On failure ``content`` is a short Markdown error
        note instead of the README; no exception is raised.
    """
    item_id = item['id']
    file_stem = item_id.replace('/', '_')
    # Prefer the record's own link: it already contains "datasets/" or
    # "spaces/" where needed. Model links equal the plain id-based URL.
    base_url = item.get('link') or f"{HUB_BASE_URL}/{item_id}"
    raw_url = f"{base_url}/raw/main/README.md"
    headers = {"Authorization": f"Bearer {token}"} if token else {}

    try:
        async with session.get(raw_url, headers=headers) as response:
            if response.status == 200:
                content = await response.text()
                return file_stem, content
            return file_stem, f"# Error downloading README for {item_id}\nStatus code: {response.status}"
    except Exception as e:
        return file_stem, f"# Error downloading README for {item_id}\nError: {str(e)}"


async def download_all_readmes(data: List[Dict], token: str) -> tuple[str, str]:
    """Download every result's README and pack them into a zip archive.

    Returns:
        ``(download_link_html, status_message)``. The link embeds the zip
        as a base64 ``data:`` URI; it is "" when ``data`` is empty.
    """
    if not data:
        return "", "No results to download"

    async with aiohttp.ClientSession() as session:
        tasks = [download_readme(session, item, token) for item in data]
        results = await asyncio.gather(*tasks)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for filename, content in results:
            zip_file.writestr(f"{filename}.md", content)

    base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="readmes.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download READMEs Archive
        </a>
    </div>
    """
    return download_link, "READMEs ready for download!"
def download_repository(repo_id: str, repo_type: str, temp_dir: str, token: str) -> "str | None":
    """Download a single repository snapshot into ``temp_dir``.

    Args:
        repo_id: Hub repository id, e.g. "user/name".
        repo_type: UI search type ("Models", "Datasets" or "Spaces").
        temp_dir: Directory that receives a sub-folder named after the id
            (with "/" replaced by "_").
        token: Optional access token; an empty string is treated as absent.

    Returns:
        The local path of the downloaded snapshot, or None when the
        download fails (the error is printed, not raised).
    """
    # Explicit mapping from the UI label to the Hub repo type; unknown
    # labels fall back to the old "lower-case and drop the last character".
    hub_types = {"Models": "model", "Datasets": "dataset", "Spaces": "space"}
    hub_type = hub_types.get(repo_type, repo_type.lower()[:-1])

    try:
        repo_path = snapshot_download(
            repo_id=repo_id,
            repo_type=hub_type,
            local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')),
            # Ignore large binary weight files
            ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"],
            token=token or None,
        )
        return repo_path
    except Exception as e:
        print(f"Error downloading {repo_id}: {str(e)}")
        return None


def create_repo_zip(data: List[Dict], search_type: str, token: str) -> tuple[str, str]:
    """Download the listed repositories and pack them into one zip archive.

    Args:
        data: Result records; each must contain ``id``.
        search_type: UI search type forwarded to ``download_repository``.
        token: Optional access token.

    Returns:
        ``(download_link_html, status_message)``. The link embeds the zip
        as a base64 ``data:`` URI, so the whole archive is held in memory;
        it is "" when nothing could be downloaded.
    """
    if not data:
        return "", "No repositories to download"

    # Everything is staged in a temporary directory that is removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        successful_downloads = []
        for item in data:
            repo_path = download_repository(item['id'], search_type, temp_dir, token)
            if repo_path:
                successful_downloads.append(repo_path)

        if not successful_downloads:
            return "", "No repositories were successfully downloaded"

        # Create zip file in memory
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for repo_path in successful_downloads:
                repo_name = os.path.basename(repo_path)
                for root, dirs, files in os.walk(repo_path):
                    # snapshot_download may leave a ".cache" bookkeeping
                    # folder in local_dir; it is not repository content.
                    dirs[:] = [d for d in dirs if d != ".cache"]
                    for file in files:
                        file_path = os.path.join(root, file)
                        arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path))
                        zip_file.write(file_path, arcname)

        # Convert to base64 for the data: URI
        base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()

    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="repositories.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download Repositories Archive
        </a>
    </div>
    """
    return download_link, f"Successfully downloaded {len(successful_downloads)} repositories"


# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# Search the Hugging Face Hub
Search and download models, datasets, and spaces from Hugging Face.
""")

    with gr.Row():
        with gr.Column(scale=3):
            hf_token = gr.Textbox(
                label="HuggingFace Access Token (optional)",
                type="password",
                placeholder="Enter your HuggingFace access token...",
            )

    with gr.Row():
        with gr.Column(scale=3):
            search_query = gr.Textbox(
                label="Search Query",
                value="awacke1",
                placeholder="Enter search term..."
            )
        with gr.Column(scale=2):
            search_type = gr.Radio(
                ["Models", "Datasets", "Spaces"],
                label="Search Type",
                value="Models",
                container=True
            )
        with gr.Column(scale=1):
            search_button = gr.Button("🔍 Search", variant="primary", scale=1)

    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            gr.Markdown("### Download Options")
            with gr.Row():
                download_readme_button = gr.Button(
                    "📚 Download READMEs",
                    variant="secondary",
                )
                download_repo_button = gr.Button(
                    "📦 Download Repositories",
                    variant="secondary",
                )
            download_status = gr.Markdown("Status: Ready to download", label="Status")
            download_area = gr.HTML("", label="Download Link")

    with gr.Row():
        with gr.Column(scale=2):
            results_html = gr.HTML(label="Search Results")
        with gr.Column(scale=1):
            aggregated_output = gr.JSON(label="Search Statistics")

    # Per-session state: the search type and records of the last search,
    # reused by the download buttons.
    search_type_state = gr.State("")
    current_results = gr.State([])

    def search_and_aggregate(query, search_type, token):
        """Run a search and return values for all six search outputs."""
        df = search_hub(query, search_type, token)
        data = df.to_dict('records')
        aggregated = SwarmyTime(data)
        html_results = display_results(df)
        return [
            html_results,
            "Status: Ready to download",
            "",
            aggregated,
            search_type,
            data
        ]

    async def handle_readme_download(data, token):
        """Build the README archive for the stored results."""
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = await download_all_readmes(data, token)
        return [f"Status: {status}", download_link]

    def handle_repo_download(data, search_type, token):
        """Build the repository archive for the stored results."""
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = create_repo_zip(data, search_type, token)
        return [f"Status: {status}", download_link]

    search_button.click(
        search_and_aggregate,
        inputs=[search_query, search_type, hf_token],
        outputs=[
            results_html,
            download_status,
            download_area,
            aggregated_output,
            search_type_state,
            current_results
        ]
    )

    download_readme_button.click(
        handle_readme_download,
        inputs=[current_results, hf_token],
        outputs=[download_status, download_area]
    )

    download_repo_button.click(
        handle_repo_download,
        inputs=[current_results, search_type_state, hf_token],
        outputs=[download_status, download_area]
    )

demo.launch(debug=True)