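# PDF-Summarizer / app.py
# The remainder of this header is page residue from the hosting site, not program text:
#   cstr's picture | Update app.py | d89b36d verified | raw | history blame | 31.3 kB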
import os
import re
import tempfile
import requests
import gradio as gr
from PyPDF2 import PdfReader
import logging
import webbrowser
from huggingface_hub import InferenceClient
from typing import Dict, List, Optional, Tuple
import time
# Logging: timestamp, level and message for every record at INFO or above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Preset sizes offered as quick-select buttons next to the context-size slider,
# keyed by button label. Insertion order is the button order.
CONTEXT_SIZES = {f"{thousands}K": thousands * 1000 for thousands in (4, 8, 32, 128, 200)}

# Context size per backend. The Groq entry is a nested mapping keyed by model id.
MODEL_CONTEXT_SIZES = {
    "OpenAI ChatGPT": 4096,
    "HuggingFace Inference": 4096,
    "Groq API": {
        "llama-3.1-70b-versatile": 32768,
        "mixtral-8x7b-32768": 32768,
        "llama-3.1-8b-instant": 8192,
    },
}
class ModelRegistry:
    """Catalogue of the models the UI can offer.

    Attributes:
        hf_models: display name -> HuggingFace model id ("Custom Model" maps
            to an empty id because the user supplies it separately).
        groq_models: model id -> model id, fetched from the Groq API when
            possible, otherwise a built-in default list.
    """

    GROQ_MODELS_URL = "https://api.groq.com/openai/v1/models"
    # Seconds to wait for Groq. The registry is built at import time, so an
    # unbounded request would hang application start-up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.hf_models = {
            "Phi-3 Mini 128k": "microsoft/Phi-3-mini-128k-instruct",
            "Custom Model": ""
        }
        self.groq_models = self._fetch_groq_models()

    def _fetch_groq_models(self) -> Dict[str, str]:
        """Fetch available Groq models, falling back to the defaults.

        The defaults are returned when GROQ_API_KEY is unset, the request
        fails or times out, the status is not 200, or the list is empty.
        """
        groq_api_key = os.getenv('GROQ_API_KEY')
        if not groq_api_key:
            logging.warning("No GROQ_API_KEY found in environment")
            return self._get_default_groq_models()

        headers = {
            "Authorization": f"Bearer {groq_api_key}",
            "Content-Type": "application/json"
        }
        try:
            response = requests.get(
                self.GROQ_MODELS_URL,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                logging.error(f"Failed to fetch Groq models: {response.status_code}")
                return self._get_default_groq_models()
            models = response.json().get("data", [])
            fetched = {model["id"]: model["id"] for model in models}
        except Exception as e:
            # Deliberately broad: model discovery is best-effort and must
            # never prevent the app from starting.
            logging.error(f"Error fetching Groq models: {e}")
            return self._get_default_groq_models()

        # An empty list would leave the dropdown with nothing to select.
        return fetched or self._get_default_groq_models()

    def _get_default_groq_models(self) -> Dict[str, str]:
        """Return default Groq models when API is unavailable"""
        return {
            "llama-3.1-70b-versatile": "llama-3.1-70b-versatile",
            "mixtral-8x7b-32768": "mixtral-8x7b-32768",
            "llama-3.1-8b-instant": "llama-3.1-8b-instant"
        }

    def refresh_groq_models(self) -> Dict[str, str]:
        """Re-fetch the Groq model list, store it and return it."""
        self.groq_models = self._fetch_groq_models()
        return self.groq_models
# Initialize model registry
# Constructing it calls ModelRegistry._fetch_groq_models(), so importing this
# module issues an HTTP request to Groq whenever GROQ_API_KEY is set.
model_registry = ModelRegistry()
def extract_text_from_pdf(pdf_path: str) -> str:
    """Return the text of every page of the PDF at *pdf_path*.

    Each page's text is followed by a newline; pages without text are logged
    and skipped. Failures are reported as a string starting with "Error"
    rather than raised, which is how callers detect them.
    """
    try:
        page_chunks = []
        for number, page in enumerate(PdfReader(pdf_path).pages, start=1):
            content = page.extract_text()
            if not content:
                logging.warning(f"No text found on page {number}.")
                continue
            page_chunks.append(content + "\n")
        combined = "".join(page_chunks)
        if combined.strip():
            return combined
        return "Error: No extractable text found in the PDF."
    except Exception as e:
        logging.error(f"Error reading PDF file: {e}")
        return f"Error reading PDF file: {e}"
def format_content(text: str, format_type: str) -> str:
    """Format extracted text as 'txt', 'md' or 'html'.

    - 'txt' and 'md' return *text* unchanged (no Markdown-specific
      transformation is applied).
    - 'html' wraps each non-blank, blank-line-separated paragraph in <p>
      tags. Paragraph text is HTML-escaped so that '<', '>' and '&' coming
      from the PDF cannot produce broken or injected markup.
    - Any other value is logged and reported as an "Unsupported format" string.
    """
    if format_type in ('txt', 'md'):
        return text
    if format_type == 'html':
        import html  # local import: only this branch needs it

        paragraphs = (para.strip() for para in text.split('\n\n'))
        return ''.join(
            f'<p>{html.escape(para, quote=False)}</p>'
            for para in paragraphs
            if para
        )
    logging.error(f"Unsupported format: {format_type}")
    return f"Unsupported format: {format_type}"
def split_into_snippets(text: str, context_size: int) -> List[str]:
    """Split *text* into snippets of at most *context_size* characters.

    Text is cut after '.', '!' or '?' followed by spaces, and sentences are
    packed greedily into snippets. A single sentence longer than the limit is
    hard-split into limit-sized pieces so that no snippet exceeds the limit.

    Args:
        text: The text to split.
        context_size: Maximum snippet length in characters. Floats (as sent
            by a slider) are truncated to int.

    Returns:
        The non-empty snippets in order; an empty list for blank text.

    Raises:
        ValueError: If *context_size* is smaller than 1.
    """
    limit = int(context_size)
    if limit < 1:
        raise ValueError("context_size must be at least 1")

    snippets = []
    current = ""
    for sentence in re.split(r'(?<=[.!?]) +', text):
        if len(sentence) > limit:
            # No sentence boundary to use: fall back to fixed-width pieces.
            pieces = [sentence[i:i + limit] for i in range(0, len(sentence), limit)]
        else:
            pieces = [sentence]
        for piece in pieces:
            # +1 accounts for the separating space appended after each piece.
            if len(current) + len(piece) + 1 > limit:
                if current.strip():
                    snippets.append(current.strip())
                current = piece + " "
            else:
                current += piece + " "
    if current.strip():
        snippets.append(current.strip())
    return snippets
def build_prompts(snippets: List[str], prompt_instruction: str, custom_prompt: Optional[str], snippet_num: Optional[int] = None) -> str:
    """Frame snippets with an instruction and '---' delimiters.

    With *snippet_num* (1-based) only that snippet is used; an out-of-range
    number yields an "Error: ..." string. A truthy *custom_prompt* replaces
    *prompt_instruction*. When several snippets are framed, each header
    carries "Part i of n" and the prompts are joined by blank lines.
    """
    if snippet_num is None:
        chosen = snippets
    elif 1 <= snippet_num <= len(snippets):
        chosen = [snippets[snippet_num - 1]]
    else:
        return f"Error: Invalid snippet number. Please choose between 1 and {len(snippets)}."

    instruction = custom_prompt or prompt_instruction
    total = len(chosen)
    framed = []
    for position, body in enumerate(chosen, start=1):
        if total > 1:
            header = f"{instruction} Part {position} of {total}: ---\n"
        else:
            header = f"{instruction} ---\n"
        framed.append(f"{header}{body}\n---")
    return "\n\n".join(framed)
def send_to_model(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key,
                  groq_model_choice, groq_api_key, openai_api_key,
                  progress=gr.Progress()):
    """Send the prompt to the selected backend, reporting progress to the UI.

    The first eight parameters are forwarded unchanged to
    send_to_model_impl. `progress` is declared as a default argument because
    gr.Progress is not a context manager: the previous `with gr.Progress()`
    raised on every call, so this function always returned an error.

    Returns:
        Whatever send_to_model_impl returns, or ("Error: ...", None) if it
        raises.
    """
    try:
        progress(0, desc="Preparing to send to model...")
        result = send_to_model_impl(
            prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key,
            groq_model_choice, groq_api_key, openai_api_key,
        )
        progress(1, desc="Complete!")
        return result
    except Exception as e:
        logging.error(f"Error sending to model: {e}")
        return f"Error: {str(e)}", None
def send_to_model_impl(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key,
                       groq_model_choice, groq_api_key, openai_api_key):
    """Dispatch *prompt* to the chosen backend and save the summary to a file.

    Returns:
        (message, files): on success the summary text and a one-element list
        with the path of a UTF-8 temp file containing it; on any validation
        or backend error a message and an empty list.
    """
    if not prompt or not prompt.strip():
        return "Error: No prompt to send. Generate a prompt first.", []

    if model_selection == "HuggingFace Inference":
        if not hf_api_key:
            return "HuggingFace API key required.", []
        if hf_model_choice == "Custom Model":
            model_id = (hf_custom_model or "").strip()
        else:
            # .get: an unset or unknown dropdown value must not raise KeyError.
            model_id = model_registry.hf_models.get(hf_model_choice, "")
        if not model_id:
            return "Error: No HuggingFace model selected.", []
        summary = send_to_hf_inference(prompt, model_id, hf_api_key)
    elif model_selection == "Groq API":
        if not groq_api_key:
            return "Groq API key required.", []
        if not groq_model_choice:
            return "Error: No Groq model selected.", []
        summary = send_to_groq(prompt, groq_model_choice, groq_api_key)
    elif model_selection == "OpenAI ChatGPT":
        if not openai_api_key:
            return "OpenAI API key required.", []
        summary = send_to_openai(prompt, openai_api_key)
    else:
        return "Invalid model selection.", []

    # The send_to_* helpers report failure as a string starting with "Error".
    if summary.startswith("Error"):
        return summary, []

    # Save summary for download. Explicit UTF-8 so non-ASCII model output
    # does not fail on platforms whose locale encoding is narrower.
    with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', encoding='utf-8') as f:
        f.write(summary)
    return summary, [f.name]
def send_to_hf_inference(prompt: str, model_name: str, api_key: str) -> str:
    """Generate text for *prompt* with a HuggingFace Inference model.

    Returns the generated text, or an "Error with HF inference: ..." string
    if anything goes wrong.
    """
    generation_options = dict(
        model=model_name,
        max_new_tokens=500,
        temperature=0.7,
        details=True,   # ask for the full response object
        stream=False,   # one complete response, no token stream
    )
    try:
        hf_client = InferenceClient(token=api_key)
        result = hf_client.text_generation(prompt, **generation_options)
        return result.generated_text  # only the text is passed on
    except Exception as e:
        logging.error(f"Error with HF inference: {e}")
        return f"Error with HF inference: {e}"
def send_to_groq(prompt: str, model_name: str, api_key: str, timeout: float = 60) -> str:
    """Send *prompt* to the Groq chat-completions endpoint.

    Args:
        prompt: User message content.
        model_name: Groq model id.
        api_key: Groq API key used as bearer token.
        timeout: Seconds to wait for the HTTP request. Without it a stalled
            connection would block the UI handler indefinitely.

    Returns:
        The first choice's message content, or a string starting with
        "Error" describing what went wrong.
    """
    try:
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model_name,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.7,
            "max_tokens": 500
        }
        response = requests.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=timeout,
        )
        if response.status_code != 200:
            logging.error(f"Groq API returned status {response.status_code}")
            return f"Error: Groq API returned status {response.status_code}"
        response_json = response.json()
        if not response_json.get("choices"):
            return "Error: No response from Groq API"
        return response_json["choices"][0]["message"]["content"]
    except Exception as e:
        logging.error(f"Error with Groq API: {e}")
        return f"Error with Groq API: {e}"
def send_to_openai(prompt: str, api_key: str) -> str:
    """Send *prompt* to OpenAI's gpt-3.5-turbo chat model.

    Works with both generations of the `openai` package: the client-based
    API (openai>=1.0, where `openai.ChatCompletion` no longer exists) and the
    legacy module-level API.

    Returns the reply text, or an "Error with OpenAI API: ..." string.
    """
    try:
        import openai

        request = dict(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=500,
        )
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: per-call client, no global key mutation.
            client = openai.OpenAI(api_key=api_key)
            response = client.chat.completions.create(**request)
        else:
            # Legacy openai<1.0 interface.
            openai.api_key = api_key
            response = openai.ChatCompletion.create(**request)
        return response.choices[0].message.content
    except Exception as e:
        logging.error(f"Error with OpenAI API: {e}")
        return f"Error with OpenAI API: {e}"
def copy_to_clipboard(element_id: str) -> str:
    """Build a JavaScript arrow function that copies a textbox to the clipboard.

    The script reads the <textarea> inside the element with id *element_id*
    and returns a status string; failures are logged to the browser console.
    """
    script_lines = [
        "",
        "() => {",
        "try {",
        f"const text = document.querySelector('#{element_id} textarea').value;",
        "navigator.clipboard.writeText(text);",
        'return "Copied to clipboard!";',
        "} catch (e) {",
        "console.error(e);",
        'return "Failed to copy to clipboard";',
        "}",
        "}",
        "",
    ]
    return "\n".join(script_lines)
def open_chatgpt_old() -> str:
    """Open ChatGPT via the `webbrowser` module and return a status message.

    The browser is launched by the Python process itself, i.e. on the machine
    running this code.
    """
    chatgpt_url = 'https://chat.openai.com'
    webbrowser.open_new_tab(chatgpt_url)
    status = "Opening ChatGPT in new tab"
    return status
def open_chatgpt() -> str:
    """Return a JavaScript statement that opens ChatGPT in a new tab."""
    target_url = "https://chat.openai.com/"
    return f"window.open('{target_url}', '_blank');"
def process_pdf(pdf, fmt, ctx_size):
    """Extract, format and split a PDF.

    Args:
        pdf: Uploaded file: either an object with a `.name` path attribute
            or the path itself.
        fmt: Output format passed to format_content.
        ctx_size: Snippet size passed to split_into_snippets.

    Returns:
        Always a 5-tuple (status, text, snippets, snippet_choices, files).
        Failures use empty text/lists and files=None. Previously the error
        paths returned only four values, so callers could not unpack them
        the same way as the success result.
    """
    def _failure(message):
        return message, "", [], [], None

    if not pdf:
        return _failure("Please upload a PDF file.")
    try:
        # Extract text
        pdf_path = getattr(pdf, "name", pdf)
        text = extract_text_from_pdf(pdf_path)
        if text.startswith("Error"):
            return _failure(text)
        # Format content
        formatted_text = format_content(text, fmt)
        # Split into snippets
        snippets = split_into_snippets(formatted_text, ctx_size)
        # Save full text for download (UTF-8 so any PDF text can be written)
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', encoding='utf-8') as text_file:
            text_file.write(formatted_text)
        total = len(snippets)
        snippet_choices = [f"Snippet {number} of {total}" for number in range(1, total + 1)]
        return (
            "PDF processed successfully!",
            formatted_text,
            snippets,
            snippet_choices,
            [text_file.name]
        )
    except Exception as e:
        logging.error(f"Error processing PDF: {e}")
        return _failure(f"Error processing PDF: {str(e)}")
def generate_prompt(text, template, snippet_idx=None):
    """Build a prompt from text (or a list of snippets) and save it to a file.

    Args:
        text: A string, or a list of snippet strings.
        template: Instruction placed before the content; a falsy value uses
            "Summarize the following text:".
        snippet_idx: For list input, 0-based index of the snippet to use;
            None joins all snippets with blank lines.

    Returns:
        (status, prompt, files): on success files is a one-element list with
        the path of a UTF-8 temp file holding the prompt; on failure prompt
        is "" and files is None.
    """
    try:
        if not text:
            return "No text available.", "", None
        default_prompt = "Summarize the following text:"
        prompt_template = template if template else default_prompt
        if isinstance(text, list):
            # If text is list of snippets
            if snippet_idx is None:
                content = "\n\n".join(text)
            elif 0 <= snippet_idx < len(text):
                content = text[snippet_idx]
            else:
                return "Invalid snippet index.", "", None
        else:
            content = text
        prompt = f"{prompt_template}\n---\n{content}\n---"
        # Save prompt for download. Explicit UTF-8: PDF text is frequently
        # non-ASCII and the locale default encoding may not be able to store it.
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', encoding='utf-8') as prompt_file:
            prompt_file.write(prompt)
        return "Prompt generated!", prompt, [prompt_file.name]
    except Exception as e:
        logging.error(f"Error generating prompt: {e}")
        return f"Error generating prompt: {str(e)}", "", None
def download_file(content: str, prefix: str = "file") -> List[str]:
    """Write *content* to a UTF-8 temp .txt file and return its path in a list.

    Args:
        content: Text to store. Falsy content produces no file.
        prefix: Filename prefix for the temp file.

    Returns:
        [path] on success, [] when there is nothing to write or the file
        could not be created (the failure is logged).
    """
    if not content:
        return []
    try:
        with tempfile.NamedTemporaryFile(
            delete=False, mode='w', suffix='.txt', prefix=prefix, encoding='utf-8'
        ) as f:
            f.write(content)
        return [f.name]
    except (OSError, TypeError) as e:
        # OSError: filesystem problems; TypeError: non-string content.
        logging.error(f"Error creating download file: {e}")
        return []
# Main Interface
# `demo` is the Blocks app that the __main__ guard at the end of the file launches.
# NOTE(review): indentation was lost in this copy; everything down to the
# instructions Markdown presumably belongs inside this `with` — confirm.
with gr.Blocks(theme=gr.themes.Default()) as demo:
# State variables
# pdf_content starts as "" and snippets as []; handle_pdf_process writes the
# formatted text and the snippet list into them.
pdf_content = gr.State("")
snippets = gr.State([])
# Header
# NOTE(review): the emoji in this and other labels look mis-encoded (mojibake) — confirm source encoding.
gr.Markdown("# πŸ“„ Smart PDF Summarizer")
gr.Markdown("Upload a PDF document and get AI-powered summaries using various AI models.")
with gr.Tabs() as tabs:
# Tab 1: PDF Processing
# Inputs (file, format, context size) and the status / extracted-text outputs.
with gr.Tab("1️⃣ PDF Processing"):
with gr.Row():
with gr.Column(scale=1):
pdf_input = gr.File(
label="πŸ“ Upload PDF",
file_types=[".pdf"]
)
# Choices match the format_type values that format_content understands.
format_type = gr.Radio(
choices=["txt", "md", "html"],
value="txt",
label="πŸ“ Output Format"
)
# Passed to split_into_snippets, which compares it against character counts.
# NOTE(review): the default 4096 is not a multiple of step=1000 — confirm how the slider displays it.
context_size = gr.Slider(
minimum=1000,
maximum=200000,
step=1000,
value=4096,
label="Context Size"
)
# One preset button per CONTEXT_SIZES entry; a click writes that value into the slider.
with gr.Row():
for size_name, size_value in CONTEXT_SIZES.items():
gr.Button(
size_name,
size="sm",
scale=1
).click(
# v=size_value binds the current loop value; a plain closure would see only the last one.
lambda v=size_value: v, # Simplified
None,
context_size
)
process_button = gr.Button("πŸ” Process PDF", variant="primary")
with gr.Column(scale=1):
# Shared status line: several handlers write their messages here.
progress_status = gr.Textbox(
label="Status",
interactive=False,
show_label=True,
visible=True # Ensure error messages are always visible
)
processed_text = gr.Textbox(
label="Processed Text",
lines=10,
max_lines=50,
show_copy_button=True
)
download_full_text = gr.Button("πŸ“₯ Download Full Text")
# Tab 2: Snippet Selection
# Pick a snippet, optionally give a custom instruction, and build the prompt.
with gr.Tab("2️⃣ Snippet Selection"):
with gr.Row():
with gr.Column(scale=1):
# Starts empty; handle_pdf_process fills the choices after a PDF is processed.
snippet_selector = gr.Dropdown(
label="Select Snippet",
choices=[],
interactive=True
)
custom_prompt = gr.Textbox(
label="✍️ Custom Prompt Template",
placeholder="Enter your custom prompt here...",
lines=2
)
generate_prompt_btn = gr.Button("Generate Prompt", variant="primary")
with gr.Column(scale=1):
# elem_id is the DOM id that copy_text_js targets with querySelector.
generated_prompt = gr.Textbox(
label="πŸ“‹ Generated Prompt",
lines=10,
max_lines=50,
show_copy_button=True,
elem_id="generated_prompt" # Add this
)
with gr.Row():
copy_prompt_button = gr.Button("πŸ“‹ Copy Prompt")
download_prompt = gr.Button("πŸ“₯ Download Prompt")
# Wired to the same generated_prompt textbox as download_prompt in the download handlers.
download_snippet = gr.Button("πŸ“₯ Download Selected Snippet")
# Tab 3: Model Processing
# Backend choice with per-backend credentials on the left, summary on the right.
# NOTE(review): nesting reconstructed from statement order (indentation was
# lost in this copy) — confirm against the running layout.
with gr.Tab("3️⃣ Model Processing"):
    with gr.Row():
        with gr.Column(scale=1):
            model_choice = gr.Radio(
                choices=["OpenAI ChatGPT", "HuggingFace Inference", "Groq API"],
                value="OpenAI ChatGPT",
                label="πŸ€– Model Selection"
            )
            # Visible from the start because "OpenAI ChatGPT" is the default
            # selection; the visibility handler only runs on a change, so a
            # hidden panel would leave the key field unreachable at load.
            with gr.Column(visible=True) as openai_options:
                openai_api_key = gr.Textbox(
                    label="πŸ”‘ OpenAI API Key",
                    type="password"
                )
            with gr.Column(visible=False) as hf_options:
                hf_model = gr.Dropdown(
                    choices=list(model_registry.hf_models.keys()),
                    label="πŸ”§ HuggingFace Model",
                    value="Phi-3 Mini 128k"
                )
                # Shown by toggle_custom_model when "Custom Model" is picked.
                hf_custom_model = gr.Textbox(
                    label="Custom Model ID",
                    visible=False
                )
                hf_api_key = gr.Textbox(
                    label="πŸ”‘ HuggingFace API Key",
                    type="password"
                )
            with gr.Column(visible=False) as groq_options:
                groq_model = gr.Dropdown(
                    choices=list(model_registry.groq_models.keys()),
                    label="πŸ”§ Groq Model"
                )
                groq_refresh_btn = gr.Button("πŸ”„ Refresh Models")
                groq_api_key = gr.Textbox(
                    label="πŸ”‘ Groq API Key",
                    type="password"
                )
            send_to_model_btn = gr.Button("πŸš€ Send to Model", variant="primary")
            open_chatgpt_button = gr.Button("πŸŒ Open ChatGPT")
        with gr.Column(scale=1):
            # elem_id is the DOM id that copy_text_js targets with querySelector.
            summary_output = gr.Textbox(
                label="πŸ“ Summary",
                lines=15,
                max_lines=50,
                show_copy_button=True,
                elem_id="summary_output"
            )
            with gr.Row():
                copy_summary_button = gr.Button("πŸ“‹ Copy Summary")
                download_summary = gr.Button("πŸ“₯ Download Summary")
# Hidden components for file handling
# Every handler that produces a temp file writes its path list into this component.
# NOTE(review): with visible=False the generated files are presumably never
# shown to the user — confirm that this is intended.
download_files = gr.Files(label="πŸ“₯ Downloads", visible=False)
# Event Handlers
def update_context_size(size: int) -> dict:
    """Return a component update that sets the context-size slider.

    Args:
        size: Desired size. Anything that is not an int or float (booleans
            included) is replaced by the default 4096.

    Returns:
        The result of gr.update(value=int(size)). The former `-> None`
        annotation was wrong: the function has always returned the update.
    """
    if isinstance(size, bool) or not isinstance(size, (int, float)):
        size = 4096  # Default size
    return gr.update(value=int(size))
def get_model_context_size(choice: str, groq_model: str = None) -> int:
    """Return the context size for a backend (and Groq model, if given).

    OpenAI and HuggingFace use 4096. Groq models are looked up in
    MODEL_CONTEXT_SIZES, with 4096 for unknown ids. Every other case,
    including "Groq API" without a model, uses 32000.
    """
    if choice in ("OpenAI ChatGPT", "HuggingFace Inference"):
        return 4096
    if choice == "Groq API" and groq_model:
        groq_sizes = MODEL_CONTEXT_SIZES["Groq API"]
        return groq_sizes.get(groq_model, 4096)
    return 32000  # Safe default
def update_snippet_choices(snippets_list: List[str]) -> List[str]:
    """Return one "Snippet i of n" label per snippet, numbered from 1."""
    total = len(snippets_list)
    return [f"Snippet {number} of {total}" for number in range(1, total + 1)]
def get_snippet_index(choice: str) -> int:
    """Extract the 0-based snippet index from a "Snippet i of n" label.

    Returns 0 for an empty, malformed or non-string label, and never a
    negative number: "Snippet 0 ..." previously produced -1, which Python
    indexing would silently treat as the last snippet.
    """
    if not choice:
        return 0
    try:
        index = int(choice.split()[1]) - 1
    except (AttributeError, IndexError, ValueError):
        # Not a string, fewer than two words, or a non-numeric second word.
        return 0
    return max(index, 0)
def toggle_model_options(choice):
    """Return visibility updates for the HuggingFace, Groq and OpenAI panels.

    The tuple is ordered (HuggingFace, Groq, OpenAI); only the entry matching
    *choice* is visible.
    """
    backends = ("HuggingFace Inference", "Groq API", "OpenAI ChatGPT")
    return tuple(gr.update(visible=(choice == backend)) for backend in backends)
def refresh_groq_models_list():
    """Re-fetch the Groq models and return a dropdown update with their ids."""
    model_names = list(model_registry.refresh_groq_models())
    return gr.update(choices=model_names)
def toggle_custom_model(model_name):
    """Show the custom-model textbox only when "Custom Model" is selected."""
    is_custom = (model_name == "Custom Model")
    return gr.update(visible=is_custom)
def handle_model_change(choice):
    """Handle a backend change: panel visibility plus context-size update.

    Returns a 4-tuple of updates ordered (HuggingFace panel, Groq panel,
    OpenAI panel, context-size slider). The slider value comes from
    get_model_context_size(choice). Previously the backend *name* was passed
    to update_context_size, which is not numeric, so the slider was always
    reset to 4096.
    """
    return (
        gr.update(visible=choice == "HuggingFace Inference"),
        gr.update(visible=choice == "Groq API"),
        gr.update(visible=choice == "OpenAI ChatGPT"),
        gr.update(value=get_model_context_size(choice)),
    )
def handle_groq_model_change(model_name):
    """Set the context-size slider to the selected Groq model's size.

    The previous body called update_context_size("Groq API", model_name);
    that function takes a single argument, so every Groq model change raised
    TypeError. The size is now resolved with get_model_context_size.
    """
    return gr.update(value=get_model_context_size("Groq API", model_name))
def handle_model_selection(choice):
    """Show the panel for the chosen backend and adjust the context size.

    Returns a dict keyed by component: a visibility update for each options
    panel and a value update for the context-size slider.
    """
    panel_for_backend = {
        "HuggingFace Inference": hf_options,
        "Groq API": groq_options,
        "OpenAI ChatGPT": openai_options,
    }
    updates = {
        panel: gr.update(visible=(choice == backend))
        for backend, panel in panel_for_backend.items()
    }
    updates[context_size] = gr.update(value=get_model_context_size(choice))
    return updates
# PDF Processing Handlers
def handle_pdf_process(pdf, fmt, ctx_size):
    """Process the uploaded PDF and return updates for the UI.

    Args:
        pdf: Uploaded file: either an object with a `.name` path attribute
            or the path itself.
        fmt: Output format for format_content.
        ctx_size: Snippet size for split_into_snippets.

    Returns:
        A dict keyed by component (status, processed text, both State
        values, snippet dropdown, downloads). Failures reset everything and
        put the message into the status box.
    """
    def _failure(message):
        # Single definition of the "reset everything" result used by all error paths.
        return {
            progress_status: message,
            processed_text: "",
            pdf_content: "",
            snippets: [],
            snippet_selector: gr.update(choices=[], value=None),
            download_files: None
        }

    if not pdf:
        return _failure("Please upload a PDF file.")
    try:
        # Extract and format text
        text = extract_text_from_pdf(getattr(pdf, "name", pdf))
        if text.startswith("Error"):
            return _failure(text)
        formatted_text = format_content(text, fmt)
        snippets_list = split_into_snippets(formatted_text, ctx_size)
        # Create downloadable full text (UTF-8 so any PDF text can be written)
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', encoding='utf-8') as f:
            f.write(formatted_text)
        full_text_path = f.name
        choices = update_snippet_choices(snippets_list)
        return {
            progress_status: f"PDF processed successfully! Generated {len(snippets_list)} snippets.",
            processed_text: formatted_text,
            pdf_content: formatted_text,
            snippets: snippets_list,
            # Select the first label only if one exists; "Snippet 1 of 0"
            # would not be among the choices.
            snippet_selector: gr.update(choices=choices, value=choices[0] if choices else None),
            download_files: [full_text_path]
        }
    except Exception as e:
        error_msg = f"Error processing PDF: {str(e)}"
        logging.error(error_msg)
        return _failure(error_msg)
def handle_snippet_selection(choice, snippets_list):
    """Show the selected snippet and offer it as a download.

    Args:
        choice: Dropdown label of the form "Snippet i of n".
        snippets_list: Current list of snippets.

    Returns:
        A dict keyed by component with the status message, the snippet text
        for the prompt box and the path list of a UTF-8 temp file holding
        the snippet (None on failure).
    """
    if not snippets_list:
        return {
            progress_status: "No snippets available.",
            generated_prompt: "",
            download_files: None
        }
    try:
        idx = get_snippet_index(choice)
        # Explicit range check: a negative index would otherwise silently
        # wrap around to the end of the list.
        if not 0 <= idx < len(snippets_list):
            return {
                progress_status: "Invalid snippet selection.",
                generated_prompt: "",
                download_files: None
            }
        selected_snippet = snippets_list[idx]
        # Create downloadable snippet
        with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', encoding='utf-8') as f:
            f.write(selected_snippet)
        return {
            progress_status: f"Selected snippet {idx + 1}",
            generated_prompt: selected_snippet,
            download_files: [f.name]
        }
    except Exception as e:
        error_msg = f"Error selecting snippet: {str(e)}"
        logging.error(error_msg)
        return {
            progress_status: error_msg,
            generated_prompt: "",
            download_files: None
        }
# Copy button handlers
def copy_text_js(element_id: str) -> str:
    """Build the JavaScript used by the copy buttons.

    The returned arrow function copies the <textarea> inside the element
    with id *element_id* to the clipboard and returns a status string.
    """
    script_lines = [
        "",
        "() => {",
        f"const text = document.querySelector('#{element_id} textarea').value;",
        "navigator.clipboard.writeText(text);",
        'return "Copied to clipboard!";',
        "}",
        "",
    ]
    return "\n".join(script_lines)
def handle_prompt_generation(snippet_text, template, snippet_choice, snippets_list):
    """Generate a prompt from the selected snippet.

    Args:
        snippet_text: Current content of the prompt box; must be non-empty.
        template: Custom instruction; a falsy value uses
            "Summarize the following text:".
        snippet_choice: Dropdown label of the form "Snippet i of n".
        snippets_list: Current list of snippets.

    Returns:
        A dict keyed by component with the status message, the prompt text
        and the downloadable file list (None on failure).
    """
    if not snippet_text or not snippets_list:
        return {
            progress_status: "No text available for prompt generation.",
            generated_prompt: "",
            download_files: None
        }
    try:
        idx = get_snippet_index(snippet_choice)
        # generate_prompt returns (status, prompt, files). The old code
        # treated that tuple as the prompt string, so writing it to a file
        # always raised and no prompt was ever produced.
        status, prompt, files = generate_prompt(
            snippets_list[idx], template or "Summarize the following text:"
        )
        if not files:
            # generate_prompt reported a failure; pass its message on.
            return {
                progress_status: status,
                generated_prompt: "",
                download_files: None
            }
        return {
            progress_status: "Prompt generated successfully!",
            generated_prompt: prompt,
            # Reuse the file generate_prompt already wrote.
            download_files: files
        }
    except Exception as e:
        error_msg = f"Error generating prompt: {str(e)}"
        logging.error(error_msg)
        return {
            progress_status: error_msg,
            generated_prompt: "",
            download_files: None
        }
def handle_copy_action(text):
    """Report a completed copy in the status box; *text* itself is not used."""
    notice = gr.update(value="Text copied to clipboard!", visible=True)
    return {progress_status: notice}
# Connect all event handlers
# `outputs` must be a list of components. Handlers that return a
# {component: value} dict still need every such component listed here; the
# mappings passed previously are not a form Gradio accepts.
# Core event handlers
process_button.click(
    handle_pdf_process,
    inputs=[pdf_input, format_type, context_size],
    outputs=[
        progress_status,
        processed_text,
        pdf_content,
        snippets,
        snippet_selector,
        download_files,
    ],
)
generate_prompt_btn.click(
    handle_prompt_generation,
    inputs=[generated_prompt, custom_prompt, snippet_selector, snippets],
    outputs=[progress_status, generated_prompt, download_files],
)
# Snippet handling
snippet_selector.change(
    handle_snippet_selection,
    inputs=[snippet_selector, snippets],
    outputs=[progress_status, generated_prompt, download_files],
)
# Model selection
model_choice.change(
    handle_model_selection,
    inputs=[model_choice],
    outputs=[hf_options, groq_options, openai_options, context_size],
)
hf_model.change(
    toggle_custom_model,
    inputs=[hf_model],
    outputs=[hf_custom_model]
)
groq_model.change(
    handle_groq_model_change,
    inputs=[groq_model],
    outputs=[context_size]
)
# Context size buttons: each preset button is wired where it is created
# (inside the loop over CONTEXT_SIZES), so nothing is connected here.
# Download handlers
# download_file writes the text to a temp file and returns [path]. The old
# lambda handed the raw text to the Files component as if it were a path.
for btn, content in [
    (download_full_text, pdf_content),
    (download_snippet, generated_prompt),
    (download_prompt, generated_prompt),
    (download_summary, summary_output)
]:
    btn.click(
        download_file,
        inputs=[content],
        outputs=download_files
    )
# Copy button handlers
# NOTE(review): the client-side script keyword is `_js` here; newer Gradio
# releases presumably spell it `js` — confirm against the installed version.
for btn, elem_id in [
    (copy_prompt_button, "generated_prompt"),
    (copy_summary_button, "summary_output")
]:
    btn.click(
        fn=None,
        _js=copy_text_js(elem_id),
        outputs=progress_status
    )
# ChatGPT handler
open_chatgpt_button.click(
    fn=None,
    _js="() => { window.open('https://chat.openai.com/', '_blank'); return 'Opened ChatGPT in new tab'; }",
    outputs=progress_status
)
# Model processing
send_to_model_btn.click(
    send_to_model,
    inputs=[
        generated_prompt,
        model_choice,
        hf_model,
        hf_custom_model,
        hf_api_key,
        groq_model,
        groq_api_key,
        openai_api_key
    ],
    outputs=[
        summary_output,
        download_files
    ]
)
groq_refresh_btn.click(
    refresh_groq_models_list,
    outputs=[groq_model]
)
# Instructions
# Static usage guide, created after all the components above.
# NOTE(review): step 5 says 'Process PDF' generates the summary, but that
# button is wired to handle_pdf_process (extraction and splitting only);
# summaries come from 'Send to Model' — the step order/wording looks stale.
gr.Markdown("""
### πŸ“Œ Instructions:
1. Upload a PDF document
2. Choose output format and context window size
3. Select snippet number (default: 1) or enter custom prompt
4. Select your preferred model in case you want to proceed directly (or continue with 5):
- OpenAI ChatGPT: Manual copy/paste workflow
- HuggingFace Inference: Direct API integration
- Groq API: High-performance inference
5. Click 'Process PDF' to generate summary
6. Use 'Copy Prompt' and, optionally, 'Open ChatGPT' for manual processing
7. Download generated files as needed
""")
# Launch the interface
# Only when executed as a script; importing the module builds `demo` without launching it.
if __name__ == "__main__":
# Launch with sharing switched off and debug mode switched on.
demo.launch(share=False, debug=True)