# Source: Models-Datasets-Spaces-Search-Hub / backup-112624-app.py
# Uploaded by: awacke1
# Commit: "Rename app.py to backup-112624-app.py" (a4c2226, verified)
from typing import List, Dict
import httpx
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, ModelCard, snapshot_download, login
import base64
import io
import zipfile
import asyncio
import aiohttp
from pathlib import Path
import emoji
import tempfile
import shutil
import os
# Initialize HuggingFace with access token
def init_huggingface(token: str):
    """Log in to the Hugging Face Hub with an access token.

    Args:
        token: User access token handed to ``huggingface_hub.login``.

    Returns:
        True when ``login`` completed without raising; False when the token
        is empty/None or ``login`` raised (the error is printed).
    """
    # Without a token ``login`` falls back to an interactive prompt (see the
    # huggingface_hub docs), which would block a headless app; fail fast.
    if not token:
        print("Error logging in: no access token provided")
        return False
    try:
        login(token=token)
    except Exception as e:  # best-effort: report and signal failure
        print(f"Error logging in: {str(e)}")
        return False
    return True
def format_link(item: Dict, number: int, search_type: str) -> str:
    """Render one search result as an HTML snippet.

    Args:
        item: Result record; must contain ``'link'``, ``'id'`` and
            ``'author'``. ``'downloads'`` is shown only when the key exists.
        number: 1-based position shown in front of the id.
        search_type: Plural type label; its last character is dropped for
            the "View ..." link text (e.g. ``"Models"`` -> ``"Model"``).

    Returns:
        HTML with the title, a link to the repo, a link to its README and a
        metadata line.

    Raises:
        KeyError: If ``item`` lacks ``'link'``, ``'id'`` or ``'author'``.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    from html import escape

    link = item['link']
    readme_link = f"{link}/blob/main/README.md"
    title = f"{number}. {item['id']}"
    metadata = f"Author: {item['author']}"
    if 'downloads' in item:
        metadata += f", Downloads: {item['downloads']}"

    # Ids, authors and links come from the Hub (untrusted), so every
    # interpolated value is escaped; escape() also quotes '"' for the hrefs.
    link_style = 'style="color: #4a90e2; text-decoration: none;"'
    return (
        '\n<div style="margin-bottom: 10px;">\n'
        f'<strong>{escape(title)}</strong><br>\n'
        f'<a href="{escape(link)}" target="_blank" {link_style}>'
        f'View {escape(search_type[:-1])}</a> |\n'
        f'<a href="{escape(readme_link)}" target="_blank" {link_style}>'
        'View README</a><br>\n'
        f'<small>{escape(metadata)}</small>\n'
        '</div>\n'
    )
def display_results(df: pd.DataFrame):
    """Wrap the pre-rendered result snippets in a scrollable HTML container.

    Args:
        df: Search results with a ``'formatted_link'`` column of HTML
            strings, or None.

    Returns:
        The concatenated snippets inside a fixed-height ``<div>``, or a
        "No results found." paragraph when ``df`` is None or empty.
    """
    if df is None or df.empty:
        return "<p>No results found.</p>"
    # One join over the column instead of iterrows() + repeated '+='.
    links = "".join(df['formatted_link'])
    return f"<div style='max-height: 400px; overflow-y: auto;'>{links}</div>"
def SwarmyTime(data: List[Dict]) -> Dict:
    """Aggregate summary statistics over a list of search-result records.

    Args:
        data: Result records (dicts). Recognised keys: ``'author'``,
            ``'downloads'``, ``'link'``, ``'id'`` and ``'modelId'``.

    Returns:
        Dict with ``total_items``, ``unique_authors`` (count),
        ``total_downloads`` and ``item_types`` (counts per
        Models/Datasets/Spaces).
    """
    import math  # local import keeps the block self-contained

    authors = set()
    total_downloads = 0
    item_types = {"Models": 0, "Datasets": 0, "Spaces": 0}

    for item in data:
        authors.add(item.get("author") or "Unknown")

        # Records round-tripped through a DataFrame can carry None or NaN
        # for a missing download count; neither may enter the sum.
        downloads = item.get("downloads")
        is_nan = isinstance(downloads, float) and math.isnan(downloads)
        if downloads is not None and not is_nan:
            total_downloads += downloads

        link = item.get("link")
        if not isinstance(link, str):
            link = ""
        if "modelId" in item:
            kind = "Models"
        elif "/datasets/" in link:
            kind = "Datasets"
        elif "/spaces/" in link:
            kind = "Spaces"
        elif link:
            # search_hub builds model links without a type prefix.
            kind = "Models"
        elif "dataset" in item.get("id", ""):
            # No link available: fall back to the id-based heuristic.
            kind = "Datasets"
        else:
            kind = "Spaces"
        item_types[kind] += 1

    return {
        "total_items": len(data),
        "unique_authors": len(authors),
        "total_downloads": total_downloads,
        "item_types": item_types,
    }
def search_hub(query: str, search_type: str, token: str = None) -> pd.DataFrame:
    """Search the Hugging Face Hub for models, datasets, or spaces.

    Args:
        query: Search term passed to the Hub listing call.
        search_type: ``"Models"``, ``"Datasets"`` or ``"Spaces"``.
        token: Optional access token; a blank string is treated as None.

    Returns:
        One row per hit with ``id``, ``author``, ``link``, ``number`` and
        ``formatted_link`` columns (plus ``downloads`` for models and
        datasets). An empty DataFrame for any other ``search_type``.
    """
    # Unknown type: nothing to query, so skip creating a client.
    if search_type not in ("Models", "Datasets", "Spaces"):
        return pd.DataFrame([])

    # The UI textbox yields "" when left blank; pass None so this matches
    # the function's own default.
    # NOTE(review): presumably "" would otherwise be sent as a credential — confirm.
    api = HfApi(token=token or None)

    if search_type == "Models":
        data = [
            {
                "id": model.modelId,
                "author": model.author,
                "downloads": model.downloads,
                "link": f"https://huggingface.co/{model.modelId}",
            }
            for model in api.list_models(search=query)
        ]
    elif search_type == "Datasets":
        data = [
            {
                "id": dataset.id,
                "author": dataset.author,
                "downloads": dataset.downloads,
                "link": f"https://huggingface.co/datasets/{dataset.id}",
            }
            for dataset in api.list_datasets(search=query)
        ]
    else:  # "Spaces" — no download count is recorded for spaces
        data = [
            {
                "id": space.id,
                "author": space.author,
                "link": f"https://huggingface.co/spaces/{space.id}",
            }
            for space in api.list_spaces(search=query)
        ]

    for i, item in enumerate(data, 1):
        item['number'] = i
        item['formatted_link'] = format_link(item, i, search_type)
    return pd.DataFrame(data)
async def download_readme(session: "aiohttp.ClientSession", item: Dict, token: str) -> tuple[str, str]:
    """Download the README.md of one repository.

    Tries the ``main`` branch first; on a 401 or 404 it retries ``master``.

    Args:
        session: Open HTTP session used for the GET requests.
        item: Result record with ``'id'`` and ``'link'``.
        token: Access token; sent as a Bearer header when non-empty.

    Returns:
        ``(filename, content)`` where ``filename`` is the id with ``/``
        replaced by ``_``. On failure ``content`` is a Markdown error note
        starting with ``# Error downloading README for``; this function does
        not raise for request errors.
    """
    item_id = item['id']
    filename = item_id.replace('/', '_')
    link = item['link']

    # Match the type as a path segment followed by the id. A plain substring
    # test misroutes e.g. a space named "user/datasets-viewer".
    if f"/datasets/{item_id}" in link:
        repo_root = f"https://huggingface.co/datasets/{item_id}"
    elif f"/spaces/{item_id}" in link:
        repo_root = f"https://huggingface.co/spaces/{item_id}"
    else:  # Models
        repo_root = f"https://huggingface.co/{item_id}"
    raw_url = f"{repo_root}/raw/main/README.md"
    alt_url = f"{repo_root}/raw/master/README.md"

    headers = {"Authorization": f"Bearer {token}"} if token else {}

    try:
        # Try main branch first
        async with session.get(raw_url, headers=headers) as response:
            status = response.status
            if status == 200:
                return filename, await response.text()

        # If main branch fails, try master branch
        if status in (401, 404):
            async with session.get(alt_url, headers=headers) as alt_response:
                if alt_response.status == 200:
                    return filename, await alt_response.text()

        # Both attempts failed; the note reports the first response's status.
        error_msg = f"# Error downloading README for {item_id}\n"
        if status == 401:
            error_msg += "Authentication required. Please provide a valid HuggingFace token."
        else:
            error_msg += f"Status code: {status}"
        return filename, error_msg
    except Exception as e:  # best-effort: one bad repo must not abort the batch
        return filename, f"# Error downloading README for {item_id}\nError: {str(e)}"
async def download_all_readmes(data: List[Dict], token: str) -> tuple[str, str]:
    """Download every result's README and pack them into a zip archive.

    Args:
        data: Result records, each passed to ``download_readme``.
        token: Access token forwarded to ``download_readme``.

    Returns:
        ``(download_link_html, status)``. The HTML holds a ``data:`` URI with
        the base64-encoded zip. For empty ``data`` the link is ``""`` and the
        status is "No results to download".
    """
    if not data:
        return "", "No results to download"

    # Fetch all READMEs concurrently over one shared session.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(download_readme(session, item, token) for item in data)
        )

    zip_buffer = io.BytesIO()
    failed_downloads = []
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for filename, content in results:
            # Error notes from download_readme begin with this heading. A
            # prefix check avoids flagging a real README that merely
            # mentions the phrase somewhere in its text.
            if content.startswith("# Error downloading README"):
                failed_downloads.append(filename)
            # Failed entries are still written so the archive explains them.
            zip_file.writestr(f"{filename}.md", content)

    base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()

    status = "READMEs ready for download!"
    if failed_downloads:
        status += f" (Failed to download {len(failed_downloads)} READMEs)"

    failure_note = (
        '<p style="color: #ff6b6b; margin-top: 10px;">Note: Some READMEs could not be downloaded. Please check the zip file for details.</p>'
        if failed_downloads
        else ''
    )
    download_link = (
        '\n<div style="margin-top: 10px;">\n'
        f'<a href="data:application/zip;base64,{base64_zip}"\n'
        'download="readmes.zip"\n'
        'style="display: inline-block; padding: 10px 20px;\n'
        'background-color: #4CAF50; color: white;\n'
        'text-decoration: none; border-radius: 5px;">\n'
        '📥 Download READMEs Archive\n'
        '</a>\n'
        f'{failure_note}\n'
        '</div>\n'
    )
    return download_link, status
def download_repository(repo_id: str, repo_type: str, temp_dir: str, token: str) -> "str | None":
    """Download a single repository snapshot, skipping large weight files.

    Args:
        repo_id: Hub repository id (``owner/name``).
        repo_type: Type label such as ``"Models"``/``"Datasets"``/``"Spaces"``;
            lower-cased and stripped of one trailing ``s`` before use, so the
            singular forms are accepted as well.
        temp_dir: Directory under which the snapshot folder is created.
        token: Access token; a blank string is passed on as None.

    Returns:
        The path returned by ``snapshot_download``, or None when anything in
        the download raised (the error is printed).
    """
    try:
        return snapshot_download(
            repo_id=repo_id,
            # removesuffix (unlike [:-1]) leaves an already-singular
            # "model"/"dataset"/"space" intact.
            repo_type=repo_type.lower().removesuffix("s"),
            local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')),
            ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"],  # Ignore large binary files
            token=token or None,
        )
    except Exception as e:  # best-effort: the caller skips failed repos
        print(f"Error downloading {repo_id}: {str(e)}")
        return None
def create_repo_zip(data: List[Dict], search_type: str, token: str) -> tuple[str, str]:
    """Fetch each repository in ``data`` and bundle them into one zip.

    Returns ``(download_link_html, status)``. The link is ``""`` when there
    is nothing to download or when every download failed.
    """
    if not data:
        return "", "No repositories to download"

    # Snapshots live in a scratch directory that is removed on exit, so the
    # archive has to be built before the ``with`` block ends.
    with tempfile.TemporaryDirectory() as temp_dir:
        fetched = (
            download_repository(entry['id'], search_type, temp_dir, token)
            for entry in data
        )
        successful_downloads = [path for path in fetched if path]

        if not successful_downloads:
            return "", "No repositories were successfully downloaded"

        # Build the archive in memory, one top-level folder per repository.
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for repo_dir in successful_downloads:
                top_folder = os.path.basename(repo_dir)
                for folder, _, names in os.walk(repo_dir):
                    for name in names:
                        full_path = os.path.join(folder, name)
                        inner_path = os.path.relpath(full_path, repo_dir)
                        bundle.write(full_path, os.path.join(top_folder, inner_path))

        # Encode the archive for embedding in a data: URI.
        archive.seek(0)
        base64_zip = base64.b64encode(archive.getvalue()).decode()

        download_link = (
            '\n<div style="margin-top: 10px;">\n'
            f'<a href="data:application/zip;base64,{base64_zip}"\n'
            'download="repositories.zip"\n'
            'style="display: inline-block; padding: 10px 20px;\n'
            'background-color: #4CAF50; color: white;\n'
            'text-decoration: none; border-radius: 5px;">\n'
            '📥 Download Repositories Archive\n'
            '</a>\n'
            '</div>\n'
        )
        status = f"Successfully downloaded {len(successful_downloads)} repositories"
        return download_link, status
# Gradio Interface
# Builds the Gradio UI: token + search inputs, download buttons, result panes,
# and the three click handlers that wire them to the functions above.
# NOTE(review): the nesting of the layout blocks below was reconstructed from
# a source whose indentation was lost — confirm against the running app.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# Search the Hugging Face Hub
Search and download models, datasets, and spaces from Hugging Face.
""")

    # Optional access token, forwarded to every handler.
    with gr.Row():
        with gr.Column(scale=3):
            hf_token = gr.Textbox(
                label="HuggingFace Access Token (optional)",
                type="password",
                placeholder="Enter your HuggingFace access token...",
            )

    # Search inputs: query text, result type, and the trigger button.
    with gr.Row():
        with gr.Column(scale=3):
            search_query = gr.Textbox(
                label="Search Query",
                value="awacke1",
                placeholder="Enter search term..."
            )
        with gr.Column(scale=2):
            search_type = gr.Radio(
                ["Models", "Datasets", "Spaces"],
                label="Search Type",
                value="Models",
                container=True
            )
        with gr.Column(scale=1):
            search_button = gr.Button("🔍 Search", variant="primary", scale=1)

    # Download controls plus the status line and the generated link.
    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            gr.Markdown("### Download Options")
            with gr.Row():
                download_readme_button = gr.Button(
                    "📚 Download READMEs",
                    variant="secondary",
                )
                download_repo_button = gr.Button(
                    "📦 Download Repositories",
                    variant="secondary",
                )
            download_status = gr.Markdown("Status: Ready to download", label="Status")
            download_area = gr.HTML("", label="Download Link")

    # Result list on the left, aggregate statistics on the right.
    with gr.Row():
        with gr.Column(scale=2):
            results_html = gr.HTML(label="Search Results")
        with gr.Column(scale=1):
            aggregated_output = gr.JSON(label="Search Statistics")

    # State carried from the last search to the download handlers.
    search_type_state = gr.State("")
    current_results = gr.State([])

    def search_and_aggregate(query, search_type, token):
        """Run a search and return values for the six search outputs.

        The returned list is ordered to match ``outputs`` of
        ``search_button.click`` below: results HTML, status text, cleared
        download area, statistics, search type, and the raw records.
        """
        df = search_hub(query, search_type, token)
        data = df.to_dict('records')
        aggregated = SwarmyTime(data)
        html_results = display_results(df)
        return [
            html_results,
            "Status: Ready to download",
            "",
            aggregated,
            search_type,
            data
        ]

    async def handle_readme_download(data, token):
        """Return ``[status, link_html]`` for the README archive."""
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = await download_all_readmes(data, token)
        return [f"Status: {status}", download_link]

    def handle_repo_download(data, search_type, token):
        """Return ``[status, link_html]`` for the repository archive."""
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = create_repo_zip(data, search_type, token)
        return [f"Status: {status}", download_link]

    search_button.click(
        search_and_aggregate,
        inputs=[search_query, search_type, hf_token],
        outputs=[
            results_html,
            download_status,
            download_area,
            aggregated_output,
            search_type_state,
            current_results
        ]
    )
    # Both download handlers read the stored results, not the live inputs.
    download_readme_button.click(
        handle_readme_download,
        inputs=[current_results, hf_token],
        outputs=[download_status, download_area]
    )
    download_repo_button.click(
        handle_repo_download,
        inputs=[current_results, search_type_state, hf_token],
        outputs=[download_status, download_area]
    )
demo.launch(debug=True)