"""Gradio chat front end for a three-agent CrewAI article-writing pipeline.

A planner, a writer and an editor agent run sequentially in a background
thread; their progress is pushed through a thread-safe queue and streamed into
a ``gr.Chatbot`` by an async generator.
"""
import asyncio
import os
import queue
import re
import threading
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional  # noqa: F401

import gradio as gr
from crewai import Agent, Crew, Task
from langchain_openai import ChatOpenAI

# Step lists shown in the chat UI when each agent starts working.
_PLANNER_STEPS = (
    "1. Prioritize the latest trends, key players, and noteworthy news\n"
    "2. Identify the target audience, considering their interests and pain points\n"
    "3. Develop a detailed content outline including introduction, key points, and call to action\n"
    "4. Include SEO keywords and relevant data or sources"
)
_WRITER_STEPS = (
    "1. Use the content plan to craft a compelling blog post\n"
    "2. Incorporate SEO keywords naturally\n"
    "3. Ensure sections/subtitles are properly named in an engaging manner\n"
    "4. Create proper structure with introduction, body, and conclusion\n"
    "5. Proofread for grammatical errors"
)
_EDITOR_STEPS = (
    "1. Review the article for clarity and coherence\n"
    "2. Check for grammatical errors and typos\n"
    "3. Ensure consistent tone and style\n"
    "4. Verify proper formatting and structure"
)

# Maps the agent that just finished to (next agent, step list to display).
# The editor is last, so it has no entry.
_AGENT_SEQUENCE = {
    "Content Planner": ("Content Writer", _WRITER_STEPS),
    "Content Writer": ("Editor", _EDITOR_STEPS),
}

_FINAL_ANSWER_MARKER = "## Final Answer:"


class AgentMessageQueue:
    """FIFO buffer carrying chat messages from the crew thread to the UI loop."""

    def __init__(self):
        self.message_queue = queue.Queue()
        # Not read anywhere in this file; kept so existing attribute access works.
        self.last_agent = None

    def add_message(self, message: Dict):
        """Enqueue one chat message dict."""
        print(f"Adding message to queue: {message}")  # Debug print
        self.message_queue.put(message)

    def get_messages(self) -> List[Dict]:
        """Return all currently queued messages, oldest first, without blocking."""
        messages = []
        while True:
            try:
                # get_nowait() instead of empty()+get(): a blocking get() could
                # hang the caller if another consumer emptied the queue between
                # the check and the read.
                messages.append(self.message_queue.get_nowait())
            except queue.Empty:
                return messages


class ArticleCrew:
    """Runs the planner -> writer -> editor crew and streams its progress."""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.message_queue = AgentMessageQueue()
        self.planner = None
        self.writer = None
        self.editor = None
        # Role name of the agent whose task is currently running.
        self.current_agent = None
        # Formatted editor output; set when the last task completes.
        self.final_article = None

    def initialize_agents(self, topic: str):
        """Create the three agents for ``topic``.

        Raises:
            ValueError: if no API key was supplied.
        """
        if not self.api_key:
            raise ValueError("OpenAI API key is required")

        # The environment variable is process-global; it is still set as before
        # in case other components read the key from the environment
        # (NOTE(review): confirm whether CrewAI needs this). The key is also
        # passed explicitly so the LLM does not depend on that global.
        os.environ["OPENAI_API_KEY"] = self.api_key
        llm = ChatOpenAI(temperature=0.7, model="gpt-4", api_key=self.api_key)

        self.planner = Agent(
            role="Content Planner",
            goal=f"Plan engaging and factually accurate content on {topic}",
            backstory="Expert content planner with focus on creating engaging outlines",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )
        self.writer = Agent(
            role="Content Writer",
            goal=f"Write insightful and factually accurate piece about {topic}",
            backstory="Expert content writer with focus on engaging articles",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )
        self.editor = Agent(
            role="Editor",
            goal="Polish and refine the article",
            backstory="Expert editor with eye for detail and clarity",
            allow_delegation=False,
            verbose=True,
            llm=llm,
        )

    def create_tasks(self, topic: str) -> List[Task]:
        """Return the planner, writer and editor tasks, in execution order."""
        planner_task = Task(
            description=(
                f"Create a detailed content plan for an article about {topic} by:\n"
                "1. Prioritizing the latest trends, key players, and noteworthy news\n"
                "2. Identifying the target audience, considering their interests and pain points\n"
                "3. Developing a detailed content outline including introduction, key points, and call to action\n"
                "4. Including SEO keywords and relevant data or sources"
            ),
            expected_output="A comprehensive content plan with outline, keywords, and target audience analysis",
            agent=self.planner,
        )
        writer_task = Task(
            description="Based on the provided content plan:\n" + _WRITER_STEPS,
            expected_output="A well-written article draft following the content plan",
            agent=self.writer,
        )
        editor_task = Task(
            description=(
                "Review the written article by:\n"
                "1. Checking for clarity and coherence\n"
                "2. Correcting any grammatical errors and typos\n"
                "3. Ensuring consistent tone and style\n"
                "4. Verifying proper formatting and structure"
            ),
            expected_output="A polished, final version of the article ready for publication",
            agent=self.editor,
        )
        return [planner_task, writer_task, editor_task]

    @staticmethod
    def _format_markdown(content: str) -> str:
        """Insert blank lines before headers and list items, then collapse extras."""
        formatted = content
        for marker in ("#", "-", "*", "1."):
            formatted = formatted.replace(f"\n{marker}", f"\n\n{marker}")
        # Collapse any run of 3+ newlines; a single str.replace pass would leave
        # three newlines behind for a run of four.
        return re.sub(r"\n{3,}", "\n\n", formatted)

    def _announce_agent(self, agent_name: str, tasks: str, emoji: str = "🤖") -> None:
        """Queue the header and task-description messages for ``agent_name``."""
        self.message_queue.add_message({
            "role": "assistant",
            "content": agent_name,
            "metadata": {"title": f"{emoji} {agent_name}"},
        })
        self.message_queue.add_message({
            "role": "assistant",
            "content": tasks,
            "metadata": {"title": f"📋 Task for {agent_name}"},
        })

    def _advance_agent(self, finished_agent: str) -> None:
        """Switch ``current_agent`` to the successor of ``finished_agent``, if any."""
        successor = _AGENT_SEQUENCE.get(finished_agent)
        if successor is None:
            return
        next_agent, tasks = successor
        self.current_agent = next_agent
        self._announce_agent(next_agent, tasks)

    def _on_task_complete(self, task_output) -> None:
        """Crew task callback: queue the finished task's output for the UI."""
        print(f"Task callback received: {task_output}")  # Debug print

        raw_output = task_output.raw
        if _FINAL_ANSWER_MARKER in raw_output:
            # maxsplit=1 keeps everything after the first marker, so a repeated
            # marker does not truncate the answer.
            content = raw_output.split(_FINAL_ANSWER_MARKER, 1)[1].strip()
        else:
            content = raw_output.strip()

        if self.current_agent == "Editor":
            # The editor's output is the article itself: show a completion
            # banner, then the article without a collapsible metadata title.
            self.message_queue.add_message({
                "role": "assistant",
                "content": "Final article is ready!",
                "metadata": {"title": "📝 Final Article"},
            })
            formatted_content = self._format_markdown(content)
            self.final_article = formatted_content
            self.message_queue.add_message({
                "role": "assistant",
                "content": formatted_content,
            })
        else:
            self.message_queue.add_message({
                "role": "assistant",
                "content": content,
                "metadata": {"title": f"✨ Output from {self.current_agent}"},
            })
            self._advance_agent(self.current_agent)

    @staticmethod
    def _on_step(output: Any) -> None:
        """Crew step callback; used for debug logging only."""
        print(f"Step callback received: {output}")  # Debug print

    async def process_article(self, topic: str) -> AsyncGenerator[List[Dict], None]:
        """Run the crew for ``topic`` and yield batches of chat messages.

        The crew executes in a worker thread; this coroutine polls the message
        queue every 0.1 s and yields whatever has accumulated. Errors are
        reported as chat messages rather than raised.
        """
        try:
            self.initialize_agents(topic)
            self.current_agent = "Content Planner"

            yield [{
                "role": "assistant",
                "content": "Starting work on your article...",
                "metadata": {"title": "🚀 Process Started"},
            }]

            self._announce_agent("Content Planner", _PLANNER_STEPS)

            crew = Crew(
                agents=[self.planner, self.writer, self.editor],
                tasks=self.create_tasks(topic),
                verbose=True,
                step_callback=self._on_step,
                task_callback=self._on_task_complete,
            )

            def run_crew():
                try:
                    crew.kickoff()
                except Exception as e:
                    # Top-level boundary of the worker thread: surface the
                    # failure in the chat instead of losing it.
                    print(f"Error in crew execution: {str(e)}")  # Debug print
                    self.message_queue.add_message({
                        "role": "assistant",
                        "content": f"An error occurred: {str(e)}",
                        "metadata": {"title": "❌ Error"},
                    })

            thread = threading.Thread(target=run_crew)
            thread.start()

            # Keep polling until the worker is done AND the queue is drained.
            while thread.is_alive() or not self.message_queue.message_queue.empty():
                messages = self.message_queue.get_messages()
                if messages:
                    print(f"Yielding messages: {messages}")  # Debug print
                    yield messages
                await asyncio.sleep(0.1)

        except Exception as e:
            print(f"Error in process_article: {str(e)}")  # Debug print
            yield [{
                "role": "assistant",
                "content": f"An error occurred: {str(e)}",
                "metadata": {"title": "❌ Error"},
            }]


def create_demo():
    """Build and return the Gradio Blocks app."""
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 📝 AI Article Writing Crew")

        openai_api_key = gr.Textbox(
            label='OpenAI API Key',
            type='password',
            placeholder='Enter your OpenAI API key...',
            interactive=True,
        )

        chatbot = gr.Chatbot(
            label="Writing Process",
            height=700,
            type="messages",
            show_label=True,
            visible=False,
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            render_markdown=True,  # Enable markdown rendering
        )

        with gr.Row(equal_height=True):
            topic = gr.Textbox(
                label="Article Topic",
                placeholder="Enter topic...",
                scale=4,
                visible=False,
            )
            btn = gr.Button("Write Article", variant="primary", scale=1, visible=False)

        async def process_input(topic_text, history, api_key):
            """Stream the crew's messages for one request into the chat history."""
            history = history or []

            api_key = (api_key or "").strip()
            if not api_key:
                history.append({
                    "role": "assistant",
                    "content": "Please provide an OpenAI API key.",
                    "metadata": {"title": "❌ Error"},
                })
                yield history
                return

            topic_text = (topic_text or "").strip()
            if not topic_text:
                history.append({
                    "role": "assistant",
                    "content": "Please enter an article topic.",
                    "metadata": {"title": "❌ Error"},
                })
                yield history
                return

            # One crew per request: a crew cached across calls would keep using
            # the first caller's API key and share one message queue between
            # concurrent users.
            article_crew = ArticleCrew(api_key=api_key)

            history.append({"role": "user", "content": f"Write an article about: {topic_text}"})
            yield history

            try:
                async for messages in article_crew.process_article(topic_text):
                    history.extend(messages)
                    yield history
            except Exception as e:
                history.append({
                    "role": "assistant",
                    "content": f"An error occurred: {str(e)}",
                    "metadata": {"title": "❌ Error"},
                })
                yield history

        def show_interface():
            """Hide the key box and reveal the chat, topic box and button."""
            return {
                openai_api_key: gr.Textbox(visible=False),
                chatbot: gr.Chatbot(visible=True),
                topic: gr.Textbox(visible=True),
                btn: gr.Button(visible=True),
            }

        openai_api_key.submit(show_interface, None, [openai_api_key, chatbot, topic, btn])
        btn.click(process_input, [topic, chatbot, openai_api_key], [chatbot])
        topic.submit(process_input, [topic, chatbot, openai_api_key], [chatbot])

    return demo


if __name__ == "__main__":
    demo = create_demo()
    demo.queue()
    demo.launch(debug=True)