from typing import Dict, List, Optional

import asyncio
import base64
import io
import os
import tempfile
import zipfile

import aiohttp
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, snapshot_download, login
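# A small Gradio app for searching the Hugging Face Hub and bulk-downloading
# READMEs or whole repositories (minus large weight files) as zip archives.
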
# Initialize HuggingFace with access token
def init_huggingface(token: str):
"""Initialize HuggingFace with access token."""
try:
login(token=token)
return True
except Exception as e:
print(f"Error logging in: {str(e)}")
return False
def format_link(item: Dict, number: int, search_type: str) -> str:
"""Format a link for display in the UI."""
link = item['link']
readme_link = f"{link}/blob/main/README.md"
title = f"{number}. {item['id']}"
metadata = f"Author: {item['author']}"
if 'downloads' in item:
metadata += f", Downloads: {item['downloads']}"
html = f"""
<div style="margin-bottom: 10px;">
<strong>{title}</strong><br>
<a href="{link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {search_type[:-1]}</a> |
<a href="{readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br>
<small>{metadata}</small>
</div>
"""
return html
def display_results(df: pd.DataFrame):
"""Display search results in HTML format."""
if df is not None and not df.empty:
html = "<div style='max-height: 400px; overflow-y: auto;'>"
for _, row in df.iterrows():
html += row['formatted_link']
html += "</div>"
return html
else:
return "<p>No results found.</p>"
def SwarmyTime(data: List[Dict]) -> Dict:
"""Aggregates all content from the given data."""
aggregated = {
"total_items": len(data),
"unique_authors": set(),
"total_downloads": 0,
"item_types": {"Models": 0, "Datasets": 0, "Spaces": 0}
}
    for item in data:
        aggregated["unique_authors"].add(item.get("author", "Unknown"))
        aggregated["total_downloads"] += item.get("downloads") or 0
        # search_hub emits only id/author/downloads/link, so classify by the
        # link prefix rather than keys that never appear in the data.
        link = item.get("link", "")
        if "/datasets/" in link:
            aggregated["item_types"]["Datasets"] += 1
        elif "/spaces/" in link:
            aggregated["item_types"]["Spaces"] += 1
        else:
            aggregated["item_types"]["Models"] += 1
aggregated["unique_authors"] = len(aggregated["unique_authors"])
return aggregated
def search_hub(query: str, search_type: str, token: Optional[str] = None) -> pd.DataFrame:
"""Search the Hugging Face Hub for models, datasets, or spaces."""
api = HfApi(token=token)
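    # list_models/list_datasets/list_spaces return generators; for broad queries,
    # passing limit=... caps how many results are fetched from the Hub.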
if search_type == "Models":
results = api.list_models(search=query)
data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads, "link": f"https://huggingface.co/{model.modelId}"} for model in results]
elif search_type == "Datasets":
results = api.list_datasets(search=query)
data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads, "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results]
elif search_type == "Spaces":
results = api.list_spaces(search=query)
data = [{"id": space.id, "author": space.author, "link": f"https://huggingface.co/spaces/{space.id}"} for space in results]
else:
data = []
for i, item in enumerate(data, 1):
item['number'] = i
item['formatted_link'] = format_link(item, i, search_type)
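    # Illustrative call: search_hub("llama", "Models") yields a DataFrame with
    # columns id / author / downloads / link plus number and formatted_link.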
return pd.DataFrame(data)
async def download_readme(session: aiohttp.ClientSession, item: Dict, token: str) -> tuple[str, str]:
"""Download README.md file for a given item."""
    item_id = item['id']
    # item['link'] already carries the datasets/ or spaces/ prefix; building the
    # raw URL from the bare id would 404 for datasets and spaces.
    raw_url = f"{item['link']}/raw/main/README.md"
headers = {"Authorization": f"Bearer {token}"} if token else {}
try:
async with session.get(raw_url, headers=headers) as response:
if response.status == 200:
content = await response.text()
return item_id.replace('/', '_'), content
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}"
except Exception as e:
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}"
async def download_all_readmes(data: List[Dict], token: str) -> tuple[str, str]:
"""Download all README files and create a zip archive."""
if not data:
return "", "No results to download"
zip_buffer = io.BytesIO()
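    # Fetch every README concurrently; asyncio.gather preserves input order.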
async with aiohttp.ClientSession() as session:
tasks = [download_readme(session, item, token) for item in data]
results = await asyncio.gather(*tasks)
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for filename, content in results:
zip_file.writestr(f"{filename}.md", content)
zip_buffer.seek(0)
base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
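    # Serving the zip as a base64 data URI keeps everything in memory, at the
    # cost of inflating the page payload for large archives.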
download_link = f"""
<div style="margin-top: 10px;">
<a href="data:application/zip;base64,{base64_zip}"
download="readmes.zip"
style="display: inline-block; padding: 10px 20px;
background-color: #4CAF50; color: white;
text-decoration: none; border-radius: 5px;">
📥 Download READMEs Archive
</a>
</div>
"""
return download_link, "READMEs ready for download!"
def download_repository(repo_id: str, repo_type: str, temp_dir: str, token: str) -> str:
"""Download a single repository."""
try:
repo_path = snapshot_download(
repo_id=repo_id,
repo_type=repo_type.lower()[:-1], # Remove 's' from 'Models'/'Datasets'/'Spaces'
local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')),
ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"], # Ignore large binary files
token=token
)
return repo_path
except Exception as e:
print(f"Error downloading {repo_id}: {str(e)}")
return None
def create_repo_zip(data: List[Dict], search_type: str, token: str) -> tuple[str, str]:
"""Download repositories and create a zip archive."""
if not data:
return "", "No repositories to download"
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
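        # Everything under temp_dir is removed automatically when this block
        # exits, so only the in-memory zip archive survives.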
successful_downloads = []
# Download each repository
for item in data:
repo_path = download_repository(item['id'], search_type, temp_dir, token)
if repo_path:
successful_downloads.append(repo_path)
if not successful_downloads:
return "", "No repositories were successfully downloaded"
# Create zip file in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for repo_path in successful_downloads:
repo_name = os.path.basename(repo_path)
for root, _, files in os.walk(repo_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path))
zip_file.write(file_path, arcname)
# Convert to base64
zip_buffer.seek(0)
base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
download_link = f"""
<div style="margin-top: 10px;">
<a href="data:application/zip;base64,{base64_zip}"
download="repositories.zip"
style="display: inline-block; padding: 10px 20px;
background-color: #4CAF50; color: white;
text-decoration: none; border-radius: 5px;">
📥 Download Repositories Archive
</a>
</div>
"""
return download_link, f"Successfully downloaded {len(successful_downloads)} repositories"
# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# Search the Hugging Face Hub
Search and download models, datasets, and spaces from Hugging Face.
""")
with gr.Row():
with gr.Column(scale=3):
hf_token = gr.Textbox(
label="HuggingFace Access Token (optional)",
type="password",
placeholder="Enter your HuggingFace access token...",
)
with gr.Row():
with gr.Column(scale=3):
search_query = gr.Textbox(
label="Search Query",
value="awacke1",
placeholder="Enter search term..."
)
with gr.Column(scale=2):
search_type = gr.Radio(
["Models", "Datasets", "Spaces"],
label="Search Type",
value="Models",
container=True
)
with gr.Column(scale=1):
search_button = gr.Button("🔍 Search", variant="primary", scale=1)
with gr.Row(variant="panel"):
with gr.Column(scale=1):
gr.Markdown("### Download Options")
with gr.Row():
download_readme_button = gr.Button(
"📚 Download READMEs",
variant="secondary",
)
download_repo_button = gr.Button(
"📦 Download Repositories",
variant="secondary",
)
download_status = gr.Markdown("Status: Ready to download", label="Status")
download_area = gr.HTML("", label="Download Link")
with gr.Row():
with gr.Column(scale=2):
results_html = gr.HTML(label="Search Results")
with gr.Column(scale=1):
aggregated_output = gr.JSON(label="Search Statistics")
search_type_state = gr.State("")
current_results = gr.State([])
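    # gr.State keeps the latest results and search type server-side between
    # events, so the download buttons can reuse them without re-querying the Hub.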
def search_and_aggregate(query, search_type, token):
df = search_hub(query, search_type, token)
data = df.to_dict('records')
aggregated = SwarmyTime(data)
html_results = display_results(df)
return [
html_results,
"Status: Ready to download",
"",
aggregated,
search_type,
data
]
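    # Gradio supports async event handlers, so the README download can await
    # aiohttp directly without manual event-loop management.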
async def handle_readme_download(data, token):
if not data:
return ["Status: No results to download", ""]
download_link, status = await download_all_readmes(data, token)
return [f"Status: {status}", download_link]
def handle_repo_download(data, search_type, token):
if not data:
return ["Status: No results to download", ""]
download_link, status = create_repo_zip(data, search_type, token)
return [f"Status: {status}", download_link]
search_button.click(
search_and_aggregate,
inputs=[search_query, search_type, hf_token],
outputs=[
results_html,
download_status,
download_area,
aggregated_output,
search_type_state,
current_results
]
)
download_readme_button.click(
handle_readme_download,
inputs=[current_results, hf_token],
outputs=[download_status, download_area]
)
download_repo_button.click(
handle_repo_download,
inputs=[current_results, search_type_state, hf_token],
outputs=[download_status, download_area]
)
demo.launch(debug=True)
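
# Assumed usage: run this file directly (e.g. `python app.py`) and open the local
# URL that Gradio prints. A token is only needed for private repositories.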