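"""Gradio app that searches the Hugging Face Hub for models, datasets, and
spaces, and bundles the READMEs of all results into a downloadable zip."""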
import asyncio
import base64
import io
import zipfile
from typing import Dict, List

import aiohttp
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi

def search_hub(query: str, search_type: str) -> pd.DataFrame:
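    """Query the Hub for models, datasets, or spaces matching `query`."""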
    api = HfApi()
    if search_type == "Models":
        results = api.list_models(search=query)
        data = [{"id": model.modelId, "author": model.author, "downloads": model.downloads,
                 "link": f"https://huggingface.co/{model.modelId}"} for model in results]
    elif search_type == "Datasets":
        results = api.list_datasets(search=query)
        data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads,
                 "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results]
    elif search_type == "Spaces":
        results = api.list_spaces(search=query)
        data = [{"id": space.id, "author": space.author,
                 "link": f"https://huggingface.co/spaces/{space.id}"} for space in results]
    else:
        data = []

    # Number the results and attach a pre-rendered HTML link for each entry.
    for i, item in enumerate(data, 1):
        item['number'] = i
        item['formatted_link'] = format_link(item, i, search_type)
    return pd.DataFrame(data)

def format_link(item: Dict, number: int, search_type: str) -> str:
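    """Render one search result as an HTML block with Hub and README links."""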
    link = item['link']
    readme_link = f"{link}/blob/main/README.md"
    title = f"{number}. {item['id']}"
    metadata = f"Author: {item['author']}"
    if 'downloads' in item:
        metadata += f", Downloads: {item['downloads']}"

    html = f"""
    <div style="margin-bottom: 10px;">
        <strong>{title}</strong><br>
        <a href="{link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {search_type[:-1]}</a> |
        <a href="{readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br>
        <small>{metadata}</small>
    </div>
    """
    return html

async def download_readme(session: aiohttp.ClientSession, item: Dict) -> tuple[str, str]:
"""Download README.md file for a given item."""
item_id = item['id']
raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md"
try:
async with session.get(raw_url) as response:
if response.status == 200:
content = await response.text()
return item_id.replace('/', '_'), content
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nStatus code: {response.status}"
except Exception as e:
return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}"
async def download_all_readmes(data: List[Dict]) -> str:
"""Download all README files and create a zip archive."""
zip_buffer = io.BytesIO()
async with aiohttp.ClientSession() as session:
# Download all READMEs concurrently
tasks = [download_readme(session, item) for item in data]
results = await asyncio.gather(*tasks)
# Create zip file
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for filename, content in results:
zip_file.writestr(f"{filename}.md", content)
# Convert to base64
zip_buffer.seek(0)
base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
return base64_zip
def create_download_link(base64_zip: str) -> str:
"""Create an HTML download link for the zip file."""
download_link = f"""
<a href="data:application/zip;base64,{base64_zip}"
download="readmes.zip"
style="display: inline-block; padding: 10px 20px;
background-color: #4CAF50; color: white;
text-decoration: none; border-radius: 5px;
margin-top: 10px;">
📥 Download READMEs Archive
</a>
"""
return download_link
def display_results(df: pd.DataFrame):
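    """Render the formatted result links inside a scrollable container."""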
    if df is None or df.empty:
        return "<p>No results found.</p>"
    html = "<div style='max-height: 400px; overflow-y: auto;'>"
    for _, row in df.iterrows():
        html += row['formatted_link']
    html += "</div>"
    return html

def SwarmyTime(data: List[Dict]) -> Dict:
"""Aggregates all content from the given data."""
aggregated = {
"total_items": len(data),
"unique_authors": set(),
"total_downloads": 0,
"item_types": {"Models": 0, "Datasets": 0, "Spaces": 0}
}
for item in data:
aggregated["unique_authors"].add(item.get("author", "Unknown"))
aggregated["total_downloads"] += item.get("downloads", 0)
if "modelId" in item:
aggregated["item_types"]["Models"] += 1
elif "dataset" in item.get("id", ""):
aggregated["item_types"]["Datasets"] += 1
else:
aggregated["item_types"]["Spaces"] += 1
aggregated["unique_authors"] = len(aggregated["unique_authors"])
return aggregated
with gr.Blocks() as demo:
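    # UI: query box + type selector, an HTML results pane, download controls,
    # and a JSON summary produced by SwarmyTime().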
gr.Markdown("## Search the Hugging Face Hub")
with gr.Row():
search_query = gr.Textbox(label="Search Query", value="awacke1")
search_type = gr.Radio(["Models", "Datasets", "Spaces"], label="Search Type", value="Models")
search_button = gr.Button("Search")
results_html = gr.HTML(label="Search Results")
download_html = gr.HTML(label="Download Link")
metadata_output = gr.Textbox(label="Metadata", lines=10)
aggregated_output = gr.JSON(label="Aggregated Content")
current_results = gr.State([]) # Store current search results

    def search_and_aggregate(query, search_type):
        df = search_hub(query, search_type)
        data = df.to_dict('records')
        aggregated = SwarmyTime(data)
        html_results = display_results(df)
        return html_results, aggregated, data

    async def download_readmes(data):
        if not data:
            return "No results to download"
        base64_zip = await download_all_readmes(data)
        return create_download_link(base64_zip)

    search_button.click(
        search_and_aggregate,
        inputs=[search_query, search_type],
        outputs=[results_html, aggregated_output, current_results]
    )

    # gr.HTML has no click event, so a dedicated Button triggers the README download.
    download_button.click(
        download_readmes,
        inputs=[current_results],
        outputs=[download_html]
    )

demo.launch(debug=True)