ysharma's picture
ysharma HF staff
add more comments
92d452d verified
raw
history blame
12.6 kB
# imports
import gradio as gr
from crewai import Agent, Task, Crew
from crewai_tools import ScrapeWebsiteTool
import os
import queue
import threading
import asyncio
from typing import List, Dict, Generator
# Message Queue System to manage flow of message
class SupportMessageQueue:
def __init__(self):
self.message_queue = queue.Queue()
self.last_agent = None
def add_message(self, message: Dict):
print(f"Adding message to queue: {message}")
self.message_queue.put(message)
def get_messages(self) -> List[Dict]:
messages = []
while not self.message_queue.empty():
messages.append(self.message_queue.get())
return messages
# main class
class SupportCrew:
def __init__(self, api_key: str = None):
self.api_key = api_key
self.message_queue = SupportMessageQueue()
self.support_agent = None
self.qa_agent = None
self.current_agent = None
self.scrape_tool = None
# agent initialization with role, goal, and backstory
def initialize_agents(self, website_url: str):
if not self.api_key:
raise ValueError("OpenAI API key is required")
os.environ["OPENAI_API_KEY"] = self.api_key
self.scrape_tool = ScrapeWebsiteTool(website_url=website_url)
self.support_agent = Agent(
role="Senior Support Representative",
goal="Be the most friendly and helpful support representative in your team",
backstory=(
"You work at crewAI and are now working on providing support to customers. "
"You need to make sure that you provide the best support! "
"Make sure to provide full complete answers, and make no assumptions."
),
allow_delegation=False,
verbose=True
)
self.qa_agent = Agent(
role="Support Quality Assurance Specialist",
goal="Get recognition for providing the best support quality assurance in your team",
backstory=(
"You work at crewAI and are now working with your team on customer requests "
"ensuring that the support representative is providing the best support possible. "
"You need to make sure that the support representative is providing full "
"complete answers, and make no assumptions."
),
verbose=True
)
# task creation with description and expected output format and tools
def create_tasks(self, inquiry: str) -> List[Task]:
inquiry_resolution = Task(
description=(
f"A customer just reached out with a super important ask:\n{inquiry}\n\n"
"Make sure to use everything you know to provide the best support possible. "
"You must strive to provide a complete and accurate response to the customer's inquiry."
),
expected_output=(
"A detailed, informative response to the customer's inquiry that addresses "
"all aspects of their question.\n"
"The response should include references to everything you used to find the answer, "
"including external data or solutions. Ensure the answer is complete, "
"leaving no questions unanswered, and maintain a helpful and friendly tone throughout."
),
tools=[self.scrape_tool],
agent=self.support_agent
)
quality_assurance_review = Task(
description=(
"Review the response drafted by the Senior Support Representative for the customer's inquiry. "
"Ensure that the answer is comprehensive, accurate, and adheres to the "
"high-quality standards expected for customer support.\n"
"Verify that all parts of the customer's inquiry have been addressed "
"thoroughly, with a helpful and friendly tone.\n"
"Check for references and sources used to find the information, "
"ensuring the response is well-supported and leaves no questions unanswered."
),
expected_output=(
"A final, detailed, and informative response ready to be sent to the customer.\n"
"This response should fully address the customer's inquiry, incorporating all "
"relevant feedback and improvements.\n"
"Don't be too formal, maintain a professional and friendly tone throughout."
),
agent=self.qa_agent
)
return [inquiry_resolution, quality_assurance_review]
# main processing function
async def process_support(self, inquiry: str, website_url: str) -> Generator[List[Dict], None, None]:
def add_agent_messages(agent_name: str, tasks: str, emoji: str = "πŸ€–"):
self.message_queue.add_message({
"role": "assistant",
"content": agent_name,
"metadata": {"title": f"{emoji} {agent_name}"}
})
self.message_queue.add_message({
"role": "assistant",
"content": tasks,
"metadata": {"title": f"πŸ“‹ Task for {agent_name}"}
})
# Manages transition between agents
def setup_next_agent(current_agent: str) -> None:
if current_agent == "Senior Support Representative":
self.current_agent = "Support Quality Assurance Specialist"
add_agent_messages(
"Support Quality Assurance Specialist",
"Review and improve the support representative's response"
)
def task_callback(task_output) -> None:
print(f"Task callback received: {task_output}")
raw_output = task_output.raw
if "## Final Answer:" in raw_output:
content = raw_output.split("## Final Answer:")[1].strip()
else:
content = raw_output.strip()
if self.current_agent == "Support Quality Assurance Specialist":
self.message_queue.add_message({
"role": "assistant",
"content": "Final response is ready!",
"metadata": {"title": "βœ… Final Response"}
})
formatted_content = content
formatted_content = formatted_content.replace("\n#", "\n\n#")
formatted_content = formatted_content.replace("\n-", "\n\n-")
formatted_content = formatted_content.replace("\n*", "\n\n*")
formatted_content = formatted_content.replace("\n1.", "\n\n1.")
formatted_content = formatted_content.replace("\n\n\n", "\n\n")
self.message_queue.add_message({
"role": "assistant",
"content": formatted_content
})
else:
self.message_queue.add_message({
"role": "assistant",
"content": content,
"metadata": {"title": f"✨ Output from {self.current_agent}"}
})
setup_next_agent(self.current_agent)
try:
self.initialize_agents(website_url)
self.current_agent = "Senior Support Representative"
yield [{
"role": "assistant",
"content": "Starting to process your inquiry...",
"metadata": {"title": "πŸš€ Process Started"}
}]
add_agent_messages(
"Senior Support Representative",
"Analyze customer inquiry and provide comprehensive support"
)
crew = Crew(
agents=[self.support_agent, self.qa_agent],
tasks=self.create_tasks(inquiry),
verbose=True,
task_callback=task_callback
)
def run_crew():
try:
crew.kickoff()
except Exception as e:
print(f"Error in crew execution: {str(e)}")
self.message_queue.add_message({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
thread = threading.Thread(target=run_crew)
thread.start()
while thread.is_alive() or not self.message_queue.message_queue.empty():
messages = self.message_queue.get_messages()
if messages:
print(f"Yielding messages: {messages}")
yield messages
await asyncio.sleep(0.1)
except Exception as e:
print(f"Error in process_support: {str(e)}")
yield [{
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
}]
def create_demo():
support_crew = None
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
gr.Markdown("# 🎯 AI Customer Support Crew")
gr.Markdown("This is a friendly, high-performing multi-agent application built with Gradio and CrewAI. Enter a webpage URL and your questions from that webpage.")
openai_api_key = gr.Textbox(
label='OpenAI API Key',
type='password',
placeholder='Type your OpenAI API Key and press Enter to access the app...',
interactive=True
)
chatbot = gr.Chatbot(
label="Support Process",
height=700,
type="messages",
show_label=True,
visible=False,
avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
render_markdown=True
)
with gr.Row(equal_height=True):
inquiry = gr.Textbox(
label="Your Inquiry",
placeholder="Enter your question...",
scale=4,
visible=False
)
website_url = gr.Textbox(
label="Documentation URL",
placeholder="Enter documentation URL to search...",
scale=4,
visible=False
)
btn = gr.Button("Get Support", variant="primary", scale=1, visible=False)
async def process_input(inquiry_text, website_url_text, history, api_key):
nonlocal support_crew
if not api_key:
history = history or []
history.append({
"role": "assistant",
"content": "Please provide an OpenAI API key.",
"metadata": {"title": "❌ Error"}
})
yield history
return
if support_crew is None:
support_crew = SupportCrew(api_key=api_key)
history = history or []
history.append({
"role": "user",
"content": f"Question: {inquiry_text}\nDocumentation: {website_url_text}"
})
yield history
try:
async for messages in support_crew.process_support(inquiry_text, website_url_text):
history.extend(messages)
yield history
except Exception as e:
history.append({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
yield history
def show_interface():
return {
openai_api_key: gr.Textbox(visible=False),
chatbot: gr.Chatbot(visible=True),
inquiry: gr.Textbox(visible=True),
website_url: gr.Textbox(visible=True),
btn: gr.Button(visible=True)
}
openai_api_key.submit(show_interface, None, [openai_api_key, chatbot, inquiry, website_url, btn])
btn.click(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])
inquiry.submit(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])
return demo
if __name__ == "__main__":
demo = create_demo()
demo.queue()
demo.launch(debug=True)