# Hugging Face Space: image dataset builder (Gradio app)
import requests | |
from bs4 import BeautifulSoup | |
import os | |
import json | |
import gradio as gr | |
from datasets import Dataset | |
from PIL import Image | |
import io | |
import uuid | |
import time | |
import random | |
# Persistent storage layout: scraped images plus a JSON index live under /data
# (the writable persistent-volume mount on a Hugging Face Space).
DATA_DIR = "/data"
IMAGES_DIR = os.path.join(DATA_DIR, "images")
DATASET_FILE = os.path.join(DATA_DIR, "dataset.json")
# Add a user agent rotation list
# (one is picked at random per request to look less like a bot).
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
]
def get_headers(cookies=None):
    """Build browser-like HTTP headers with a randomly rotated User-Agent.

    Args:
        cookies: optional raw Cookie header string; attached only when truthy.

    Returns:
        dict of header name -> value, ready for requests.get.
    """
    base = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    # Merge in the Cookie header only when the caller supplied one.
    return {**base, "Cookie": cookies} if cookies else base
def make_request(url, cookies=None):
    """GET *url* with rotated headers after a polite random pause.

    The 1-3 second sleep throttles scraping so requests are not fired
    back-to-back at the remote host.
    """
    delay = random.uniform(1, 3)
    time.sleep(delay)
    headers = get_headers(cookies)
    return requests.get(url, headers=headers, timeout=10)
def extract_image_url(html_content):
    """Extract the full-size image URL from a scraped page.

    Strategy 1: parse the inline `image = {...};` JavaScript blob and
    rebuild the URL from its domain/base_dir/dir/img fields.
    Strategy 2 (fallback): first <img> tag carrying an alt attribute.

    Returns:
        The image URL string, or None when neither strategy succeeds.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    script = soup.find('script', type='text/javascript',
                       string=lambda text: 'image =' in text if text else False)
    if script:
        try:
            payload = script.string.split('=', 1)[1].strip().rstrip(';')
            image_data = json.loads(payload)
            return f"{image_data['domain']}{image_data['base_dir']}/{image_data['dir']}/{image_data['img']}"
        except (json.JSONDecodeError, KeyError):
            # Malformed or partial JS blob: previously this raised and aborted
            # the whole add_image flow; now we fall through to the <img> scan.
            pass
    img_tag = soup.find('img', alt=True)
    if img_tag and 'src' in img_tag.attrs:
        return img_tag['src']
    return None
def extract_tags(html_content):
    """Collect general-category tags from the page.

    Scans <li class="tag-type-general"> elements; by the site's markup the
    second <a> inside each holds the tag name.

    Returns:
        Comma-separated tag string ('' when no tags are found).
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    tags = []
    for tag_element in soup.find_all('li', class_='tag-type-general'):
        links = tag_element.find_all('a')
        # The original indexed links[1] unconditionally, raising IndexError on
        # entries with fewer than two anchors; guard on the length instead
        # (the old `if tag_link:` check after indexing could never be False).
        if len(links) > 1:
            tags.append(links[1].text)
    return ','.join(tags)
def download_image(url, cookies=None):
    """Fetch *url* and decode the response body into a PIL Image.

    Raises:
        Exception: wrapping any network-level (requests) failure; PIL
        decoding errors propagate unchanged, as before.
    """
    try:
        response = make_request(url, cookies)
        response.raise_for_status()
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}")
    # Only RequestException was ever caught, so decoding can sit outside
    # the try without changing behavior.
    return Image.open(io.BytesIO(response.content))
class DatasetBuilder:
    """Persists a growing image/tag dataset under DATA_DIR.

    Each record is {'image': <filename inside IMAGES_DIR>, 'tags': <comma-
    separated string>}. The record list is mirrored to DATASET_FILE after
    every addition so progress survives process restarts.
    """

    def __init__(self):
        self.dataset = self.load_dataset()
        os.makedirs(IMAGES_DIR, exist_ok=True)

    def load_dataset(self):
        """Load the saved record list, or [] when absent or unreadable."""
        if os.path.exists(DATASET_FILE):
            try:
                with open(DATASET_FILE, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError):
                # A corrupt or unreadable index previously crashed the app at
                # import time (DatasetBuilder() runs at module level); start
                # with an empty dataset instead.
                return []
        return []

    def save_dataset(self):
        """Write the current record list to DATASET_FILE."""
        with open(DATASET_FILE, 'w', encoding='utf-8') as f:
            json.dump(self.dataset, f)

    def add_image(self, url, cookies=None):
        """Scrape *url*, store its image and tags, and persist the dataset.

        Returns a human-readable status string (success or error) instead of
        raising, so the result can feed a UI textbox directly.
        """
        try:
            response = make_request(url, cookies)
            response.raise_for_status()
            html_content = response.text
            image_url = extract_image_url(html_content)
            if not image_url:
                raise Exception("Failed to extract image URL")
            tags = extract_tags(html_content)
            image = download_image(image_url, cookies)
            # Generate a unique filename
            filename = f"{uuid.uuid4()}.jpg"
            filepath = os.path.join(IMAGES_DIR, filename)
            # The file is always saved as JPEG, which cannot encode alpha or
            # palette modes — an RGBA/P source (e.g. a PNG) made save() raise
            # "cannot write mode RGBA as JPEG". Normalize to RGB first.
            if image.mode not in ("RGB", "L"):
                image = image.convert("RGB")
            image.save(filepath)
            self.dataset.append({
                'image': filename,
                'tags': tags
            })
            self.save_dataset()
            return f"Added image with tags: {tags}"
        except Exception as e:
            return f"Error: {str(e)}"

    def build_huggingface_dataset(self):
        """Check that the records convert to a `datasets.Dataset`.

        NOTE(review): the constructed Dataset is discarded — nothing is pushed
        or written here; this only validates that the conversion works.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."
        try:
            Dataset.from_dict({
                'image': [os.path.join(IMAGES_DIR, item['image']) for item in self.dataset],
                'tags': [item['tags'] for item in self.dataset]
            })
            return "HuggingFace Dataset created successfully!"
        except Exception as e:
            return f"Error creating HuggingFace Dataset: {str(e)}"

    def get_dataset_info(self):
        """One-line summary of the current dataset size."""
        return f"Current dataset size: {len(self.dataset)} images"

    def get_dataset_preview(self, num_images=5):
        """Return (image_path, tags) pairs for the most recent records."""
        return [
            (os.path.join(IMAGES_DIR, item['image']), item['tags'])
            for item in self.dataset[-num_images:]
        ]
# Single shared builder instance backing every UI callback.
dataset_builder = DatasetBuilder()

def add_image_to_dataset(url, cookies):
    """Gradio callback: ingest one URL, then refresh info and preview widgets."""
    status = dataset_builder.add_image(url, cookies)
    info = dataset_builder.get_dataset_info()
    preview = dataset_builder.get_dataset_preview()
    return status, info, preview

def create_huggingface_dataset():
    """Gradio callback: attempt the HuggingFace Dataset conversion."""
    return dataset_builder.build_huggingface_dataset()

def view_dataset():
    """Gradio callback: show the 20 most recent dataset entries."""
    return dataset_builder.get_dataset_preview(num_images=20)
# Create Gradio interface
with gr.Blocks(theme="huggingface") as iface:
    gr.Markdown("# Image Dataset Builder")
    gr.Markdown("Enter a URL to add an image and its tags to the dataset. Progress is saved automatically.")
    # Inputs: page URL plus an optional raw Cookie header for authenticated scrapes.
    with gr.Row():
        url_input = gr.Textbox(lines=2, placeholder="Enter image URL here...")
        cookies_input = gr.Textbox(lines=2, placeholder="Enter cookies (optional)")
    add_button = gr.Button("Add Image")
    result_output = gr.Textbox(label="Result")
    # Seeded with the size of the dataset loaded from disk at startup.
    dataset_info = gr.Textbox(label="Dataset Info", value=dataset_builder.get_dataset_info())
    gr.Markdown("## Dataset Preview")
    preview_gallery = gr.Gallery(label="Recent Additions", show_label=False, elem_id="preview_gallery", columns=5, rows=1, height="auto")
    # One click ingests the URL and refreshes status text, info box, and preview gallery together.
    add_button.click(add_image_to_dataset, inputs=[url_input, cookies_input], outputs=[result_output, dataset_info, preview_gallery])
    create_hf_button = gr.Button("Create HuggingFace Dataset")
    hf_result = gr.Textbox(label="Dataset Creation Result")
    create_hf_button.click(create_huggingface_dataset, inputs=[], outputs=hf_result)
    view_dataset_button = gr.Button("View Dataset")
    dataset_gallery = gr.Gallery(label="Dataset Contents", show_label=False, elem_id="dataset_gallery", columns=5, rows=4, height="auto")
    view_dataset_button.click(view_dataset, inputs=[], outputs=dataset_gallery)
# Launch the interface
iface.launch()