# Spaces: Running
import gradio as gr | |
from gradio_client import Client | |
import os | |
import zipfile | |
from datasets import Dataset | |
from huggingface_hub import HfApi | |
import logging | |
import time # Import time module for adding delays | |
# Set up logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Initialize the Gradio client once at import time; call_api() reuses it
# for every request to the "MiniMaxAI/MiniMax-Text-01" Space.
client = Client("MiniMaxAI/MiniMax-Text-01")
# Function to call the API and get the result
def call_api(prompt):
    """Send ``prompt`` to the remote ``/chat`` endpoint and return its reply.

    The request goes through the module-level ``client`` with fixed
    generation settings (max_tokens=12800, temperature=0.1, top_p=0.9).

    Args:
        prompt: Full message text to send.

    Returns:
        Whatever ``client.predict`` returns for the ``/chat`` endpoint
        (presumably the reply text -- the caller writes it to a text file).

    Raises:
        gr.Error: If anything goes wrong while logging or calling the API.
    """
    request_kwargs = {
        "message": prompt,
        "max_tokens": 12800,
        "temperature": 0.1,
        "top_p": 0.9,
        "api_name": "/chat",
    }
    try:
        # Only the first 100 characters are logged to keep log lines short.
        logger.info(f"Calling API with prompt: {prompt[:100]}...")
        response = client.predict(**request_kwargs)
        logger.info("API call successful.")
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise gr.Error(f"API call failed: {str(e)}")
    return response
# Function to segment the text file into chunks of `chunk_size` words
def segment_text(file_path, chunk_size=3000):
    """Read a text file and split it into chunks of whitespace-separated words.

    The file is decoded as UTF-8 first; if that fails with a
    ``UnicodeDecodeError`` it is re-read as latin-1, which accepts any byte
    sequence.

    Args:
        file_path: Path of the text file to read.
        chunk_size: Maximum number of words per chunk (default 3000).

    Returns:
        A list of strings, each holding up to ``chunk_size`` words joined by
        single spaces. An empty or whitespace-only file yields ``[]``.

    Raises:
        ValueError: If ``chunk_size`` is not a positive number.
        gr.Error: If the file cannot be read with either encoding.
    """
    if chunk_size < 1:
        # range() with a zero step raises and a negative step silently
        # yields nothing, so reject both up front with a clear message.
        raise ValueError("chunk_size must be a positive integer.")

    try:
        logger.info(f"Reading file: {file_path}")
        try:
            # Try reading with UTF-8 encoding first
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()
            logger.info("File read successfully with UTF-8 encoding.")
        except UnicodeDecodeError:
            logger.warning("UTF-8 encoding failed. Trying latin-1 encoding.")
            # Fallback to latin-1 encoding if UTF-8 fails
            with open(file_path, "r", encoding="latin-1") as f:
                text = f.read()
            logger.info("File read successfully with latin-1 encoding.")
    except Exception as e:
        # The outer handler also covers failures of the latin-1 fallback, so
        # every read error surfaces as the same user-facing gr.Error.
        logger.error(f"Failed to read file: {e}")
        raise gr.Error(f"Failed to read file: {str(e)}") from e

    # Split on any whitespace and regroup into fixed-size word windows
    words = text.split()
    chunks = [
        " ".join(words[i:i + chunk_size])
        for i in range(0, len(words), chunk_size)
    ]
    logger.info(f"Segmented text into {len(chunks)} chunks.")
    return chunks
# Function to process the text file and make API calls with rate limiting
def process_text(file, prompt):
    """Run ``prompt`` over every chunk of an uploaded text file.

    For each chunk produced by ``segment_text`` this function calls
    ``call_api``, writes the reply to ``outputs/output_<idx>.txt``, and
    pushes the replies collected so far to the ``TeacherPuffy/book`` dataset
    on the Hugging Face Hub. It sleeps 20 seconds between consecutive API
    calls. Finally the files written during this run are bundled into
    ``outputs.zip``.

    Args:
        file: The uploaded file; either an object with a ``name`` attribute
            holding its path, or the path itself.
        prompt: Instruction text placed before each chunk.

    Returns:
        A ``(zip_path, status_message)`` tuple.

    Raises:
        gr.Error: On any failure (missing upload, missing token, API error,
            upload error, ZIP error). Errors already raised as ``gr.Error``
            by an inner step are passed through unchanged rather than being
            wrapped again.
    """
    try:
        logger.info("Starting text processing...")
        if file is None:
            raise gr.Error("No file was uploaded.")

        # Segment the text file into chunks
        file_path = file.name if hasattr(file, "name") else file
        chunks = segment_text(file_path)

        # The Hub token must be present before any API quota is spent
        hf_token = os.environ.get("HUGGINGFACE_TOKEN")
        if not hf_token:
            raise ValueError("Hugging Face token not found in environment variables.")

        os.makedirs("outputs", exist_ok=True)

        # Process each chunk with a 20-second delay between API calls
        results = []
        output_files = []  # files written by THIS run; only these get zipped
        for idx, chunk in enumerate(chunks):
            logger.info(f"Processing chunk {idx + 1}/{len(chunks)}")
            try:
                # Call the API
                result = call_api(f"{prompt}\n\n{chunk}")
                results.append(result)
                logger.info(f"Chunk {idx + 1} processed successfully.")

                # Save the result to a file
                output_file = f"outputs/output_{idx}.txt"
                with open(output_file, "w", encoding="utf-8") as f:
                    f.write(result)
                output_files.append(output_file)
                logger.info(f"Saved result to {output_file}")

                # Upload to Hugging Face. All results so far are pushed:
                # pushing only the current one would replace the dataset and
                # leave just the last chunk on the Hub.
                try:
                    logger.info(f"Uploading chunk {idx + 1} to Hugging Face...")
                    dataset = Dataset.from_dict({"text": results})
                    dataset.push_to_hub("TeacherPuffy/book", token=hf_token)
                    logger.info(f"Chunk {idx + 1} uploaded to Hugging Face successfully.")
                except Exception as e:
                    logger.error(f"Failed to upload chunk {idx + 1} to Hugging Face: {e}")
                    raise gr.Error(
                        f"Failed to upload chunk {idx + 1} to Hugging Face: {str(e)}"
                    ) from e

                # Wait 20 seconds before the next API call
                if idx < len(chunks) - 1:  # No need to wait after the last chunk
                    logger.info("Waiting 20 seconds before the next API call...")
                    time.sleep(20)
            except gr.Error:
                # Already a user-facing error; do not wrap it a second time.
                raise
            except Exception as e:
                logger.error(f"Failed to process chunk {idx + 1}: {e}")
                raise gr.Error(f"Failed to process chunk {idx + 1}: {str(e)}") from e

        # Create a ZIP file of this run's outputs. Walking the directory would
        # also pick up stale files left behind by earlier runs.
        try:
            logger.info("Creating ZIP file...")
            with zipfile.ZipFile("outputs.zip", "w") as zipf:
                for output_file in output_files:
                    zipf.write(output_file, os.path.basename(output_file))
            logger.info("ZIP file created successfully.")
        except Exception as e:
            logger.error(f"Failed to create ZIP file: {e}")
            raise gr.Error(f"Failed to create ZIP file: {str(e)}") from e

        return "outputs.zip", "All chunks processed and uploaded to Hugging Face. ZIP file created."
    except gr.Error:
        raise
    except Exception as e:
        logger.error(f"An error occurred during processing: {e}")
        raise gr.Error(f"An error occurred: {str(e)}") from e
# Gradio interface
# NOTE(review): the original indentation was lost; the submit button is
# assumed to sit at Blocks level below both rows -- confirm against the app.
with gr.Blocks() as demo:
    gr.Markdown("## Text File Processor with Rate-Limited API Calls")

    # Inputs: the text file to process and the prompt applied to each chunk
    with gr.Row():
        file_input = gr.File(label="Upload Text File")
        prompt_input = gr.Textbox(label="Enter Prompt")

    # Outputs: the downloadable archive and a status line
    with gr.Row():
        output_zip = gr.File(label="Download ZIP File")
        output_message = gr.Textbox(label="Status Message")

    submit_button = gr.Button("Submit")
    # process_text returns (zip_path, message), matching the outputs order
    submit_button.click(
        fn=process_text,
        inputs=[file_input, prompt_input],
        outputs=[output_zip, output_message],
    )

# Launch the Gradio app with a public link
demo.launch(share=True)