ysharma's picture
ysharma HF staff
Update app.py
c90616c verified
from crewai import Agent, Task, Crew
import gradio as gr
import asyncio
from typing import List, Dict, Any, Generator
from langchain_openai import ChatOpenAI
import queue
import threading
import os
class AgentMessageQueue:
def __init__(self):
self.message_queue = queue.Queue()
self.last_agent = None
def add_message(self, message: Dict):
print(f"Adding message to queue: {message}") # Debug print
self.message_queue.put(message)
def get_messages(self) -> List[Dict]:
messages = []
while not self.message_queue.empty():
messages.append(self.message_queue.get())
return messages
class ArticleCrew:
def __init__(self, api_key: str = None):
self.api_key = api_key
self.message_queue = AgentMessageQueue()
self.planner = None
self.writer = None
self.editor = None
self.current_agent = None
self.final_article = None
def initialize_agents(self, topic: str):
if not self.api_key:
raise ValueError("OpenAI API key is required")
os.environ["OPENAI_API_KEY"] = self.api_key
llm = ChatOpenAI(temperature=0.7, model="gpt-4")
self.planner = Agent(
role="Content Planner",
goal=f"Plan engaging and factually accurate content on {topic}",
backstory="Expert content planner with focus on creating engaging outlines",
allow_delegation=False,
verbose=True,
llm=llm
)
self.writer = Agent(
role="Content Writer",
goal=f"Write insightful and factually accurate piece about {topic}",
backstory="Expert content writer with focus on engaging articles",
allow_delegation=False,
verbose=True,
llm=llm
)
self.editor = Agent(
role="Editor",
goal="Polish and refine the article",
backstory="Expert editor with eye for detail and clarity",
allow_delegation=False,
verbose=True,
llm=llm
)
def create_tasks(self, topic: str) -> List[Task]:
planner_task = Task(
description=f"""Create a detailed content plan for an article about {topic} by:
1. Prioritizing the latest trends, key players, and noteworthy news
2. Identifying the target audience, considering their interests and pain points
3. Developing a detailed content outline including introduction, key points, and call to action
4. Including SEO keywords and relevant data or sources""",
expected_output="A comprehensive content plan with outline, keywords, and target audience analysis",
agent=self.planner
)
writer_task = Task(
description="""Based on the provided content plan:
1. Use the content plan to craft a compelling blog post
2. Incorporate SEO keywords naturally
3. Ensure sections/subtitles are properly named in an engaging manner
4. Create proper structure with introduction, body, and conclusion
5. Proofread for grammatical errors""",
expected_output="A well-written article draft following the content plan",
agent=self.writer
)
editor_task = Task(
description="""Review the written article by:
1. Checking for clarity and coherence
2. Correcting any grammatical errors and typos
3. Ensuring consistent tone and style
4. Verifying proper formatting and structure""",
expected_output="A polished, final version of the article ready for publication",
agent=self.editor
)
return [planner_task, writer_task, editor_task]
async def process_article(self, topic: str) -> Generator[List[Dict], None, None]:
def add_agent_messages(agent_name: str, tasks: str, emoji: str = "πŸ€–"):
# Add agent header
self.message_queue.add_message({
"role": "assistant",
"content": agent_name,
"metadata": {"title": f"{emoji} {agent_name}"}
})
# Add task description
self.message_queue.add_message({
"role": "assistant",
"content": tasks,
"metadata": {"title": f"πŸ“‹ Task for {agent_name}"}
})
def setup_next_agent(current_agent: str) -> None:
agent_sequence = {
"Content Planner": ("Content Writer", """1. Use the content plan to craft a compelling blog post
2. Incorporate SEO keywords naturally
3. Ensure sections/subtitles are properly named in an engaging manner
4. Create proper structure with introduction, body, and conclusion
5. Proofread for grammatical errors"""),
"Content Writer": ("Editor", """1. Review the article for clarity and coherence
2. Check for grammatical errors and typos
3. Ensure consistent tone and style
4. Verify proper formatting and structure""")
}
if current_agent in agent_sequence:
next_agent, tasks = agent_sequence[current_agent]
self.current_agent = next_agent
add_agent_messages(next_agent, tasks)
def task_callback(task_output) -> None:
print(f"Task callback received: {task_output}") # Debug print
# Extract content from raw output
raw_output = task_output.raw
if "## Final Answer:" in raw_output:
content = raw_output.split("## Final Answer:")[1].strip()
else:
content = raw_output.strip()
# Handle the output based on current agent
if self.current_agent == "Editor":
# Don't show editor's output with metadata
# Instead, show completion message and final article
self.message_queue.add_message({
"role": "assistant",
"content": "Final article is ready!",
"metadata": {"title": "πŸ“ Final Article"}
})
# Convert common markdown patterns to Gradio-compatible markdown
formatted_content = content
# Ensure proper spacing for headers
formatted_content = formatted_content.replace("\n#", "\n\n#")
# Ensure proper spacing for lists
formatted_content = formatted_content.replace("\n-", "\n\n-")
formatted_content = formatted_content.replace("\n*", "\n\n*")
formatted_content = formatted_content.replace("\n1.", "\n\n1.")
# Ensure proper spacing for paragraphs
formatted_content = formatted_content.replace("\n\n\n", "\n\n")
# Add the final article content without metadata
self.message_queue.add_message({
"role": "assistant",
"content": formatted_content
})
else:
# For other agents, show their output with metadata
self.message_queue.add_message({
"role": "assistant",
"content": content,
"metadata": {"title": f"✨ Output from {self.current_agent}"}
})
# Setup next agent
setup_next_agent(self.current_agent)
def step_callback(output: Any) -> None:
print(f"Step callback received: {output}") # Debug print
# We'll only use step_callback for logging purposes now
pass
try:
self.initialize_agents(topic)
self.current_agent = "Content Planner"
# Start process
yield [{
"role": "assistant",
"content": "Starting work on your article...",
"metadata": {"title": "πŸš€ Process Started"}
}]
# Initialize first agent
add_agent_messages("Content Planner",
"""1. Prioritize the latest trends, key players, and noteworthy news
2. Identify the target audience, considering their interests and pain points
3. Develop a detailed content outline including introduction, key points, and call to action
4. Include SEO keywords and relevant data or sources""")
crew = Crew(
agents=[self.planner, self.writer, self.editor],
tasks=self.create_tasks(topic),
verbose=True,
step_callback=step_callback,
task_callback=task_callback
)
def run_crew():
try:
crew.kickoff()
except Exception as e:
print(f"Error in crew execution: {str(e)}") # Debug print
self.message_queue.add_message({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
thread = threading.Thread(target=run_crew)
thread.start()
while thread.is_alive() or not self.message_queue.message_queue.empty():
messages = self.message_queue.get_messages()
if messages:
print(f"Yielding messages: {messages}") # Debug print
yield messages
await asyncio.sleep(0.1)
except Exception as e:
print(f"Error in process_article: {str(e)}") # Debug print
yield [{
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
}]
# [Rest of the code remains the same]
def create_demo():
article_crew = None
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸ“ AI Article Writing Crew")
openai_api_key = gr.Textbox(
label='OpenAI API Key',
type='password',
placeholder='Enter your OpenAI API key...',
interactive=True
)
chatbot = gr.Chatbot(
label="Writing Process",
height=700,
type="messages",
show_label=True,
visible=False,
avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
render_markdown=True # Enable markdown rendering
)
with gr.Row(equal_height=True):
topic = gr.Textbox(
label="Article Topic",
placeholder="Enter topic...",
scale=4,
visible=False
)
btn = gr.Button("Write Article", variant="primary", scale=1, visible=False)
async def process_input(topic, history, api_key):
nonlocal article_crew
if not api_key:
history = history or []
history.append({
"role": "assistant",
"content": "Please provide an OpenAI API key.",
"metadata": {"title": "❌ Error"}
})
yield history
return
if article_crew is None:
article_crew = ArticleCrew(api_key=api_key)
history = history or []
history.append({"role": "user", "content": f"Write an article about: {topic}"})
yield history
try:
async for messages in article_crew.process_article(topic):
history.extend(messages)
yield history
except Exception as e:
history.append({
"role": "assistant",
"content": f"An error occurred: {str(e)}",
"metadata": {"title": "❌ Error"}
})
yield history
def show_interface():
return {
openai_api_key: gr.Textbox(visible=False),
chatbot: gr.Chatbot(visible=True),
topic: gr.Textbox(visible=True),
btn: gr.Button(visible=True)
}
openai_api_key.submit(show_interface, None, [openai_api_key, chatbot, topic, btn])
btn.click(process_input, [topic, chatbot, openai_api_key], [chatbot])
topic.submit(process_input, [topic, chatbot, openai_api_key], [chatbot])
return demo
if __name__ == "__main__":
demo = create_demo()
demo.queue()
demo.launch(debug=True)