# imports
import gradio as gr
from crewai import Agent, Task, Crew
from crewai_tools import ScrapeWebsiteTool
import os
import queue
import threading
import asyncio
from typing import List, Dict, AsyncGenerator
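
# Assumed runtime dependencies for the imports above (not pinned in this file):
#   pip install gradio crewai crewai-tools
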
# Message queue system to manage the flow of messages between the crew worker thread and the UI
class SupportMessageQueue:
    def __init__(self):
        self.message_queue = queue.Queue()
        self.last_agent = None

    def add_message(self, message: Dict):
        print(f"Adding message to queue: {message}")
        self.message_queue.put(message)

    def get_messages(self) -> List[Dict]:
        messages = []
        while not self.message_queue.empty():
            messages.append(self.message_queue.get())
        return messages
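
# The queue decouples the crewAI task callbacks (producer, running on a worker thread)
# from the Gradio async generator (consumer, polling on the event loop). A minimal usage
# sketch, assuming a SupportMessageQueue instance named `mq` (illustrative names only):
#
#   mq.add_message({"role": "assistant", "content": "hello"})  # pushed from the crew thread
#   pending = mq.get_messages()                                 # drained by process_support
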
# main class that wires together the agents, tasks, and message flow
class SupportCrew:
    def __init__(self, api_key: str = None):
        self.api_key = api_key
        self.message_queue = SupportMessageQueue()
        self.support_agent = None
        self.qa_agent = None
        self.current_agent = None
        self.scrape_tool = None

    # agent initialization with role, goal, and backstory
    def initialize_agents(self, website_url: str):
        if not self.api_key:
            raise ValueError("OpenAI API key is required")

        os.environ["OPENAI_API_KEY"] = self.api_key
        self.scrape_tool = ScrapeWebsiteTool(website_url=website_url)

        self.support_agent = Agent(
            role="Senior Support Representative",
            goal="Be the most friendly and helpful support representative in your team",
            backstory=(
                "You work at crewAI and are now working on providing support to customers. "
                "You need to make sure that you provide the best support! "
                "Make sure to provide full complete answers, and make no assumptions."
            ),
            allow_delegation=False,
            verbose=True
        )

        self.qa_agent = Agent(
            role="Support Quality Assurance Specialist",
            goal="Get recognition for providing the best support quality assurance in your team",
            backstory=(
                "You work at crewAI and are now working with your team on customer requests "
                "ensuring that the support representative is providing the best support possible. "
                "You need to make sure that the support representative is providing full "
                "complete answers, and make no assumptions."
            ),
            verbose=True
        )
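
        # allow_delegation is left unset on the QA agent, so it falls back to the crewAI
        # library default; whether the QA agent can delegate work back to the support agent
        # therefore depends on the installed crewAI version (not pinned in this file).
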
    # task creation with description, expected output, and tools
    def create_tasks(self, inquiry: str) -> List[Task]:
        inquiry_resolution = Task(
            description=(
                f"A customer just reached out with a super important ask:\n{inquiry}\n\n"
                "Make sure to use everything you know to provide the best support possible. "
                "You must strive to provide a complete and accurate response to the customer's inquiry."
            ),
            expected_output=(
                "A detailed, informative response to the customer's inquiry that addresses "
                "all aspects of their question.\n"
                "The response should include references to everything you used to find the answer, "
                "including external data or solutions. Ensure the answer is complete, "
                "leaving no questions unanswered, and maintain a helpful and friendly tone throughout."
            ),
            tools=[self.scrape_tool],
            agent=self.support_agent
        )

        quality_assurance_review = Task(
            description=(
                "Review the response drafted by the Senior Support Representative for the customer's inquiry. "
                "Ensure that the answer is comprehensive, accurate, and adheres to the "
                "high-quality standards expected for customer support.\n"
                "Verify that all parts of the customer's inquiry have been addressed "
                "thoroughly, with a helpful and friendly tone.\n"
                "Check for references and sources used to find the information, "
                "ensuring the response is well-supported and leaves no questions unanswered."
            ),
            expected_output=(
                "A final, detailed, and informative response ready to be sent to the customer.\n"
                "This response should fully address the customer's inquiry, incorporating all "
                "relevant feedback and improvements.\n"
                "Don't be too formal, maintain a professional and friendly tone throughout."
            ),
            agent=self.qa_agent
        )

        return [inquiry_resolution, quality_assurance_review]

    # main processing function
    async def process_support(self, inquiry: str, website_url: str) -> AsyncGenerator[List[Dict], None]:
        def add_agent_messages(agent_name: str, tasks: str, emoji: str = "🤖"):
            self.message_queue.add_message({
                "role": "assistant",
                "content": agent_name,
                "metadata": {"title": f"{emoji} {agent_name}"}
            })

            self.message_queue.add_message({
                "role": "assistant",
                "content": tasks,
                "metadata": {"title": f"📋 Task for {agent_name}"}
            })

        # Manages transition between agents
        def setup_next_agent(current_agent: str) -> None:
            if current_agent == "Senior Support Representative":
                self.current_agent = "Support Quality Assurance Specialist"
                add_agent_messages(
                    "Support Quality Assurance Specialist",
                    "Review and improve the support representative's response"
                )

        def task_callback(task_output) -> None:
            print(f"Task callback received: {task_output}")
            raw_output = task_output.raw
            if "## Final Answer:" in raw_output:
                content = raw_output.split("## Final Answer:")[1].strip()
            else:
                content = raw_output.strip()

            if self.current_agent == "Support Quality Assurance Specialist":
                self.message_queue.add_message({
                    "role": "assistant",
                    "content": "Final response is ready!",
                    "metadata": {"title": "✅ Final Response"}
                })

                formatted_content = content
                formatted_content = formatted_content.replace("\n#", "\n\n#")
                formatted_content = formatted_content.replace("\n-", "\n\n-")
                formatted_content = formatted_content.replace("\n*", "\n\n*")
                formatted_content = formatted_content.replace("\n1.", "\n\n1.")
                formatted_content = formatted_content.replace("\n\n\n", "\n\n")

                self.message_queue.add_message({
                    "role": "assistant",
                    "content": formatted_content
                })
            else:
                self.message_queue.add_message({
                    "role": "assistant",
                    "content": content,
                    "metadata": {"title": f"✨ Output from {self.current_agent}"}
                })
                setup_next_agent(self.current_agent)
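
        # task_callback runs on the crew worker thread started below, so it never yields to
        # Gradio directly; it only pushes messages onto the queue, which the polling loop in
        # this coroutine drains roughly every 100 ms.
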
        try:
            self.initialize_agents(website_url)
            self.current_agent = "Senior Support Representative"

            yield [{
                "role": "assistant",
                "content": "Starting to process your inquiry...",
                "metadata": {"title": "🚀 Process Started"}
            }]

            add_agent_messages(
                "Senior Support Representative",
                "Analyze customer inquiry and provide comprehensive support"
            )

            crew = Crew(
                agents=[self.support_agent, self.qa_agent],
                tasks=self.create_tasks(inquiry),
                verbose=True,
                task_callback=task_callback
            )
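
            # No explicit `process` is passed to the Crew, so the tasks should run in the
            # listed order under crewAI's default sequential process (assumption based on
            # the library default rather than anything configured here).
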
            def run_crew():
                try:
                    crew.kickoff()
                except Exception as e:
                    print(f"Error in crew execution: {str(e)}")
                    self.message_queue.add_message({
                        "role": "assistant",
                        "content": f"An error occurred: {str(e)}",
                        "metadata": {"title": "❌ Error"}
                    })

            thread = threading.Thread(target=run_crew)
            thread.start()

            while thread.is_alive() or not self.message_queue.message_queue.empty():
                messages = self.message_queue.get_messages()
                if messages:
                    print(f"Yielding messages: {messages}")
                    yield messages
                await asyncio.sleep(0.1)

        except Exception as e:
            print(f"Error in process_support: {str(e)}")
            yield [{
                "role": "assistant",
                "content": f"An error occurred: {str(e)}",
                "metadata": {"title": "❌ Error"}
            }]

def create_demo():
    support_crew = None

    with gr.Blocks(theme=gr.themes.Ocean()) as demo:
        gr.Markdown("# 🎯 AI Customer Support Crew")
gr.Markdown("This is a friendly, high-performing multi-agent application built with Gradio and CrewAI. Enter a webpage URL and your questions from that webpage.")

        openai_api_key = gr.Textbox(
            label='OpenAI API Key',
            type='password',
            placeholder='Type your OpenAI API Key and press Enter to access the app...',
            interactive=True
        )

        chatbot = gr.Chatbot(
            label="Support Process",
            height=700,
            type="messages",
            show_label=True,
            visible=False,
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            render_markdown=True
        )

        with gr.Row(equal_height=True):
            inquiry = gr.Textbox(
                label="Your Inquiry",
                placeholder="Enter your question...",
                scale=4,
                visible=False
            )
            website_url = gr.Textbox(
                label="Documentation URL",
                placeholder="Enter documentation URL to search...",
                scale=4,
                visible=False
            )
            btn = gr.Button("Get Support", variant="primary", scale=1, visible=False)
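
        # The chat widgets above start hidden (visible=False) and are revealed by
        # show_interface once the API key is submitted, so only the key textbox is
        # shown on first load.
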
        async def process_input(inquiry_text, website_url_text, history, api_key):
            nonlocal support_crew
            if not api_key:
                history = history or []
                history.append({
                    "role": "assistant",
                    "content": "Please provide an OpenAI API key.",
                    "metadata": {"title": "❌ Error"}
                })
                yield history
                return

            if support_crew is None:
                support_crew = SupportCrew(api_key=api_key)

            history = history or []
            history.append({
                "role": "user",
                "content": f"Question: {inquiry_text}\nDocumentation: {website_url_text}"
            })
            yield history

            try:
                async for messages in support_crew.process_support(inquiry_text, website_url_text):
                    history.extend(messages)
                    yield history
            except Exception as e:
                history.append({
                    "role": "assistant",
                    "content": f"An error occurred: {str(e)}",
                    "metadata": {"title": "❌ Error"}
                })
                yield history
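
        # Because process_input is an async generator, each `yield history` streams an
        # incremental update to the Chatbot instead of waiting for the crew run to finish.
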
        def show_interface():
            return {
                openai_api_key: gr.Textbox(visible=False),
                chatbot: gr.Chatbot(visible=True),
                inquiry: gr.Textbox(visible=True),
                website_url: gr.Textbox(visible=True),
                btn: gr.Button(visible=True)
            }

        openai_api_key.submit(show_interface, None, [openai_api_key, chatbot, inquiry, website_url, btn])
        btn.click(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])
        inquiry.submit(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])

    return demo

if __name__ == "__main__":
    demo = create_demo()
    demo.queue()
    demo.launch(debug=True)
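
# Optional launch variants (illustrative only, not part of the original configuration):
#   demo.launch(share=True)        # publish a temporary public share link
#   demo.launch(server_port=7860)  # pin the local port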