Models-Datasets-Spaces-Search-Hub / backup-112424-2-app.py
awacke1's picture
Rename app.py to backup-112424-2-app.py
93ce1dd verified
from typing import List, Dict
import httpx
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, ModelCard, snapshot_download
import base64
import io
import zipfile
import asyncio
import aiohttp
from pathlib import Path
import emoji
import tempfile
import shutil
import os
def search_hub(query: str, search_type: str) -> pd.DataFrame:
api = HfApi()
if search_type == "Models":
results = api.list_models(search=query)
data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads, "link": f"https://huggingface.co/{model.modelId}"} for model in results]
elif search_type == "Datasets":
results = api.list_datasets(search=query)
data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads, "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results]
elif search_type == "Spaces":
results = api.list_spaces(search=query)
data = [{"id": space.id, "author": space.author, "link": f"https://huggingface.co/spaces/{space.id}"} for space in results]
else:
data = []
for i, item in enumerate(data, 1):
item['number'] = i
item['formatted_link'] = format_link(item, i, search_type)
return pd.DataFrame(data)
def format_link(item: Dict, number: int, search_type: str) -> str:
link = item['link']
readme_link = f"{link}/blob/main/README.md"
title = f"{number}. {item['id']}"
metadata = f"Author: {item['author']}"
if 'downloads' in item:
metadata += f", Downloads: {item['downloads']}"
html = f"""
<div style="margin-bottom: 10px;">
<strong>{title}</strong><br>
<a href="{link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {search_type[:-1]}</a> |
<a href="{readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br>
<small>{metadata}</small>
</div>
"""
return html
async def download_readme(session: aiohttp.ClientSession, item: Dict) -> tuple[str, str]:
"""Download README.md file for a given item."""
item_id = item['id']
raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md"
try:
async with session.get(raw_url) as response:
if response.status == 200:
content = await response.text()
return item_id.replace('/', '_'), content
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}"
except Exception as e:
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}"
async def download_all_readmes(data: List[Dict]) -> tuple[str, str]:
"""Download all README files and create a zip archive."""
if not data:
return "", "No results to download"
zip_buffer = io.BytesIO()
status_message = "Downloading READMEs..."
async with aiohttp.ClientSession() as session:
tasks = [download_readme(session, item) for item in data]
results = await asyncio.gather(*tasks)
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for filename, content in results:
zip_file.writestr(f"{filename}.md", content)
zip_buffer.seek(0)
base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
download_link = f"""
<div style="margin-top: 10px;">
<a href="data:application/zip;base64,{base64_zip}"
download="readmes.zip"
style="display: inline-block; padding: 10px 20px;
background-color: #4CAF50; color: white;
text-decoration: none; border-radius: 5px;">
📥 Download READMEs Archive
</a>
</div>
"""
return download_link, "READMEs ready for download!"
def download_repository(repo_id: str, repo_type: str, temp_dir: str) -> str:
"""Download a single repository."""
try:
repo_path = snapshot_download(
repo_id=repo_id,
repo_type=repo_type.lower()[:-1], # Remove 's' from 'Models'/'Datasets'/'Spaces'
local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')),
ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"] # Ignore large binary files
)
return repo_path
except Exception as e:
print(f"Error downloading {repo_id}: {str(e)}")
return None
def create_repo_zip(data: List[Dict], search_type: str) -> tuple[str, str]:
"""Download repositories and create a zip archive."""
if not data:
return "", "No repositories to download"
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
successful_downloads = []
# Download each repository
for item in data:
repo_path = download_repository(item['id'], search_type, temp_dir)
if repo_path:
successful_downloads.append(repo_path)
if not successful_downloads:
return "", "No repositories were successfully downloaded"
# Create zip file in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for repo_path in successful_downloads:
repo_name = os.path.basename(repo_path)
for root, _, files in os.walk(repo_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path))
zip_file.write(file_path, arcname)
# Convert to base64
zip_buffer.seek(0)
base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
download_link = f"""
<div style="margin-top: 10px;">
<a href="data:application/zip;base64,{base64_zip}"
download="repositories.zip"
style="display: inline-block; padding: 10px 20px;
background-color: #4CAF50; color: white;
text-decoration: none; border-radius: 5px;">
📥 Download Repositories Archive
</a>
</div>
"""
return download_link, f"Successfully downloaded {len(successful_downloads)} repositories"
def display_results(df: pd.DataFrame):
if df is not None and not df.empty:
html = "<div style='max-height: 400px; overflow-y: auto;'>"
for _, row in df.iterrows():
html += row['formatted_link']
html += "</div>"
return html
else:
return "<p>No results found.</p>"
def SwarmyTime(data: List[Dict]) -> Dict:
"""Aggregates all content from the given data."""
aggregated = {
"total_items": len(data),
"unique_authors": set(),
"total_downloads": 0,
"item_types": {"Models": 0, "Datasets": 0, "Spaces": 0}
}
for item in data:
aggregated["unique_authors"].add(item.get("author", "Unknown"))
aggregated["total_downloads"] += item.get("downloads", 0)
if "modelId" in item:
aggregated["item_types"]["Models"] += 1
elif "dataset" in item.get("id", ""):
aggregated["item_types"]["Datasets"] += 1
else:
aggregated["item_types"]["Spaces"] += 1
aggregated["unique_authors"] = len(aggregated["unique_authors"])
return aggregated
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# Search the Hugging Face Hub
Search and download models, datasets, and spaces from Hugging Face.
""")
with gr.Row():
with gr.Column(scale=3):
search_query = gr.Textbox(
label="Search Query",
value="awacke1",
placeholder="Enter search term..."
)
with gr.Column(scale=2):
search_type = gr.Radio(
["Models", "Datasets", "Spaces"],
label="Search Type",
value="Models",
container=True
)
with gr.Column(scale=1):
search_button = gr.Button("🔍 Search", variant="primary", scale=1)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
gr.Markdown("### Download Options")
with gr.Row():
download_readme_button = gr.Button(
"📚 Download READMEs",
variant="secondary",
)
download_repo_button = gr.Button(
"📦 Download Repositories",
variant="secondary",
)
download_status = gr.Markdown("Status: Ready to download", label="Status")
download_area = gr.HTML("", label="Download Link")
with gr.Row():
with gr.Column(scale=2):
results_html = gr.HTML(label="Search Results")
with gr.Column(scale=1):
aggregated_output = gr.JSON(label="Search Statistics")
search_type_state = gr.State("")
current_results = gr.State([])
def search_and_aggregate(query, search_type):
df = search_hub(query, search_type)
data = df.to_dict('records')
aggregated = SwarmyTime(data)
html_results = display_results(df)
return [
html_results, # results_html
"Status: Ready to download", # download_status
"", # download_area
aggregated, # aggregated_output
search_type, # search_type_state
data # current_results
]
async def handle_readme_download(data):
if not data:
return ["Status: No results to download", ""]
download_link, status = await download_all_readmes(data)
return [f"Status: {status}", download_link]
def handle_repo_download(data, search_type):
if not data:
return ["Status: No results to download", ""]
download_link, status = create_repo_zip(data, search_type)
return [f"Status: {status}", download_link]
search_button.click(
search_and_aggregate,
inputs=[search_query, search_type],
outputs=[
results_html,
download_status,
download_area,
aggregated_output,
search_type_state,
current_results
]
)
download_readme_button.click(
handle_readme_download,
inputs=[current_results],
outputs=[download_status, download_area]
)
download_repo_button.click(
handle_repo_download,
inputs=[current_results, search_type_state],
outputs=[download_status, download_area]
)
demo.launch(debug=True)