import logging
import os
from dataclasses import dataclass
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import Generator, List

import gradio as gr
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain_cerebras import ChatCerebras
from langchain_community.vectorstores import Qdrant
from langchain_huggingface import HuggingFaceEmbeddings
from qdrant_client import QdrantClient, models

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Message:
    """One chat turn: who sent it, what was said, and when."""
    role: str
    content: str
    timestamp: str


class ChatHistory:
    """In-memory conversation log used to build the prompt's chat-history block."""

    def __init__(self):
        self.messages: List[Message] = []

    def add_message(self, role: str, content: str):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.messages.append(Message(role=role, content=content, timestamp=timestamp))

    def get_formatted_history(self, max_messages: int = 10) -> str:
        """Return the most recent messages as newline-separated 'role: content' lines."""
        recent_messages = self.messages[-max_messages:]
        return "\n".join(f"{msg.role}: {msg.content}" for msg in recent_messages)

    def clear(self):
        self.messages = []


load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
C_apikey = os.getenv("C_apikey")
OPENAPI_KEY = os.getenv("OPENAPI_KEY")
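
# Illustrative .env layout (the variable names match the os.getenv calls in this file;
# the values are placeholders only):
#   HF_TOKEN=...
#   C_apikey=...
#   OPENAPI_KEY=...
#   QDRANT_URL=...
#   QDRANT_API_KEY=...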

if not HF_TOKEN:
    logger.error("HF_TOKEN is not set in the environment variables.")
    exit(1)

embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
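# all-MiniLM-L6-v2 produces 384-dimensional embeddings, matching the vector size
# configured for the Qdrant collection below.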

try:
    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("QDRANT_API_KEY"),
        prefer_grpc=False,
    )
except Exception as e:
    logger.error(f"Failed to connect to Qdrant: {e}")
    exit(1)

collection_name = "mawared"

try:
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=384,  # must match the embedding model's output dimension
            distance=models.Distance.COSINE,
        ),
    )
except Exception as e:
    # A collection left over from a previous run is expected, not an error.
    if "already exists" not in str(e):
        logger.error(f"Error creating collection: {e}")
        exit(1)
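# Recent qdrant-client releases also expose client.collection_exists(collection_name),
# which can be checked up front instead of matching on the exception message.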

db = Qdrant(
    client=client,
    collection_name=collection_name,
    embeddings=embeddings,
)

retriever = db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5},
)

llm = ChatCerebras(
    model="llama-3.3-70b",
    api_key=C_apikey,
    streaming=True,
)
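# streaming=True lets the model emit tokens incrementally; ask_question_gradio below
# consumes the chain with rag_chain.stream() so the UI can update as the answer grows.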

template = """
You are a specialized, friendly AI assistant for the Mawared HR System, designed to provide accurate and contextually relevant support based solely on the provided context and chat history.

Core Principles

Source of Truth: Use only the information available in the retrieved context and chat history. Do not fabricate details or access external knowledge.

Clarity and Precision: Communicate clearly, concisely, and professionally, using straightforward language for easy comprehension.

Actionable Guidance: Deliver practical solutions, step-by-step workflows, and troubleshooting advice directly related to Mawared HR queries.

Structured Instructions: Provide numbered, easy-to-follow instructions when explaining complex processes.

Targeted Clarification: If a query lacks detail, ask specific questions to obtain the necessary information, explicitly stating what is missing.

Exclusive Focus: Address only Mawared HR-related topics and avoid unrelated discussions.

Professional Tone: Maintain a friendly, approachable, and professional demeanor.

Response Guidelines

1. Analyze the Query Thoughtfully:
   - Start by thoroughly examining the user's question and reviewing the chat history.
   - Consider what the user explicitly asked and infer their intent from the context provided.
   - Mentally identify potential gaps in information before proceeding.

2. Break Down Context Relevance:
   - Isolate and interpret the pieces of context that apply directly to the query.
   - Match the user's needs with the most relevant data from the context or chat history.

3. Develop the Response in a Stepwise Manner:
   - Construct a logical chain of thought: what does the user want to achieve, which parts of the context can address it, and what steps or details are needed for clarity?
   - Provide responses in a structured, easy-to-follow format (e.g., numbered lists, bullet points).

4. Ask for Clarifications Strategically:
   - If the query lacks sufficient detail, identify the precise information that is missing.
   - Frame your clarification politely and explicitly (e.g., "Could you confirm [specific detail] to proceed with [action/task]?").

5. Ensure Directness and Professionalism:
   - Avoid unnecessary elaboration or irrelevant information.
   - Maintain a friendly, professional tone throughout the response.

6. Double-Check for Exclusivity:
   - Ensure all guidance is strictly based on the retrieved context and chat history.
   - Avoid speculating or introducing external knowledge about Mawared HR.

Handling Information Gaps

If the provided context is insufficient to answer the query:
- State explicitly that additional information is required to proceed.
- Clearly outline what details are missing.
- Avoid fabricating details or making assumptions.

Critical Constraint

STRICTLY rely on the provided context and chat history for all responses. Do not generate information about Mawared HR beyond these sources.

Note: Do not mention a human support contact unless explicitly asked.

Previous Conversation: {chat_history}

Retrieved Context: {context}

Current Question: {question}

Answer:
"""

prompt = ChatPromptTemplate.from_template(template)


def create_rag_chain(chat_history: str):
    """Build a retrieval-augmented chain that injects the formatted chat history."""
    chain = (
        {
            "context": retriever,
            "question": RunnablePassthrough(),
            "chat_history": lambda x: chat_history,
        }
        | prompt
        | llm
        | StrOutputParser()
    )
    return chain
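
# The chain is consumed with rag_chain.stream(question) in ask_question_gradio below;
# rag_chain.invoke(question) would instead return the complete answer as one string.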


chat_history = ChatHistory()
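# Note: this ChatHistory instance is module-level state, so it is shared across all
# Gradio sessions rather than being scoped to a single user.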


def process_stream(stream_queue: Queue, history: List[List[str]]) -> Generator[List[List[str]], None, None]:
    """Drain the streaming queue and yield the chat history with the partial answer filled in."""
    current_response = ""

    while True:
        chunk = stream_queue.get()
        if chunk is None:  # sentinel pushed by the producer thread when streaming finishes
            break

        current_response += chunk
        new_history = history.copy()
        new_history[-1][1] = current_response
        yield new_history


def ask_question_gradio(question: str, history: List[List[str]]) -> Generator[tuple, None, None]:
    try:
        if history is None:
            history = []

        chat_history.add_message("user", question)
        formatted_history = chat_history.get_formatted_history()
        rag_chain = create_rag_chain(formatted_history)

        # Add a placeholder entry that the streamed tokens progressively fill in.
        history.append([question, ""])

        stream_queue = Queue()

        def stream_processor():
            # Run the chain in a background thread, pushing chunks onto the queue and
            # ending with a None sentinel on both the success and error paths.
            try:
                for chunk in rag_chain.stream(question):
                    stream_queue.put(chunk)
                stream_queue.put(None)
            except Exception as e:
                logger.error(f"Streaming error: {e}")
                stream_queue.put(None)

        Thread(target=stream_processor).start()

        response = ""
        for updated_history in process_stream(stream_queue, history):
            response = updated_history[-1][1]
            yield "", updated_history

        chat_history.add_message("assistant", response)

    except Exception as e:
        logger.error(f"Error during question processing: {e}")
        if not history:
            history = []
        history.append([question, "An error occurred. Please try again later."])
        yield "", history


def clear_chat():
    chat_history.clear()
    return [], ""


with gr.Blocks(theme="ParityError/Interstellar") as iface:
    gr.Image("Image.jpg", width=750, height=300, show_label=False, show_download_button=False)
    gr.Markdown("# Mawared HR Assistant 3.0.0")
    gr.Markdown("### Instructions")
    gr.Markdown("Ask a question about Mawared HR and get a detailed answer.")

    chatbot = gr.Chatbot(
        height=750,
        show_label=False,
        bubble_full_width=False,
    )

    with gr.Row():
        with gr.Column(scale=20):
            question_input = gr.Textbox(
                label="Ask a question:",
                placeholder="Type your question here...",
                show_label=False,
            )
        with gr.Column(scale=4):
            with gr.Row():
                with gr.Column():
                    send_button = gr.Button("Send", variant="primary", size="sm")
                    clear_button = gr.Button("Clear Chat", size="sm")

    # Pressing Enter in the textbox and clicking Send both run the same streaming handler;
    # the first output clears the textbox while the second updates the chatbot.
    submit_events = [question_input.submit, send_button.click]
    for submit_event in submit_events:
        submit_event(
            ask_question_gradio,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
        )

    clear_button.click(
        clear_chat,
        outputs=[chatbot, question_input],
    )


if __name__ == "__main__":
    iface.launch()