Spaces:
Sleeping
Sleeping
File size: 4,286 Bytes
df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 df7ef14 eba40a1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import os
import requests
import streamlit as st
from io import BytesIO
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from transformers import pipeline
import torch
# Page-level settings for the Streamlit app: title, layout and icon.
st.set_page_config(
    page_title="RAG-based PDF Chat",
    layout="centered",
    page_icon="π",
)
@st.cache_resource
def load_summarization_pipeline():
    """Build the Hugging Face summarization pipeline.

    The function is wrapped in ``st.cache_resource``.

    Returns:
        The ``transformers`` pipeline created for the "summarization" task
        with the ``facebook/bart-large-cnn`` checkpoint.
    """
    return pipeline("summarization", model="facebook/bart-large-cnn")


# Module-level summarizer used by generate_summary_with_huggingface.
summarizer = load_summarization_pipeline()
# Hugging Face PDF URLs grouped by folder. fetch_pdf_text_from_folders
# iterates this mapping as {folder_name: iterable of PDF URLs}.
PDF_FOLDERS = {
    # Add one entry per folder, for example:
    #   "Folder name": ["https://huggingface.co/<repo>/blob/main/<file>.pdf"],
}
def get_huggingface_raw_url(url):
    """Convert a Hugging Face "blob" page URL into a direct-download URL.

    A URL is rewritten only when it contains both ``huggingface.co`` and a
    ``/blob/`` segment; any other URL is returned unchanged.

    Args:
        url: The URL string to convert.

    Returns:
        The URL with its first ``/blob/`` segment replaced by ``/resolve/``,
        or the original URL when it does not look like a Hugging Face blob
        link.
    """
    if "huggingface.co" in url and "/blob/" in url:
        # Replace only the first "/blob/": that one is the route marker.
        # Any later occurrence is part of the file path inside the repo and
        # must be preserved, otherwise the download URL points nowhere.
        return url.replace("/blob/", "/resolve/", 1)
    return url
def fetch_pdf_text_from_folders(pdf_folders, timeout=30):
    """Download every PDF listed in ``pdf_folders`` and return its text.

    Each folder contributes a ``[Folder: <name>]`` header line followed by
    the extracted text of all pages of all its PDFs, in listing order.
    Download or parsing failures are reported with ``st.error`` and the
    offending URL is skipped; text extracted before the failure is kept.

    Args:
        pdf_folders: Mapping of folder name to an iterable of PDF URLs.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot block the app indefinitely.

    Returns:
        One string containing the text of all folders concatenated; an
        empty string when ``pdf_folders`` is empty.
    """
    folder_texts = []
    for folder_name, urls in pdf_folders.items():
        # Collect pieces in a list and join once instead of repeated "+=".
        parts = [f"\n[Folder: {folder_name}]\n"]
        for url in urls:
            raw_url = get_huggingface_raw_url(url)
            try:
                response = requests.get(raw_url, timeout=timeout)
                response.raise_for_status()
                pdf_reader = PdfReader(BytesIO(response.content))
                for page in pdf_reader.pages:
                    page_text = page.extract_text()
                    # Pages with no extractable text are skipped.
                    if page_text:
                        parts.append(page_text)
            except requests.RequestException as e:
                st.error(f"Failed to fetch PDF from URL: {url} - {e}")
            except Exception as e:
                # Best effort: a broken PDF must not abort the other files.
                st.error(f"Failed to read PDF from URL {url}: {e}")
        folder_texts.append("".join(parts))
    return "".join(folder_texts)
@st.cache_data
def get_text_chunks(text):
    """Split ``text`` into overlapping chunks.

    Uses ``RecursiveCharacterTextSplitter`` with a chunk size of 10000 and
    an overlap of 1000.

    Args:
        text: The full text to split.

    Returns:
        The chunks produced by the splitter's ``split_text`` call.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=10000,
        chunk_overlap=1000,
    )
    return splitter.split_text(text)
# Embedding model handed to FAISS when the vector store is built.
embedding_function = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)
@st.cache_resource
def load_or_create_vector_store(text_chunks):
    """Build a FAISS vector store from ``text_chunks``.

    Args:
        text_chunks: Text chunks to embed with the module-level
            ``embedding_function``.

    Returns:
        The FAISS store built from the chunks, or ``None`` (after showing
        an error in the UI) when ``text_chunks`` is empty.
    """
    if text_chunks:
        return FAISS.from_texts(text_chunks, embedding=embedding_function)

    # Nothing to index: report it and let the caller handle the None.
    st.error("No valid text chunks found to create a vector store. Please check your PDF URLs or file content.")
    return None
def generate_summary_with_huggingface(query, retrieved_text):
    """Summarize the retrieved context together with the user's query.

    The query and the retrieved text are combined into one prompt, cut to
    the first 1024 characters, and passed to the module-level
    ``summarizer`` pipeline.

    Args:
        query: The user's question.
        retrieved_text: Context text retrieved for the question.

    Returns:
        The ``summary_text`` field of the first result returned by the
        summarizer.
    """
    summarization_input = f"{query}\n\nRelated information:\n{retrieved_text}"
    max_input_length = 1024
    summarization_input = summarization_input[:max_input_length]
    summary = summarizer(
        summarization_input,
        max_length=500,
        min_length=50,
        do_sample=False,
        # The slice above limits characters, not tokens. Non-ASCII text can
        # tokenize to more tokens than characters, so also let the
        # tokenizer truncate to the model's maximum input length.
        truncation=True,
    )
    return summary[0]["summary_text"]
def user_input(user_question, vector_store):
    """Answer ``user_question`` from the documents in ``vector_store``.

    Args:
        user_question: The question typed by the user.
        vector_store: Store exposing ``similarity_search``, or ``None``
            when no store could be built.

    Returns:
        The generated summary for the retrieved documents, or a fixed
        explanatory message when ``vector_store`` is ``None``.
    """
    if vector_store is not None:
        matches = vector_store.similarity_search(user_question)
        # Merge the matched documents into a single context string.
        context_text = " ".join(match.page_content for match in matches)
        return generate_summary_with_huggingface(user_question, context_text)
    return "Vector store is empty due to failed PDF loading or empty documents."
def main():
    """Render the app: build the vector store, then answer one question.

    Fetches the PDF text for ``PDF_FOLDERS``, chunks it, builds the vector
    store, and shows a text input with a "Get Response" button. When the
    button is pressed with a non-empty question, the generated answer is
    displayed; with an empty question a warning is shown instead.
    """
    st.title("π Gen AI Lawyers Guide")

    corpus = fetch_pdf_text_from_folders(PDF_FOLDERS)
    chunks = get_text_chunks(corpus)
    store = load_or_create_vector_store(chunks)

    question = st.text_input("Ask a Question:", placeholder="Type your question here...")

    # Nothing to do until the user presses the button.
    if not st.button("Get Response"):
        return

    if not question:
        st.warning("Please enter a question before submitting.")
        return

    with st.spinner("Generating response..."):
        reply = user_input(question, store)
        st.markdown(f"**π€ AI:** {reply}")
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()
|