# Hugging Face Space source file (hosting-page status banner at capture time: "Sleeping").
import os | |
import json | |
import re | |
import gradio as gr | |
import requests | |
from duckduckgo_search import DDGS | |
from typing import List | |
from pydantic import BaseModel, Field | |
from tempfile import NamedTemporaryFile | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from llama_parse import LlamaParse | |
from langchain_core.documents import Document | |
# Credentials come from the process environment; each value is None when
# the corresponding variable is not set.
huggingface_token = os.getenv("HUGGINGFACE_TOKEN")
llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")

# Module-level LlamaParse client, used by load_document() when the
# "llamaparse" parser is selected.
llama_parser = LlamaParse(
    language="en",
    result_type="markdown",
    num_workers=4,
    verbose=True,
    api_key=llama_cloud_api_key,
)
def load_document(file, parser: str = "pypdf") -> List[Document]:
    """Load a PDF and return it as a list of documents.

    Args:
        file: Either a filesystem path (``str`` / ``os.PathLike``) or a
            file-like object exposing the path as ``.name`` (for example a
            temporary-file wrapper handed over by the upload widget).
        parser: ``"pypdf"`` to load and split with ``PyPDFLoader``, or
            ``"llamaparse"`` to parse with the module-level ``llama_parser``.

    Returns:
        The documents produced by the selected parser. When LlamaParse fails
        for any reason, the PyPDF result is returned instead.

    Raises:
        ValueError: If ``parser`` is neither ``"pypdf"`` nor ``"llamaparse"``.
    """
    if parser not in ("pypdf", "llamaparse"):
        raise ValueError("Invalid parser specified. Use 'pypdf' or 'llamaparse'.")

    # Plain paths are used as-is; anything else is expected to carry the
    # path in ``.name``. (``Path.name`` is only the basename, so path-like
    # objects must be handled before falling back to the attribute.)
    if isinstance(file, (str, os.PathLike)):
        path = os.fspath(file)
    else:
        path = file.name

    if parser == "llamaparse":
        try:
            documents = llama_parser.load_data(path)
            return [
                Document(page_content=doc.text, metadata={"source": path})
                for doc in documents
            ]
        except Exception as e:
            # Deliberate best-effort: any LlamaParse failure degrades to the
            # local PyPDF parser below instead of aborting the upload.
            print(f"Error using Llama Parse: {str(e)}")
            print("Falling back to PyPDF parser")

    return PyPDFLoader(path).load_and_split()
# Name of the sentence-transformers model used for all embeddings.
_EMBEDDING_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
# Lazily created embeddings object, shared by every caller in this process.
_embeddings_instance = None


def get_embeddings():
    """Return the shared ``HuggingFaceEmbeddings`` instance.

    The object is created on first use and reused afterwards, so the model is
    not rebuilt for every upload and every question.

    Returns:
        A ``HuggingFaceEmbeddings`` configured with ``_EMBEDDING_MODEL_NAME``.
    """
    global _embeddings_instance
    if _embeddings_instance is None:
        _embeddings_instance = HuggingFaceEmbeddings(model_name=_EMBEDDING_MODEL_NAME)
    return _embeddings_instance
def update_vectors(files, parser):
    """Parse the uploaded PDFs and add their chunks to the FAISS index on disk.

    Args:
        files: Uploaded files as provided by the upload widget; may be empty
            or ``None``.
        parser: Parser name forwarded to ``load_document``.

    Returns:
        A status message for display in the UI.
    """
    if not files:
        return "Please upload at least one PDF file."

    all_data = []
    for file in files:
        all_data.extend(load_document(file, parser))

    # Building or extending an index from zero documents cannot succeed, so
    # report the situation instead of failing inside the vector store.
    if not all_data:
        return "No text could be extracted from the uploaded files; the vector store was not updated."

    # Fetch the embedding model only once there is something to index.
    embed = get_embeddings()
    if os.path.exists("faiss_database"):
        # NOTE(security): loading a saved index deserializes pickled data.
        # This is only acceptable while "faiss_database" is written solely by
        # this application; never point it at an untrusted directory.
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)
    else:
        database = FAISS.from_documents(all_data, embed)
    database.save_local("faiss_database")

    return f"Vector store updated successfully. Processed {len(all_data)} chunks from {len(files)} files using {parser}."
def generate_chunked_response(prompt, max_tokens=1000, max_chunks=5, timeout=120):
    """Generate text from the hosted Mistral model, continuing across requests.

    The prompt is posted to the inference endpoint. As long as the returned
    text does not end with sentence punctuation or a stop marker, the
    accumulated text is sent back as the next input so the model can continue,
    up to ``max_chunks`` requests.

    Args:
        prompt: Full prompt text, including any ``<s>[INST] ... [/INST]``
            wrapper.
        max_tokens: Value sent as ``max_new_tokens`` with each request.
        max_chunks: Maximum number of requests to issue.
        timeout: Per-request timeout in seconds, passed to ``requests.post``.

    Returns:
        The accumulated text with the instruction block and the known prompt
        lead-in phrases removed. An empty string when nothing was generated
        (non-200 status, network error, or unexpected payload).
    """
    API_URL = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {"Authorization": f"Bearer {huggingface_token}"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_tokens,
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repetition_penalty": 1.1,
            "stop": ["</s>", "[/INST]"],  # Add stop tokens
        },
    }

    full_response = ""
    for _ in range(max_chunks):
        try:
            response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
            if response.status_code != 200:
                break
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            # Same best-effort policy as a non-200 status: stop and return
            # whatever has been collected so far.
            print(f"Error calling the inference API: {e}")
            break

        if not (isinstance(result, list) and result):
            break

        chunk = result[0].get("generated_text", "")
        # Keep only the part that was not already collected. The delta is NOT
        # stripped: its leading whitespace separates it from the previous
        # text, and removing it would glue words together.
        new_content = chunk[len(full_response):]
        if not new_content.strip():
            break  # No new content, so we're done

        full_response += new_content
        if chunk.endswith((".", "!", "?", "</s>", "[/INST]")):
            break

        # Feed everything collected so far back in for the next request.
        payload["inputs"] = full_response

    # Clean up the response: drop the echoed instruction block and lead-ins.
    clean_response = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', full_response, flags=re.DOTALL)
    clean_response = clean_response.replace("Using the following context:", "").strip()
    clean_response = clean_response.replace("Using the following context from the PDF documents:", "").strip()
    return clean_response
def duckduckgo_search(query):
    """Run a DuckDuckGo text search and return up to five results.

    Args:
        query: Search keywords.

    Returns:
        A list of result mappings as produced by ``DDGS.text``; empty when the
        search yields nothing.
    """
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=5)
        # Materialize while the client is still open: some library versions
        # return a lazy iterator that cannot be consumed after the context
        # manager has exited.
        return list(results) if results is not None else []
# Pydantic schema for the list of sources cited in a response.
# (Documented with comments rather than a docstring so the model's generated
# schema is left exactly as before.)
class CitingSources(BaseModel):
    # Required field: the Ellipsis default marks it as having no default value.
    sources: List[str] = Field(
        default=...,
        description="List of sources to cite. Should be an URL of the source.",
    )
def get_response_from_pdf(query):
    """Answer a question using the documents stored in the local FAISS index.

    Args:
        query: The user's question.

    Returns:
        The generated answer, or a notice asking for uploads when no
        ``faiss_database`` index exists yet.
    """
    # Check for the index first so the embedding model is not loaded when
    # there is nothing to search.
    if not os.path.exists("faiss_database"):
        return "No documents available. Please upload PDF documents to answer questions."

    embed = get_embeddings()
    # NOTE(security): loading a saved index deserializes pickled data; this
    # is only acceptable while the index is written solely by this app.
    database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)

    retriever = database.as_retriever()
    relevant_docs = retriever.get_relevant_documents(query)
    context_str = "\n".join(doc.page_content for doc in relevant_docs)

    prompt = f"""<s>[INST] Using the following context from the PDF documents:
{context_str}
Write a detailed and complete response that answers the following user question: '{query}'
Do not include a list of sources in your response. [/INST]"""
    generated_text = generate_chunked_response(prompt)

    # Clean the response: remove any echoed instruction block and lead-in.
    clean_text = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', generated_text, flags=re.DOTALL)
    clean_text = clean_text.replace("Using the following context from the PDF documents:", "").strip()
    return clean_text
def get_response_with_search(query):
    """Answer a request from web search results.

    Searches DuckDuckGo for ``query``, builds a context from the results that
    carry a ``body``, asks the model for a research document, and splits the
    answer at the first ``"Sources:"`` marker.

    Returns:
        A ``(main_content, sources)`` tuple of stripped strings; ``sources``
        is empty when the marker does not occur in the answer.
    """
    hits = duckduckgo_search(query)

    # One text block per usable result.
    snippets = []
    for hit in hits:
        if 'body' not in hit:
            continue
        snippets.append(f"{hit['title']}\n{hit['body']}\nSource: {hit['href']}\n")
    context = "\n".join(snippets)

    prompt = f"""<s>[INST] Using the following context:
{context}
Write a detailed and complete research document that fulfills the following user request: '{query}'
After writing the document, please provide a list of sources used in your response. [/INST]"""
    raw_answer = generate_chunked_response(prompt)

    # Remove any echoed instruction block and the prompt lead-in.
    without_prompt = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', raw_answer, flags=re.DOTALL)
    answer = without_prompt.replace("Using the following context:", "").strip()

    # Everything before the first "Sources:" is the document, the rest the list.
    body, _marker, listing = answer.partition("Sources:")
    return body.strip(), listing.strip()
def chatbot_interface(message, history, use_web_search):
    """Handle one chat turn and return the updated conversation history.

    Args:
        message: Text entered by the user.
        history: List of ``(user_message, bot_response)`` tuples; ``None`` is
            treated as an empty conversation.
        use_web_search: When true, answer from web search results; otherwise
            answer from the uploaded PDF documents.

    Returns:
        The history list with the new exchange appended. A blank message
        leaves the history unchanged.
    """
    if history is None:
        history = []

    # Ignore blank submissions instead of sending an empty query to the
    # search engine or the model.
    if not message or not message.strip():
        return history

    if use_web_search:
        main_content, sources = get_response_with_search(message)
        # Only add the footer when the model actually listed sources.
        if sources:
            formatted_response = f"{main_content}\n\nSources:\n{sources}"
        else:
            formatted_response = main_content
    else:
        formatted_response = get_response_from_pdf(message)  # No sources for PDF responses

    history.append((message, formatted_response))
    return history
# Gradio interface
# Builds the page: a PDF upload/indexing section, a chat section, example
# prompts, and usage notes. Event handlers are wired to the functions above.
# NOTE(review): the original indentation was lost, so the placement of the
# upload controls inside gr.Row() is an assumption — confirm the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# AI-powered Web Search and PDF Chat Assistant")
    # Upload controls: file picker, parser choice, and the button that triggers indexing.
    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        parser_dropdown = gr.Dropdown(choices=["pypdf", "llamaparse"], label="Select PDF Parser", value="pypdf")
        update_button = gr.Button("Upload Document")
    update_output = gr.Textbox(label="Update Status")
    # update_vectors receives the selected files and parser name; its status string is shown in update_output.
    update_button.click(update_vectors, inputs=[file_input, parser_dropdown], outputs=update_output)
    # Chat controls.
    chatbot = gr.Chatbot(label="Conversation")
    msg = gr.Textbox(label="Ask a question")
    use_web_search = gr.Checkbox(label="Use Web Search", value=False)
    submit = gr.Button("Submit")
    # Example prompts that fill the question textbox.
    gr.Examples(
        examples=[
            ["What are the latest developments in AI?"],
            ["Tell me about recent updates on GitHub"],
            ["What are the best hotels in Galapagos, Ecuador?"],
            ["Summarize recent advancements in Python programming"],
        ],
        inputs=msg,
    )
    # Both the Submit button and pressing Enter in the textbox run the same handler,
    # which returns the updated conversation for the chatbot component.
    submit.click(chatbot_interface, inputs=[msg, chatbot, use_web_search], outputs=[chatbot])
    msg.submit(chatbot_interface, inputs=[msg, chatbot, use_web_search], outputs=[chatbot])
    gr.Markdown(
        """
## How to use
1. Upload PDF documents using the file input at the top.
2. Select the PDF parser (pypdf or llamaparse) and click "Upload Document" to update the vector store.
3. Ask questions in the textbox.
4. Toggle "Use Web Search" to switch between PDF chat and web search.
5. Click "Submit" or press Enter to get a response.
"""
    )
# Start the app only when this file is executed directly.
if __name__ == "__main__":
    demo.launch(share=True)