import logging
import sys
import os
import re
import base64
import nest_asyncio
nest_asyncio.apply()
import pandas as pd
from pathlib import Path
from typing import Any, Dict, List, Optional
from PIL import Image
import streamlit as st
import torch
# Imports do LlamaIndex
from llama_index.core import (
Settings,
SimpleDirectoryReader,
StorageContext,
Document
)
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.node_parser import LangchainNodeParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.chat_engine import CondensePlusContextChatEngine
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex
import chromadb
###############################################################################
# MONKEY PATCH EM bm25s #
###############################################################################
import bm25s
###############################################################################
# CLASSE BM25Retriever (AJUSTADA PARA ENCODING) #
###############################################################################
import json
import Stemmer
from llama_index.core.base.base_retriever import BaseRetriever
from llama_index.core.callbacks.base import CallbackManager
from llama_index.core.constants import DEFAULT_SIMILARITY_TOP_K
from llama_index.core.schema import (
BaseNode,
IndexNode,
NodeWithScore,
QueryBundle,
MetadataMode,
)
from llama_index.core.vector_stores.utils import (
node_to_metadata_dict,
metadata_dict_to_node,
)
from typing import cast
# Module-level logger; BM25Retriever.from_defaults uses it for its deprecation warning.
logger = logging.getLogger(__name__)
# Maps attribute names on the retriever (keys) to the names under which their
# values are written to the persisted JSON file (values); consumed by
# BM25Retriever.get_persist_args.
DEFAULT_PERSIST_ARGS = {"similarity_top_k": "similarity_top_k", "_verbose": "verbose"}
# Name of the JSON file, inside the persist directory, holding those arguments.
DEFAULT_PERSIST_FILENAME = "retriever.json"
class BM25Retriever(BaseRetriever):
    """Keyword (BM25) retriever built on the ``bm25s`` library.

    The retriever either indexes a list of nodes itself or wraps an already
    built ``bm25s.BM25`` object. Every node is kept in ``self.corpus`` as a
    metadata dict so that hits can be converted back into nodes.

    NOTE(review): the original description mentions a 'monkey patch' for
    character-decoding problems. No patch is visible in this class beyond the
    ``errors="ignore"`` used when reading/writing the JSON arguments file.
    """

    def __init__(
        self,
        nodes: Optional[List[BaseNode]] = None,
        stemmer: Optional[Stemmer.Stemmer] = None,
        language: str = "en",
        existing_bm25: Optional[bm25s.BM25] = None,
        similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K,
        callback_manager: Optional[CallbackManager] = None,
        objects: Optional[List[IndexNode]] = None,
        object_map: Optional[dict] = None,
        verbose: bool = False,
    ) -> None:
        """Build a new BM25 index from ``nodes`` or wrap ``existing_bm25``.

        Args:
            nodes: Nodes to index; required unless ``existing_bm25`` is given.
            stemmer: PyStemmer instance; an English stemmer is used when omitted.
            language: Stop-word language handed to ``bm25s.tokenize``.
            existing_bm25: Ready-made BM25 object whose ``corpus`` is reused.
            similarity_top_k: Maximum number of hits returned per query.
            callback_manager, objects, object_map, verbose: Forwarded to
                ``BaseRetriever.__init__``.

        Raises:
            ValueError: If neither ``nodes`` nor ``existing_bm25`` is provided.
        """
        self.stemmer = stemmer or Stemmer.Stemmer("english")
        self.similarity_top_k = similarity_top_k
        # Remembered so that queries are tokenized with the same stop-word
        # list that was used for the corpus.
        self._language = language
        if existing_bm25 is not None:
            # Reuse the existing BM25 instance and its corpus.
            self.bm25 = existing_bm25
            self.corpus = existing_bm25.corpus
        else:
            # Create a fresh BM25 instance from 'nodes'.
            if nodes is None:
                raise ValueError("É preciso fornecer 'nodes' ou um 'existing_bm25'.")
            self.corpus = [node_to_metadata_dict(node) for node in nodes]
            corpus_tokens = bm25s.tokenize(
                [node.get_content(metadata_mode=MetadataMode.EMBED) for node in nodes],
                stopwords=language,
                stemmer=self.stemmer,
                show_progress=verbose,
            )
            self.bm25 = bm25s.BM25()
            self.bm25.index(corpus_tokens, show_progress=verbose)
        super().__init__(
            callback_manager=callback_manager,
            object_map=object_map,
            objects=objects,
            verbose=verbose,
        )

    @classmethod
    def from_defaults(
        cls,
        index: Optional[VectorStoreIndex] = None,
        nodes: Optional[List[BaseNode]] = None,
        docstore: Optional["BaseDocumentStore"] = None,
        stemmer: Optional[Stemmer.Stemmer] = None,
        language: str = "en",
        similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K,
        verbose: bool = False,
        tokenizer: Optional[Any] = None,
    ) -> "BM25Retriever":
        """Create a retriever from exactly one of ``index``, ``nodes`` or ``docstore``.

        ``tokenizer`` is accepted only for backward compatibility; passing it
        logs a deprecation warning and it is otherwise ignored.

        Raises:
            ValueError: If zero or more than one source is given, or if no
                nodes can be obtained from the given source.
        """
        if tokenizer is not None:
            logger.warning(
                "O parâmetro 'tokenizer' foi descontinuado e será removido "
                "no futuro. Use um Stemmer do PyStemmer para melhor controle."
            )
        if sum(bool(val) for val in [index, nodes, docstore]) != 1:
            raise ValueError("Passe exatamente um entre 'index', 'nodes' ou 'docstore'.")
        if index is not None:
            docstore = index.docstore
        if docstore is not None:
            nodes = cast(List[BaseNode], list(docstore.docs.values()))
        # Explicit raise instead of `assert`, which is stripped under `python -O`.
        if nodes is None:
            raise ValueError(
                "Não foi possível determinar os nodes. Verifique seus parâmetros."
            )
        return cls(
            nodes=nodes,
            stemmer=stemmer,
            language=language,
            similarity_top_k=similarity_top_k,
            verbose=verbose,
        )

    def get_persist_args(self) -> Dict[str, Any]:
        """Return the constructor arguments that are saved next to the index.

        Besides the attributes listed in ``DEFAULT_PERSIST_ARGS`` the stop-word
        language is included, so a reloaded retriever tokenizes queries the
        same way as the one that was saved.
        """
        persist_args = {
            DEFAULT_PERSIST_ARGS[key]: getattr(self, key)
            for key in DEFAULT_PERSIST_ARGS
            if hasattr(self, key)
        }
        persist_args["language"] = self._language
        return persist_args

    def persist(self, path: str, **kwargs: Any) -> None:
        """Save the BM25 structure, the corpus and the retriever arguments to ``path``.

        Extra keyword arguments are forwarded to ``bm25s.BM25.save``.
        """
        self.bm25.save(path, corpus=self.corpus, **kwargs)
        with open(
            os.path.join(path, DEFAULT_PERSIST_FILENAME),
            "wt",
            encoding="utf-8",
            errors="ignore",
        ) as f:
            json.dump(self.get_persist_args(), f, indent=2, ensure_ascii=False)

    @classmethod
    def from_persist_dir(cls, path: str, **kwargs: Any) -> "BM25Retriever":
        """Load a retriever (BM25 index, corpus and arguments) saved by ``persist``.

        The arguments file is read with ``errors="ignore"``, so undecodable
        bytes in it are dropped rather than raising. Extra keyword arguments
        are forwarded to ``bm25s.BM25.load``.
        """
        bm25_obj = bm25s.BM25.load(path, load_corpus=True, **kwargs)
        with open(
            os.path.join(path, DEFAULT_PERSIST_FILENAME),
            "rt",
            encoding="utf-8",
            errors="ignore",
        ) as f:
            retriever_data = json.load(f)
        return cls(existing_bm25=bm25_obj, **retriever_data)

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Return the BM25 hits for the query as scored nodes."""
        top_k = self.similarity_top_k
        if self.corpus is not None:
            # Never ask for more hits than there are documents.
            # NOTE(review): bm25s appears to reject k larger than the corpus —
            # confirm against the bm25s documentation.
            top_k = min(top_k, len(self.corpus))
        if top_k <= 0:
            return []
        tokenized_query = bm25s.tokenize(
            query_bundle.query_str,
            stopwords=self._language,
            stemmer=self.stemmer,
            show_progress=self._verbose,
        )
        indexes, scores = self.bm25.retrieve(
            tokenized_query, k=top_k, show_progress=self._verbose
        )
        # bm25s returns one row per query because it supports batched queries;
        # only a single query is sent here.
        indexes = indexes[0]
        scores = scores[0]
        nodes: List[NodeWithScore] = []
        for idx, score in zip(indexes, scores):
            # A hit is either a corpus entry (dict) or a position into self.corpus.
            node_dict = idx if isinstance(idx, dict) else self.corpus[int(idx)]
            node = metadata_dict_to_node(node_dict)
            nodes.append(NodeWithScore(node=node, score=float(score)))
        return nodes
###############################################################################
# CONFIGURAÇÃO STREAMLIT E AJUSTES DA PIPELINE #
###############################################################################
# Avoid re-indexing or re-downloading data repeatedly by keeping state in the session.
im = Image.open("pngegg.png")
st.set_page_config(page_title="Chatbot Carômetro", page_icon=im, layout="wide")
# Sidebar sections
st.sidebar.title("Configuração de LLM")
sidebar_option = st.sidebar.radio("Selecione o LLM", ["gpt-3.5-turbo"])
with open("sicoob-logo.png", "rb") as f:
    data = base64.b64encode(f.read()).decode("utf-8")
# NOTE(review): the markup below is empty and `data` is never used; the HTML
# that embedded the logo seems to be missing — restore it from the original.
st.sidebar.markdown(
    f"""
""",
    unsafe_allow_html=True,
)
if sidebar_option == "gpt-3.5-turbo":
    from llama_index.llms.openai import OpenAI
    from llama_index.embeddings.openai import OpenAIEmbedding
    Settings.llm = OpenAI(model="gpt-3.5-turbo")
    Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-ada-002")
else:
    raise Exception("Opção de LLM inválida!")
# Log to stdout exactly once. basicConfig() only installs its handler when the
# root logger has none, so an unconditional addHandler() afterwards duplicated
# every record, and added yet another copy each time this script ran again in
# the same process (assumed: Streamlit re-runs — confirm).
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
_root_logger = logging.getLogger()
if not any(
    isinstance(handler, logging.StreamHandler)
    and getattr(handler, "stream", None) is sys.stdout
    for handler in _root_logger.handlers
):
    _root_logger.addHandler(logging.StreamHandler(stream=sys.stdout))
# Main paths
chat_store_path = os.path.join("chat_store", "chat_store.json")
documents_path = "documentos"
chroma_storage_path = "chroma_db"
bm25_persist_path = "bm25_retriever"
# Custom CSV reader
class CustomPandasCSVReader:
    """CSV reader that puts the column headers on the first line of the text.

    Each data row becomes one line whose cells are joined with ``col_joiner``.
    Depending on ``concat_rows`` the lines are returned as a single document
    or as one document per line (header line included).
    """

    def __init__(
        self,
        *args: Any,
        concat_rows: bool = True,
        col_joiner: str = ", ",
        row_joiner: str = "\n",
        pandas_config: Optional[dict] = None,
        **kwargs: Any
    ) -> None:
        """Store the formatting options.

        Args:
            concat_rows: Return one document for the whole file when true.
            col_joiner: Separator placed between the cells of a row.
            row_joiner: Separator placed between lines when rows are concatenated.
            pandas_config: Keyword arguments forwarded to ``pandas.read_csv``.
            *args, **kwargs: Accepted and ignored.
        """
        self._concat_rows = concat_rows
        self._col_joiner = col_joiner
        self._row_joiner = row_joiner
        # A fresh dict per instance: a `{}` default would be shared by every
        # reader created without an explicit config.
        self._pandas_config = {} if pandas_config is None else pandas_config

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List["Document"]:
        """Read ``file`` and return its content as document(s).

        Metadata always holds ``filename`` and ``extension``; entries from
        ``extra_info`` are merged on top of them.
        """
        df = pd.read_csv(file, **self._pandas_config)
        # NOTE(review): headers are joined with a plain space while cells use
        # `col_joiner`; kept as-is so generated texts do not change.
        text_list = [" ".join(df.columns.astype(str))]
        # Plain iteration also copes with a header-only file, where
        # DataFrame.apply(axis=1) does not yield a Series with .tolist().
        text_list += [
            self._col_joiner.join(row) for row in df.astype(str).values.tolist()
        ]
        metadata = {"filename": file.name, "extension": file.suffix}
        if extra_info:
            metadata.update(extra_info)
        if self._concat_rows:
            return [Document(text=self._row_joiner.join(text_list), metadata=metadata)]
        # One document per line, each with its own metadata dict.
        return [Document(text=text, metadata=dict(metadata)) for text in text_list]
def clean_documents(documents: List["Document"]) -> List["Document"]:
    """Strip unwanted characters from each document's text, in place.

    Only ASCII letters and digits, characters in the range ``À``-``ÿ`` and
    spaces survive. Whitespace such as newlines and tabs is first turned into
    a space, so words on adjacent lines are not glued together (for example
    ``"a\\nb"`` becomes ``"a b"`` rather than ``"ab"``).

    Args:
        documents: Objects exposing ``get_content()`` and a writable ``text``.

    Returns:
        A new list holding the same (mutated) document objects, in order.
    """
    whitespace = re.compile(r"\s")
    disallowed = re.compile(r"[^0-9A-Za-zÀ-ÿ ]")
    cleaned_docs = []
    for doc in documents:
        spaced_text = whitespace.sub(" ", doc.get_content())
        doc.text = disallowed.sub("", spaced_text)
        cleaned_docs.append(doc)
    return cleaned_docs
def are_docs_downloaded(directory_path: str) -> bool:
    """Return True when ``directory_path`` is a directory with at least one entry.

    Any entry counts, sub-directories included. A missing path or a path that
    is not a directory yields False.
    """
    if not os.path.isdir(directory_path):
        return False
    # `any()` stops at the first entry, so the scandir iterator is closed
    # explicitly through the context manager instead of being left open.
    with os.scandir(directory_path) as entries:
        return any(entries)
# Google Drive access: the credentials (and, when available, the token) come
# from environment variables and are written to local JSON files for the reader.
from llama_index.readers.google import GoogleDriveReader
import json
credentials_json = os.getenv('GOOGLE_CREDENTIALS')
token_json = os.getenv('GOOGLE_TOKEN')
if credentials_json is None:
    raise ValueError("The GOOGLE_CREDENTIALS environment variable is not set.")
# Write the credentials to a file
credentials_path = "credentials.json"
token_path = "token.json"
with open(credentials_path, 'w', encoding="utf-8") as credentials_file:
    credentials_file.write(credentials_json)
# GOOGLE_TOKEN is optional here: writing None would raise TypeError, so the
# token file is only (re)written when the variable is actually set.
if token_json is not None:
    with open(token_path, 'w', encoding="utf-8") as token_file:
        token_file.write(token_json)
else:
    logging.warning(
        "The GOOGLE_TOKEN environment variable is not set; '%s' was not written.",
        token_path,
    )
google_drive_reader = GoogleDriveReader(credentials_path=credentials_path)
# NOTE(review): relies on private members of GoogleDriveReader — confirm they
# exist in the installed version.
google_drive_reader._creds = google_drive_reader._get_credentials()
def download_original_files_from_folder(
    greader: GoogleDriveReader,
    pasta_documentos_drive: str,
    local_path: str
):
    """Download the files of a Drive folder that are not yet present locally.

    Args:
        greader: Reader used to list the folder and download each file.
        pasta_documentos_drive: Identifier of the Drive folder to mirror.
        local_path: Destination directory; created when it does not exist.

    Files whose name already exists in ``local_path`` are skipped. Each
    outcome is reported through the root logger; nothing is returned.
    """
    os.makedirs(local_path, exist_ok=True)
    remote_entries = greader._get_fileids_meta(folder_id=pasta_documentos_drive)
    if not remote_entries:
        logging.info("Nenhum arquivo encontrado na pasta especificada.")
        return
    for entry in remote_entries:
        # Position 0 is used as the file id and position 2 as the file name;
        # only the base name is kept for the local path.
        file_id, file_name = entry[0], os.path.basename(entry[2])
        target_path = os.path.join(local_path, file_name)
        if os.path.exists(target_path):
            logging.info(f"Arquivo '{file_name}' já existe localmente, ignorando download.")
            continue
        downloaded_file_path = greader._download_file(file_id, target_path)
        if not downloaded_file_path:
            logging.warning(f"Não foi possível baixar '{file_name}'")
        else:
            logging.info(f"Arquivo '{file_name}' baixado com sucesso em: {downloaded_file_path}")
# Drive folder that holds the source documents
pasta_documentos_drive = "1s0UUANcU1B0D2eyRweb1W5idUn1V5JEh"
###############################################################################
# RESOURCE CREATION / LOADING (avoids repeating steps)                        #
###############################################################################
# 1. Download only when the local documents directory is missing or empty.
if are_docs_downloaded(documents_path):
    logging.info("'documentos' já contém arquivos, ignorando download.")
else:
    logging.info("Baixando arquivos originais do Drive para 'documentos'...")
    download_original_files_from_folder(
        google_drive_reader,
        pasta_documentos_drive,
        documents_path,
    )
# 2. Reuse the docstore kept in the session state; build it only when absent.
if "docstore" in st.session_state:
    docstore = st.session_state["docstore"]
else:
    # Load every file under the local documents directory; CSV files go
    # through the custom reader defined in this file.
    file_extractor = {".csv": CustomPandasCSVReader()}
    directory_reader = SimpleDirectoryReader(
        input_dir=documents_path,
        file_extractor=file_extractor,
        filename_as_id=True,
        recursive=True,
    )
    documents = clean_documents(directory_reader.load_data())
    # Build the docstore and remember it for later runs in this session.
    docstore = SimpleDocumentStore()
    docstore.add_documents(documents)
    st.session_state["docstore"] = docstore
# 3. Chroma-backed vector store, created once per session and reused afterwards.
if "vector_store" in st.session_state:
    vector_store = st.session_state["vector_store"]
else:
    db = chromadb.PersistentClient(path=chroma_storage_path)
    chroma_collection = db.get_or_create_collection("dense_vectors")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    st.session_state["vector_store"] = vector_store
# The storage context ties the docstore and the vector store together.
storage_context = StorageContext.from_defaults(
    docstore=docstore,
    vector_store=vector_store,
)
# 4. Load or create the index. The decision is based on whether the Chroma
#    collection already holds vectors. The previous check ("storage directory
#    is non-empty") ran after the persistent client had already been opened on
#    that directory, so it could not tell an empty collection from a populated
#    one and the documents might never be embedded.
if "index" not in st.session_state:
    # `client` is expected to expose the underlying Chroma collection —
    # confirm against the ChromaVectorStore API of the installed version.
    if vector_store.client.count() > 0:
        # Vectors are already stored: attach the index to the vector store.
        index = VectorStoreIndex.from_vector_store(vector_store)
    else:
        # Build the index (chunk_size can be tuned as needed).
        splitter = LangchainNodeParser(
            RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=128)
        )
        index = VectorStoreIndex.from_documents(
            list(docstore.docs.values()),
            storage_context=storage_context,
            transformations=[splitter],
        )
        # No explicit vector_store.persist() here: the collection belongs to a
        # persistent Chroma client (assumed to write to disk on its own), and
        # the call was made without the path argument.
    st.session_state["index"] = index
else:
    index = st.session_state["index"]
# 5. Create or load the custom BM25Retriever.
if "bm25_retriever" not in st.session_state:
    bm25_retriever = None
    # BM25Retriever.persist() writes DEFAULT_PERSIST_FILENAME as its last
    # step, so that file marks a complete save. The previously tested name
    # ("bm25.index.json") is not written by anything in this file, which made
    # the retriever be rebuilt on every new session.
    bm25_sentinel = os.path.join(bm25_persist_path, DEFAULT_PERSIST_FILENAME)
    if os.path.isfile(bm25_sentinel):
        try:
            bm25_retriever = BM25Retriever.from_persist_dir(bm25_persist_path)
        except (OSError, ValueError, KeyError, TypeError):
            # Best effort: an unreadable or incompatible save falls back to a rebuild.
            logging.exception(
                "Falha ao carregar o BM25 persistido; o índice será recriado."
            )
    if bm25_retriever is None:
        bm25_retriever = BM25Retriever.from_defaults(
            docstore=docstore,
            similarity_top_k=2,
            language="portuguese",
            verbose=True,
        )
        os.makedirs(bm25_persist_path, exist_ok=True)
        bm25_retriever.persist(bm25_persist_path)
    st.session_state["bm25_retriever"] = bm25_retriever
else:
    bm25_retriever = st.session_state["bm25_retriever"]
# 6. Fusion retriever combining the BM25 and vector retrievers; built once per session.
if "fusion_retriever" in st.session_state:
    fusion_retriever = st.session_state["fusion_retriever"]
else:
    vector_retriever = index.as_retriever(similarity_top_k=2)
    # Prompt handed to the fusion retriever for generating extra search queries.
    fusion_query_prompt = (
        "Gere {num_queries} perguntas de busca relacionadas à seguinte pergunta. "
        "Priorize o significado da pergunta sobre qualquer histórico de conversa. "
        "Se o histórico não for relevante, ignore-o. "
        "Não adicione explicações ou introduções. Apenas escreva as perguntas. "
        "Pergunta: {query}\n\nPerguntas:\n"
    )
    fusion_retriever = QueryFusionRetriever(
        [bm25_retriever, vector_retriever],
        similarity_top_k=2,
        num_queries=0,
        mode="reciprocal_rerank",
        use_async=True,
        verbose=True,
        query_gen_prompt=fusion_query_prompt,
    )
    st.session_state["fusion_retriever"] = fusion_retriever
# 7. Configure the chat engine when the session does not have one yet.
if "chat_engine" not in st.session_state:
    nest_asyncio.apply()
    memory = ChatMemoryBuffer.from_defaults(token_limit=3900)
    # The chat engine's first argument is a retriever, so the fusion retriever
    # is passed directly. It used to be wrapped in a RetrieverQueryEngine,
    # which only worked because that class also offers retrieve().
    # NOTE(review): confirm against CondensePlusContextChatEngine.from_defaults.
    chat_engine = CondensePlusContextChatEngine.from_defaults(
        fusion_retriever,
        memory=memory,
        context_prompt=(
            "Você é um assistente virtual capaz de interagir normalmente, além de "
            "fornecer informações sobre organogramas e listar funcionários. "
            "Aqui estão os documentos relevantes para o contexto:\n"
            "{context_str}\n"
            "Use o histórico anterior ou o contexto acima para responder."
        ),
        verbose=True,
    )
    st.session_state["chat_engine"] = chat_engine
else:
    chat_engine = st.session_state["chat_engine"]
# 8. Chat storage: reuse the session's store, else load it from disk, else
#    create an empty one and persist it right away.
if "chat_store" in st.session_state:
    chat_store = st.session_state["chat_store"]
else:
    if not os.path.exists(chat_store_path):
        chat_store = SimpleChatStore()
        chat_store.persist(persist_path=chat_store_path)
    else:
        chat_store = SimpleChatStore.from_persist_path(persist_path=chat_store_path)
    st.session_state["chat_store"] = chat_store
###############################################################################
# INTERFACE DO CHAT EM STREAMLIT #
###############################################################################
st.title("Chatbot Carômetro")
st.write("Este assistente virtual pode te ajudar a encontrar informações relevantes sobre os carômetros da Sicoob.")
# The history is a list of "role: text" strings kept in the session state.
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []
# Replay earlier turns; each entry is split at its first colon into role and text.
for past_entry in st.session_state.chat_history:
    past_role, past_text = past_entry.split(":", 1)
    with st.chat_message(past_role.strip().lower()):
        st.write(past_text.strip())
user_input = st.chat_input("Digite sua pergunta")
if user_input:
    with st.chat_message('user'):
        st.write(user_input)
    st.session_state.chat_history.append(f"user: {user_input}")
    with st.chat_message('assistant'):
        message_placeholder = st.empty()
        received_tokens = []
        response = chat_engine.stream_chat(user_input)
        # Render the partial answer with a cursor mark while tokens arrive.
        for token in response.response_gen:
            received_tokens.append(token)
            message_placeholder.markdown("".join(received_tokens) + "▌")
        assistant_message = "".join(received_tokens)
        # Final render without the cursor mark.
        message_placeholder.markdown(assistant_message)
    st.session_state.chat_history.append(f"assistant: {assistant_message}")