# Hugging Face Space application (page status when captured: Running).
import base64
import datetime
import io
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple, Union

import gradio as gr
from huggingface_hub import upload_file
from PIL import Image
# Configure logging
# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger used by the converter class and by main().
logger = logging.getLogger(__name__)
class ImageToDxfConverter:
    """Convert images to DXF by invoking a bundled command-line executable.

    Results are written to the ``OutputPDF`` directory under timestamped
    names. When the ``HF_TOKEN`` environment variable is set, the input image
    and the generated files are also uploaded with ``upload_file``.
    """

    def __init__(self):
        """Initialize the converter with configuration."""
        # For Hugging Face Spaces, executable should be in the root directory
        self.executable_path = Path("SimpleImageToDxfHavePass")
        self.hf_token = os.getenv("HF_TOKEN")
        self.repo_id = "ArrcttacsrjksX/ImageToAutocadData"
        # Make executable file executable (Hugging Face Spaces specific)
        try:
            os.chmod(self.executable_path, 0o755)
            logger.info("Set executable permissions for %s", self.executable_path)
        except OSError as e:
            # Best effort: a missing binary is reported again when a
            # conversion is actually attempted.
            logger.error("Failed to set executable permissions: %s", e)

    def _ensure_directory(self, path: Union[str, Path]) -> Path:
        """Ensure directory exists and return Path object."""
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _generate_output_paths(self, output_folder: Path, timestamp: str) -> dict:
        """Generate all required output paths.

        ``temp_*`` are the fixed names handed to the executable; ``output_dxf``
        and ``debug_png`` are the timestamped names the results are moved to.
        """
        return {
            'output_dxf': output_folder / f"{timestamp}_output.dxf",
            'debug_png': output_folder / f"{timestamp}_debug.png",
            'temp_dxf': output_folder / "_output.dxf",
            'temp_debug': output_folder / "_debug.png",
        }

    def _save_base64_image(self, base64_data: str) -> Optional[str]:
        """Save base64 image data to temporary file.

        Args:
            base64_data: A data URL; the base64 payload is the text after the
                first comma.

        Returns:
            Path of the ``.png`` temp file (the caller must delete it), or
            ``None`` when the input cannot be decoded or written.
        """
        try:
            # Extract actual base64 data after comma
            _, separator, payload = base64_data.partition(',')
            if not separator:
                logger.error("Failed to save base64 image: no ',' separator in data URL")
                return None
            img_bytes = base64.b64decode(payload)
            # delete=False: the file must outlive this handle so the external
            # converter can open it by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                temp_file.write(img_bytes)
            return temp_file.name
        except Exception as e:
            # Deliberately broad: any failure here means "no usable image".
            logger.error("Failed to save base64 image: %s", e)
            return None

    def _upload_results(self,
                        image_path: str,
                        paths: dict,
                        timestamp: str,
                        include_debug: bool) -> List[str]:
        """Upload the input image and generated files; return upload results.

        Returns an empty list when no token is configured. An upload error is
        logged and stops further uploads, but whatever was uploaded before the
        error is still returned.
        """
        uploaded_files: List[str] = []
        if not self.hf_token:
            return uploaded_files

        to_upload = [Path(image_path), paths['output_dxf']]
        if include_debug and os.path.exists(paths['debug_png']):
            to_upload.append(paths['debug_png'])

        # NOTE(review): no repo_type is passed and path_in_repo itself starts
        # with "datasets/<repo_id>/" - confirm this matches the target repo.
        try:
            for local_file in to_upload:
                uploaded_files.append(upload_file(
                    path_or_fileobj=str(local_file),
                    path_in_repo=f"datasets/{self.repo_id}/{timestamp}/{local_file.name}",
                    repo_id=self.repo_id,
                    token=self.hf_token,
                ))
        except Exception as e:
            logger.error("Upload failed: %s", e)
        return uploaded_files

    def _convert_file(self,
                      image_path: str,
                      use_lines: bool) -> Tuple[Optional[str], Optional[str], List[str]]:
        """Run the executable on ``image_path`` and collect its outputs."""
        # Setup output directory
        output_dir = self._ensure_directory("OutputPDF")
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        paths = self._generate_output_paths(output_dir, timestamp)

        # Prepare conversion command
        command = [
            f"./{self.executable_path}",
            f"--imagePath={image_path}",
            f"--outputPath={paths['temp_dxf']}",
            f"--debug-output={paths['temp_debug']}",
        ]
        if use_lines:
            command.append("--use-lines")

        # Execute conversion
        try:
            result = subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logger.error("Conversion failed: %s", e.stderr)
            return None, None, []
        logger.info("Conversion output: %s", result.stdout)

        # Move temporary files to final locations
        shutil.move(paths['temp_dxf'], paths['output_dxf'])
        if use_lines and os.path.exists(paths['temp_debug']):
            shutil.move(paths['temp_debug'], paths['debug_png'])

        # Upload files to Hugging Face
        uploaded_files = self._upload_results(image_path, paths, timestamp, use_lines)

        has_debug = use_lines and os.path.exists(paths['debug_png'])
        return (
            str(paths['output_dxf']),
            str(paths['debug_png']) if has_debug else None,
            uploaded_files,
        )

    def convert_image(self,
                      image_input: Union[str, None],
                      use_lines: bool = False) -> Tuple[Optional[str], Optional[str], List[str]]:
        """Convert image to DXF format.

        Args:
            image_input: Path of the image file, or a ``data:image...`` URL
                carrying the image as base64.
            use_lines: Pass ``--use-lines`` to the executable and report the
                debug PNG if one was produced.

        Returns:
            ``(dxf_path, debug_png_path_or_None, uploaded_files)``. On any
            failure the result is ``(None, None, [])``; errors are logged,
            never raised.
        """
        # Input validation
        if not image_input:
            return None, None, []

        # Handle base64 data: it is spooled to a temp file owned by this call.
        from_data_url = isinstance(image_input, str) and image_input.startswith('data:image')
        if from_data_url:
            image_path = self._save_base64_image(image_input)
            if not image_path:
                return None, None, []
        else:
            image_path = image_input

        try:
            return self._convert_file(image_path, use_lines)
        except Exception as e:
            logger.error("Conversion failed: %s", e)
            return None, None, []
        finally:
            # Clean up temporary file if it was created from base64. Doing
            # this in ``finally`` also covers the failure paths.
            if from_data_url:
                try:
                    os.unlink(image_path)
                except OSError as e:
                    logger.error("Failed to clean up temporary file: %s", e)
def create_gradio_interface():
    """Create and configure the Gradio interface.

    Builds a Blocks app with an image input, a line-detection checkbox, a
    convert button, a DXF file output and a debug preview. It also registers
    a clipboard-paste handler in the browser and returns the app unlaunched.
    """
    converter = ImageToDxfConverter()

    def run_conversion(image_path, use_lines):
        """Return only the two values the output components consume.

        ``convert_image`` also returns the list of uploaded files, which has
        no matching output component in this UI.
        """
        dxf_path, debug_path, _uploaded = converter.convert_image(image_path, use_lines)
        return dxf_path, debug_path

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # Image to DXF Converter
        Convert your images to DXF format for CAD software.
        Press Ctrl+V to paste image from clipboard.
        """)
        with gr.Row():
            with gr.Column(scale=2):
                image_input = gr.Image(
                    type="filepath",
                    label="Input Image",
                    elem_id="image_input",
                    height=300
                )
            with gr.Column(scale=1):
                use_lines_checkbox = gr.Checkbox(
                    label="Enable line detection",
                    value=False,
                    elem_id="use_lines"
                )
                convert_btn = gr.Button(
                    "Convert to DXF",
                    variant="primary",
                    elem_id="convert_btn"
                )
        with gr.Row():
            with gr.Column():
                dxf_output = gr.File(
                    label="DXF Output",
                    elem_id="dxf_output"
                )
            with gr.Column():
                debug_output = gr.Image(
                    type="filepath",
                    label="Debug Preview",
                    elem_id="debug_output",
                    height=300
                )
        # JavaScript that handles paste. The snippet is wrapped in an arrow
        # function because the event listener's `js` argument is expected to
        # be a function expression, not a list of statements. The element ids
        # used below (#image_input, #convert_btn) are the elem_id values set
        # on the components above.
        demo.load(js="""
        () => {
            function initPasteHandler() {
                document.addEventListener('paste', function(e) {
                    const items = e.clipboardData.items;
                    for (let i = 0; i < items.length; i++) {
                        const item = items[i];
                        if (item.type.indexOf('image') !== -1) {
                            e.preventDefault();
                            const file = item.getAsFile();
                            const reader = new FileReader();
                            reader.onload = function(event) {
                                fetch(event.target.result)
                                    .then(res => res.blob())
                                    .then(blob => {
                                        const file = new File([blob], "pasted_image.png", { type: "image/png" });
                                        const dt = new DataTransfer();
                                        dt.items.add(file);
                                        const fileInput = document.querySelector('#image_input input[type="file"]');
                                        if (fileInput) {
                                            fileInput.files = dt.files;
                                            const event = new Event('change', { 'bubbles': true });
                                            fileInput.dispatchEvent(event);
                                            setTimeout(() => {
                                                const convertBtn = document.querySelector('#convert_btn');
                                                if (convertBtn) convertBtn.click();
                                            }, 500); // Tăng timeout lên 500ms
                                        }
                                    });
                            };
                            reader.readAsDataURL(file);
                            break;
                        }
                    }
                });
            }
            if (document.readyState === 'complete') {
                initPasteHandler();
            } else {
                window.addEventListener('load', initPasteHandler);
            }
            // Thêm handler để debug
            console.log('Paste handler initialized');
        }
        """)
        # Event handlers
        convert_btn.click(
            fn=run_conversion,
            inputs=[image_input, use_lines_checkbox],
            outputs=[dxf_output, debug_output],
        )
    return demo
def main():
    """Main entry point with proper error handling.

    Builds the interface, enables request queuing and serves it on all
    network interfaces at port 7860. Any failure is logged at CRITICAL level
    and re-raised so the process exits with an error.
    """
    try:
        demo = create_gradio_interface()
        demo.queue()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_api=False,
            share=False,
        )
    except Exception as e:
        logger.critical("Application failed to start: %s", e)
        raise


if __name__ == "__main__":
    main()