|
from typing import List, Dict |
|
import httpx |
|
import gradio as gr |
|
import pandas as pd |
|
from huggingface_hub import HfApi, ModelCard, snapshot_download |
|
import base64 |
|
import io |
|
import zipfile |
|
import asyncio |
|
import aiohttp |
|
from pathlib import Path |
|
import emoji |
|
import tempfile |
|
import shutil |
|
import os |
|
|
|
def search_hub(query: str, search_type: str) -> pd.DataFrame:
    """Search the Hugging Face Hub and return the hits as a DataFrame.

    Args:
        query: Search term forwarded to the Hub listing call.
        search_type: ``"Models"``, ``"Datasets"`` or ``"Spaces"``; any other
            value produces an empty frame.

    Returns:
        One row per hit with ``id``, ``author`` and ``link`` (plus
        ``downloads`` for models and datasets), a 1-based ``number`` and the
        ``formatted_link`` HTML snippet built by ``format_link``.
    """
    hub = HfApi()

    if search_type == "Models":
        rows = [
            {
                "id": hit.modelId,
                "author": hit.author,
                "downloads": hit.downloads,
                "link": f"https://huggingface.co/{hit.modelId}",
            }
            for hit in hub.list_models(search=query)
        ]
    elif search_type == "Datasets":
        rows = [
            {
                "id": hit.id,
                "author": hit.author,
                "downloads": hit.downloads,
                "link": f"https://huggingface.co/datasets/{hit.id}",
            }
            for hit in hub.list_datasets(search=query)
        ]
    elif search_type == "Spaces":
        # Space records carry no download count.
        rows = [
            {
                "id": hit.id,
                "author": hit.author,
                "link": f"https://huggingface.co/spaces/{hit.id}",
            }
            for hit in hub.list_spaces(search=query)
        ]
    else:
        rows = []

    # Number the hits from 1 and attach the HTML card for each of them.
    for position, row in enumerate(rows, start=1):
        row['number'] = position
        row['formatted_link'] = format_link(row, position, search_type)

    return pd.DataFrame(rows)
|
|
|
def format_link(item: Dict, number: int, search_type: str) -> str:
    """Render one search hit as a small HTML card.

    Every interpolated value is HTML-escaped, because ids, authors and links
    originate from the Hub API rather than from this program.

    Args:
        item: Search-result record; must contain ``id`` and ``link``, may
            contain ``author`` and ``downloads``.
        number: 1-based position shown in front of the id.
        search_type: Plural category name (``"Models"``, ``"Datasets"``,
            ``"Spaces"``); its last character is dropped for the link label.

    Returns:
        An HTML ``<div>`` with the title, a link to the repository, a link to
        its README and an author/downloads line.
    """
    # Imported locally so the file-level import block is left untouched.
    from html import escape

    link = str(item['link'])
    safe_link = escape(link)
    safe_readme_link = escape(f"{link}/blob/main/README.md")
    title = escape(f"{number}. {item['id']}")
    # "Models" -> "Model", "Datasets" -> "Dataset", "Spaces" -> "Space".
    kind_label = escape(search_type[:-1])

    # A missing author is shown as "Unknown" rather than the literal "None".
    author = item.get('author')
    metadata = f"Author: {author if author is not None else 'Unknown'}"
    downloads = item.get('downloads')
    if downloads is not None:
        metadata += f", Downloads: {downloads}"
    metadata = escape(metadata)

    return f"""
    <div style="margin-bottom: 10px;">
        <strong>{title}</strong><br>
        <a href="{safe_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {kind_label}</a> |
        <a href="{safe_readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br>
        <small>{metadata}</small>
    </div>
    """
|
|
|
async def download_readme(session: "aiohttp.ClientSession", item: Dict) -> tuple[str, str]:
    """Download the README.md of one search result.

    Args:
        session: Open HTTP session used for the request.
        item: Search-result record; must contain ``id`` and should contain the
            repository ``link``.

    Returns:
        ``(name, content)`` where ``name`` is the id with ``/`` replaced by
        ``_``. On a non-200 response or any exception, ``content`` is a short
        Markdown error note instead of the README, so one failed item never
        aborts a batch.
    """
    item_id = item['id']
    safe_name = item_id.replace('/', '_')

    # Datasets and Spaces live under /datasets/ and /spaces/, so the raw URL
    # must be derived from the record's own link; the bare id only resolves
    # for models and is kept as the fallback for records without a link.
    base_url = item.get('link') or f"https://huggingface.co/{item_id}"
    raw_url = f"{base_url.rstrip('/')}/raw/main/README.md"

    try:
        async with session.get(raw_url) as response:
            if response.status == 200:
                return safe_name, await response.text()
            return safe_name, f"# Error downloading README for {item_id}\nStatus code: {response.status}"
    except Exception as e:
        # Deliberately broad: the failure is reported inside the archive entry.
        return safe_name, f"# Error downloading README for {item_id}\nError: {str(e)}"
|
|
|
async def download_all_readmes(data: List[Dict]) -> tuple[str, str]:
    """Download all README files and bundle them into a zip download link.

    Args:
        data: Search-result records; each one is handed to
            ``download_readme``.

    Returns:
        ``(html, status)``: an HTML anchor that embeds the zip archive as a
        base64 ``data:`` URI, plus a status message. For empty ``data`` the
        result is ``("", "No results to download")``.
    """
    if not data:
        return "", "No results to download"

    # One shared session; all requests are issued concurrently.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(download_readme(session, item) for item in data)
        )

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for filename, content in results:
            zip_file.writestr(f"{filename}.md", content)

    # getvalue() returns the full buffer regardless of the stream position.
    base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()

    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="readmes.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download READMEs Archive
        </a>
    </div>
    """

    return download_link, "READMEs ready for download!"
|
|
|
def download_repository(repo_id: str, repo_type: str, temp_dir: str) -> str:
    """Snapshot a single Hub repository into ``temp_dir``.

    Files matching the common weight/checkpoint patterns are not downloaded.

    Args:
        repo_id: Repository id; ``/`` is replaced by ``_`` for the folder name.
        repo_type: Plural category name; it is lower-cased and its last
            character dropped before being passed on as the repo type.
        temp_dir: Directory that receives the per-repository folder.

    Returns:
        The value returned by ``snapshot_download``, or ``None`` when any
        exception occurs (the error is printed, not raised).
    """
    try:
        # "Models" -> "model", "Datasets" -> "dataset", "Spaces" -> "space".
        singular_type = repo_type.lower()[:-1]
        target_dir = os.path.join(temp_dir, repo_id.replace('/', '_'))
        return snapshot_download(
            repo_id=repo_id,
            repo_type=singular_type,
            local_dir=target_dir,
            ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"],
        )
    except Exception as err:
        print(f"Error downloading {repo_id}: {str(err)}")
        return None
|
|
|
def create_repo_zip(data: List[Dict], search_type: str) -> tuple[str, str]:
    """Download every repository in ``data`` and bundle them into one zip.

    Args:
        data: Search-result records; only the ``id`` of each is used.
        search_type: Plural category name passed to ``download_repository``.

    Returns:
        ``(html, status)``: an HTML anchor that embeds the archive as a base64
        ``data:`` URI, plus a status message. The HTML part is ``""`` when
        there was nothing to download or every download failed.
    """
    if not data:
        return "", "No repositories to download"

    # The snapshots only exist while the temporary directory does, so the
    # archive has to be built inside this block.
    with tempfile.TemporaryDirectory() as temp_dir:
        attempts = (
            download_repository(entry['id'], search_type, temp_dir)
            for entry in data
        )
        successful_downloads = [path for path in attempts if path]

        if not successful_downloads:
            return "", "No repositories were successfully downloaded"

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for repo_root in successful_downloads:
                # Each repository gets its own top-level folder in the archive.
                top_folder = os.path.basename(repo_root)
                for current_dir, _, filenames in os.walk(repo_root):
                    for filename in filenames:
                        absolute = os.path.join(current_dir, filename)
                        relative = os.path.relpath(absolute, repo_root)
                        bundle.write(absolute, os.path.join(top_folder, relative))

        zip_buffer.seek(0)
        base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()

    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="repositories.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download Repositories Archive
        </a>
    </div>
    """

    return download_link, f"Successfully downloaded {len(successful_downloads)} repositories"
|
|
|
def display_results(df: pd.DataFrame):
    """Concatenate the per-row HTML cards into one scrollable container.

    Args:
        df: Search results with a ``formatted_link`` column, or ``None``.

    Returns:
        A ``<div>`` wrapping every row's ``formatted_link`` in row order, or a
        "No results found." paragraph when ``df`` is ``None`` or empty.
    """
    if df is None or df.empty:
        return "<p>No results found.</p>"

    cards = [row['formatted_link'] for _, row in df.iterrows()]
    return (
        "<div style='max-height: 400px; overflow-y: auto;'>"
        + "".join(cards)
        + "</div>"
    )
|
|
|
def _swarmy_item_kind(item: Dict) -> str:
    """Return ``"Models"``, ``"Datasets"`` or ``"Spaces"`` for one record."""
    link = item.get("link")
    if isinstance(link, str) and link:
        # Dataset and Space links carry a category prefix; model links do not.
        if "/datasets/" in link:
            return "Datasets"
        if "/spaces/" in link:
            return "Spaces"
        return "Models"
    # Legacy heuristic, kept for records that carry no link at all.
    if "modelId" in item:
        return "Models"
    if "dataset" in (item.get("id") or ""):
        return "Datasets"
    return "Spaces"


def SwarmyTime(data: List[Dict]) -> Dict:
    """Aggregate summary statistics over a list of search-result records.

    Args:
        data: Records with optional ``author``, ``downloads``, ``link`` and
            ``id`` keys.

    Returns:
        A dict with ``total_items``, ``unique_authors`` (a count),
        ``total_downloads`` and ``item_types`` (per-category counts keyed by
        ``"Models"``, ``"Datasets"`` and ``"Spaces"``).
    """
    authors = set()
    total_downloads = 0
    item_types = {"Models": 0, "Datasets": 0, "Spaces": 0}

    for item in data:
        # A missing, empty or non-string author counts as one "Unknown" author.
        author = item.get("author")
        authors.add(author if isinstance(author, str) and author else "Unknown")

        # Download counts may be absent, None or NaN; those contribute nothing.
        try:
            total_downloads += int(item.get("downloads"))
        except (TypeError, ValueError, OverflowError):
            pass

        item_types[_swarmy_item_kind(item)] += 1

    return {
        "total_items": len(data),
        "unique_authors": len(authors),
        "total_downloads": total_downloads,
        "item_types": item_types,
    }
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: search controls, download controls, result panes and the event
# wiring between them. Components are created in display order.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # Search the Hugging Face Hub
    Search and download models, datasets, and spaces from Hugging Face.
    """)

    # Row 1: query box, category selector and the search trigger.
    with gr.Row():
        with gr.Column(scale=3):
            search_query = gr.Textbox(
                label="Search Query",
                value="awacke1",
                placeholder="Enter search term...",
            )
        with gr.Column(scale=2):
            search_type = gr.Radio(
                ["Models", "Datasets", "Spaces"],
                label="Search Type",
                value="Models",
                container=True,
            )
        with gr.Column(scale=1):
            search_button = gr.Button("🔍 Search", variant="primary", scale=1)

    # Row 2: download buttons with their status line and link area.
    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            gr.Markdown("### Download Options")
            with gr.Row():
                download_readme_button = gr.Button("📚 Download READMEs", variant="secondary")
                download_repo_button = gr.Button("📦 Download Repositories", variant="secondary")
            download_status = gr.Markdown("Status: Ready to download", label="Status")
            download_area = gr.HTML("", label="Download Link")

    # Row 3: rendered hits next to the aggregate statistics.
    with gr.Row():
        with gr.Column(scale=2):
            results_html = gr.HTML(label="Search Results")
        with gr.Column(scale=1):
            aggregated_output = gr.JSON(label="Search Statistics")

    # State carried from the last search to the download handlers.
    search_type_state = gr.State("")
    current_results = gr.State([])

    def search_and_aggregate(query, search_type):
        """Run a search and produce a value for each output of the Search button."""
        results_df = search_hub(query, search_type)
        records = results_df.to_dict('records')
        stats = SwarmyTime(records)
        rendered = display_results(results_df)
        # Order must match the `outputs` list of `search_button.click` below.
        return [
            rendered,
            "Status: Ready to download",
            "",
            stats,
            search_type,
            records,
        ]

    async def handle_readme_download(data):
        """Build the README archive link for the stored search results."""
        if not data:
            return ["Status: No results to download", ""]
        link_html, message = await download_all_readmes(data)
        return [f"Status: {message}", link_html]

    def handle_repo_download(data, search_type):
        """Build the repository archive link for the stored search results."""
        if not data:
            return ["Status: No results to download", ""]
        link_html, message = create_repo_zip(data, search_type)
        return [f"Status: {message}", link_html]

    # Event wiring.
    search_button.click(
        search_and_aggregate,
        inputs=[search_query, search_type],
        outputs=[
            results_html,
            download_status,
            download_area,
            aggregated_output,
            search_type_state,
            current_results,
        ],
    )

    download_readme_button.click(
        handle_readme_download,
        inputs=[current_results],
        outputs=[download_status, download_area],
    )

    download_repo_button.click(
        handle_repo_download,
        inputs=[current_results, search_type_state],
        outputs=[download_status, download_area],
    )

demo.launch(debug=True)