|
import streamlit as st |
|
import os |
|
import logging |
|
import faiss |
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from langchain_community.vectorstores import FAISS |
|
from langchain_community.llms import HuggingFacePipeline |
|
from langchain.chains import RetrievalQA |
|
from ingest import create_faiss_index |
|
|
|
|
|
# Name (or local directory) of the seq2seq checkpoint handed to the
# transformers ``from_pretrained`` loaders in ``load_llm``.
checkpoint = "LaMini-T5-738M"

# Root logging is configured at INFO; this module logs through its own
# named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
@st.cache_resource
def load_llm():
    """Build the text2text-generation pipeline and wrap it for LangChain.

    The tokenizer and seq2seq model are both loaded from the module-level
    ``checkpoint``. The function is decorated with ``st.cache_resource`` so
    the loaded objects can be reused instead of being rebuilt on each call.

    Returns:
        HuggingFacePipeline: LangChain LLM wrapper around the pipeline.
    """
    tok = AutoTokenizer.from_pretrained(checkpoint)
    seq2seq = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)

    # Sampling configuration forwarded verbatim to the pipeline.
    generation_kwargs = {
        "max_length": 256,
        "do_sample": True,
        "temperature": 0.3,
        "top_p": 0.95,
    }
    generator = pipeline(
        'text2text-generation',
        model=seq2seq,
        tokenizer=tok,
        **generation_kwargs,
    )
    return HuggingFacePipeline(pipeline=generator)
|
|
|
def validate_index_file(index_path):
    """Run a cheap sanity check on the index file at ``index_path``.

    The file passes when it is non-empty and its first bytes (up to 100)
    can be read. An empty file is reported in the UI via ``st.error``; any
    exception raised during the check is logged instead.

    Args:
        index_path: Path of the index file to inspect.

    Returns:
        bool: True if the file passed the check, False otherwise.
    """
    try:
        size_in_bytes = os.path.getsize(index_path)
        if size_in_bytes == 0:
            st.error(f"Index file '{index_path}' is empty.")
            return False

        with open(index_path, 'rb') as handle:
            header = handle.read(100)
        logger.info(f"Successfully read {len(header)} bytes from the index file")
    except Exception as e:
        logger.error(f"Error validating index file: {e}")
        return False
    else:
        return True
|
|
|
def load_faiss_index():
    """Load the on-disk FAISS store and return it as a retriever.

    If ``faiss_index/index.faiss`` is missing or fails
    ``validate_index_file``, any existing file is deleted and
    ``create_faiss_index()`` is called to rebuild it before loading.

    Returns:
        The retriever produced by ``FAISS.as_retriever()``.

    Raises:
        RuntimeError: If the index file still does not exist after the
            rebuild attempt.
        Exception: Any error raised while loading the store is shown in the
            UI, logged with its traceback, and re-raised unchanged.
    """
    index_path = "faiss_index/index.faiss"

    if not os.path.exists(index_path) or not validate_index_file(index_path):
        st.warning("Index file is missing or corrupted. Creating a new one...")
        if os.path.exists(index_path):
            os.remove(index_path)
            st.info("Deleted the corrupted index file.")
        create_faiss_index()

    if not os.path.exists(index_path):
        st.error("Failed to create the FAISS index. Please check the 'docs' directory and try again.")
        raise RuntimeError("FAISS index creation failed.")

    try:
        # FAISS.load_local reads index.faiss itself, so a separate
        # faiss.read_index() pre-check would only load the index twice.
        # NOTE(review): load_local unpickles index.pkl; recent
        # langchain_community releases reportedly refuse to do so unless
        # allow_dangerous_deserialization=True is passed. Confirm against
        # the installed version and only enable it for trusted index files.
        db = FAISS.load_local("faiss_index", _load_embeddings())
        if db.index is None or db.index_to_docstore_id is None:
            raise ValueError("FAISS index or docstore_id mapping is None.")

        return db.as_retriever()
    except Exception as e:
        st.error(f"Failed to load FAISS index: {e}")
        logger.exception("Exception in load_faiss_index")
        raise


@st.cache_resource
def _load_embeddings():
    """Create the sentence-transformers embedding model, cached like the LLM."""
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
|
|
|
def process_answer(instruction):
    """Answer ``instruction`` with a "stuff" RetrievalQA chain.

    The retriever comes from ``load_faiss_index()`` and the LLM from
    ``load_llm()``; the chain is asked to return its source documents.

    Args:
        instruction: The question passed to the chain's ``invoke``.

    Returns:
        tuple: ``(answer, response)`` where ``answer`` is
        ``response['result']`` and ``response`` is the full chain output.
        If anything raises, the error is shown in the UI and logged, and a
        fixed error message with an empty dict is returned instead.
    """
    try:
        # Retriever first, then the LLM, before assembling the chain.
        doc_retriever = load_faiss_index()
        language_model = load_llm()

        chain_options = {
            "llm": language_model,
            "chain_type": "stuff",
            "retriever": doc_retriever,
            "return_source_documents": True,
        }
        chain = RetrievalQA.from_chain_type(**chain_options)

        response = chain.invoke(instruction)
        return response['result'], response
    except Exception as e:
        st.error(f"An error occurred while processing the answer: {e}")
        logger.exception("Exception in process_answer")
        return "An error occurred while processing your request.", {}
|
|
|
def diagnose_faiss_index():
    """Write basic diagnostics about the FAISS index file to the page.

    Shows the file's size, permission bits, and owner uid, plus the current
    process uid where the platform provides it, then runs
    ``validate_index_file`` on it. If the file cannot be stat-ed, a warning
    is shown instead.
    """
    index_path = "faiss_index/index.faiss"

    # A single stat call gives one consistent snapshot and avoids the file
    # vanishing between an exists() check and the later lookups.
    try:
        info = os.stat(index_path)
    except OSError:
        st.warning("Index file does not exist.")
        return

    st.write(f"Index file size: {info.st_size} bytes")
    st.write(f"Index file permissions: {oct(info.st_mode)[-3:]}")
    st.write(f"Index file owner: {info.st_uid}")

    # os.getuid is only available on Unix; skip the line elsewhere rather
    # than crashing the page with AttributeError.
    getuid = getattr(os, "getuid", None)
    if getuid is not None:
        st.write(f"Current process user ID: {getuid()}")

    validate_index_file(index_path)
|
|
|
def main():
    """Render the Streamlit page: title, about box, diagnostics, and Q&A form.

    When the "Ask" button is pressed with a non-empty question, the question
    is passed to ``process_answer`` and both the answer and the full chain
    output are written to the page. A blank question only shows a warning.
    """
    # NOTE(review): "ππ" looks like mis-decoded emoji; the original
    # characters cannot be recovered from this file, so the literal is kept.
    st.title("Search Your PDF ππ")

    with st.expander("About the App"):
        st.markdown(
            """
            This is a Generative AI powered Question and Answering app that responds to questions about your PDF File.
            """
        )

    diagnose_faiss_index()

    question = st.text_area("Enter your Question")

    if st.button("Ask"):
        # Do not run retrieval and generation on an empty query.
        if not question or not question.strip():
            st.warning("Please enter a question before pressing Ask.")
            return

        st.info("Your Question: " + question)
        st.info("Your Answer")
        try:
            answer, metadata = process_answer(question)
            st.write(answer)
            st.write(metadata)
        except Exception as e:
            # Last-resort boundary so a failure never leaves a raw traceback
            # on the page.
            st.error(f"An unexpected error occurred: {e}")
            logger.exception("Unexpected error in main function")
|
|
|
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|