import platform
import streamlit as st
import psutil
from typing import List, Dict, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
import time
import ast
import radon.complexity
import radon.metrics
from pylint.lint import Run
from pylint.reporters import JSONReporter
from coverage import Coverage
import bandit
from bandit.core import manager
from datetime import datetime
import os
import sys
import requests
import asyncio
import statistics
import json
import traceback
from pathlib import Path
# Set logging level from environment variable
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))
class AutonomousAgentApp: | |
"""Main application class for the Autonomous Agent System""" | |
def __init__(self): | |
self.workspace_manager = self.WorkspaceManager(workspace_dir=os.getenv('WORKSPACE_DIR', 'workspace')) # Use self.WorkspaceManager | |
self.pipeline = self._initialize_pipeline() | |
self.refinement_loop = self.RefinementLoop(pipeline=self.pipeline) # Use self.RefinementLoop | |
self.interface = self.StreamlitInterface(self) # Use self.StreamlitInterface | |
def _initialize_pipeline(self) -> 'AutonomousAgentApp.DevelopmentPipeline': | |
"""Initialize the development pipeline""" | |
return self.DevelopmentPipeline( | |
workspace_manager=self.workspace_manager, | |
tool_manager=self._setup_tool_manager() | |
) | |
def _setup_tool_manager(self): | |
"""Setup tool manager with configuration""" | |
return self.ToolManager() # Use self.ToolManager | |
def run(self): | |
"""Main application entry point""" | |
try: | |
logging.info("Starting Autonomous Agent Application") | |
self.interface.render_main_interface() | |
except Exception as e: | |
logging.error(f"Application error: {str(e)}") | |
st.error("An error occurred while starting the application. Please check the logs.") | |
raise | |
class WorkspaceManager: | |
"""Manages workspace files and directories.""" | |
def __init__(self, workspace_dir: str = "workspace"): | |
self.workspace_dir = workspace_dir | |
self._ensure_workspace_exists() | |
def _ensure_workspace_exists(self): | |
"""Ensure the workspace directory exists.""" | |
os.makedirs(self.workspace_dir, exist_ok=True) | |
def create_file(self, filename: str, content: str) -> str: | |
"""Create a file in the workspace with the given content.""" | |
file_path = os.path.join(self.workspace_dir, filename) | |
with open(file_path, "w") as f: | |
f.write(content) | |
return f"File '{filename}' created at '{file_path}'." | |
def delete_file(self, filename: str) -> str: | |
"""Delete a file from the workspace.""" | |
file_path = os.path.join(self.workspace_dir, filename) | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
return f"File '{filename}' deleted." | |
return f"File '{filename}' not found." | |
def list_files(self) -> List[str]: | |
"""List all files in the workspace.""" | |
return [ | |
os.path.join(root, file) | |
for root, _, files in os.walk(self.workspace_dir) | |
for file in files | |
] | |
def read_file(self, filename: str) -> str: | |
"""Read the content of a file in the workspace.""" | |
file_path = os.path.join(self.workspace_dir, filename) | |
if os.path.exists(file_path): | |
with open(file_path, "r") as f: | |
return f.read() | |
return f"File '{filename}' not found." | |
def get_workspace_tree(self) -> Dict[str, Any]: | |
"""Get the workspace directory structure as a nested dictionary.""" | |
workspace_path = Path(self.workspace_dir) | |
return self._build_tree(workspace_path) | |
def _build_tree(self, path: Path) -> Dict[str, Any]: | |
"""Recursively build a directory tree.""" | |
if path.is_file(): | |
return {"type": "file", "name": path.name} | |
elif path.is_dir(): | |
return { | |
"type": "directory", | |
"name": path.name, | |
"children": [self._build_tree(child) for child in path.iterdir()], | |
} | |
class AutonomousAgent: | |
"""Autonomous agent that builds tools and agents based on tasks.""" | |
def __init__(self, workspace_manager: 'AutonomousAgentApp.WorkspaceManager'): # Use fully qualified name | |
self.workspace_manager = workspace_manager | |
self.tools_dir = Path(self.workspace_manager.workspace_dir) / "tools" | |
self.agents_dir = Path(self.workspace_manager.workspace_dir) / "agents" | |
self.tools_dir.mkdir(exist_ok=True) # Ensure the tools directory exists | |
self.agents_dir.mkdir(exist_ok=True) # Ensure the agents directory exists | |
self.running = True # Flag to control the running state | |
async def run(self): | |
"""Run the autonomous agent, continuously processing tasks.""" | |
while self.running: | |
# Default task execution | |
await self.default_task() | |
await asyncio.sleep(1) # Prevent busy waiting | |
async def default_task(self): | |
"""Perform the default task of analyzing and generating tools/agents.""" | |
logging.info("Running default task...") | |
try: | |
# Simulate task processing | |
await asyncio.sleep(2) # Simulate time taken for the task | |
except Exception as e: | |
logging.error(f"Error during default task: {str(e)}") | |
async def pause(self): | |
"""Pause the current operation to accept user input.""" | |
self.running = False | |
logging.info("Paused. Waiting for user input...") | |
async def accept_user_input(self, user_input: str): | |
"""Process user input and execute commands.""" | |
logging.info(f"User input received: {user_input}") | |
commands = self.extract_commands(user_input) | |
for command in commands: | |
try: | |
if command.startswith("generate tool"): | |
await self.generate_tool(command) | |
elif command.startswith("generate agent"): | |
await self.generate_agent(command) | |
except Exception as e: | |
logging.error(f"Error processing command '{command}': {str(e)}") | |
def extract_commands(self, user_input: str) -> List[str]:
"""Extract commands from user input (commands are separated by semicolons)."""
# Strip whitespace so checks like command.startswith("generate tool") still
# match when the user types "cmd1; cmd2", and drop empty fragments.
return [command.strip() for command in user_input.split(';') if command.strip()]
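# Example: extract_commands("generate tool scraper; generate agent planner")
# -> ["generate tool scraper", "generate agent planner"]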
async def run_refinement_cycle(self, task: str) -> Dict[str, Any]: | |
"""Run a refinement cycle for the given task.""" | |
try: | |
task_analysis = await self._analyze_task(task) | |
search_results = await self._web_search(task) | |
tools_built = await self._build_tools(task_analysis, search_results) | |
execution_results = await self._execute_tools(tools_built) | |
return { | |
"task_analysis": task_analysis, | |
"search_results": search_results, | |
"tools_built": tools_built, | |
"execution_results": execution_results, | |
} | |
except Exception as e:
logging.error(f"Error during refinement cycle: {str(e)}")
return {"error": str(e)}
async def _analyze_task(self, task: str) -> Dict[str, Any]: | |
"""Analyze the task to determine requirements.""" | |
keywords = self._extract_keywords(task) | |
requirements = self._generate_requirements(keywords) | |
return { | |
"task": task, | |
"keywords": keywords, | |
"requirements": requirements, | |
} | |
def _extract_keywords(self, text: str) -> List[str]: | |
"""Extract keywords from the task text.""" | |
stop_words = {"the", "and", "of", "to", "in", "a", "is", "for", "on", "with"} | |
words = [word.lower() for word in text.split() if word.lower() not in stop_words] | |
return list(set(words)) # Remove duplicates | |
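# Example: _extract_keywords("Automate data collection for the web") yields
# ["automate", "data", "collection", "web"] (order varies because a set is
# used to de-duplicate).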
def _generate_requirements(self, keywords: List[str]) -> List[str]: | |
"""Generate requirements based on extracted keywords.""" | |
requirement_map = { | |
"data": ["data collection", "data processing", "data visualization"], | |
"web": ["web scraping", "API integration", "web development"], | |
"ai": ["machine learning", "natural language processing", "computer vision"], | |
"automation": ["task automation", "workflow optimization", "scripting"], | |
} | |
requirements = [] | |
for keyword in keywords: | |
if keyword in requirement_map: | |
requirements.extend(requirement_map[keyword]) | |
return requirements | |
async def _web_search(self, query: str) -> List[Dict[str, Any]]: | |
"""Perform a web search for relevant approaches/methods.""" | |
try: | |
response = requests.get( | |
os.getenv('API_URL', 'https://api.example.com/search'), | |
params={"q": query, "limit": 5} | |
) | |
response.raise_for_status() | |
return response.json().get("results", []) | |
except Exception as e: | |
logging.error(f"Web search failed: {e}") | |
return [{"title": "Example Approach", "url": "https://example.com"}] | |
async def _build_tools(self, task_analysis: Dict[str, Any], search_results: List[Dict[str, Any]]) -> List[str]: | |
"""Build tools/agents based on the task and search results.""" | |
tools = [] | |
for requirement in task_analysis["requirements"]: | |
tool_name = f"tool_for_{requirement.replace(' ', '_')}.py" | |
tool_path = self.tools_dir / tool_name | |
# Generate a simple Python script for the tool | |
tool_code = self._generate_tool_code(requirement, search_results) | |
with open(tool_path, "w") as f: | |
f.write(tool_code) | |
tools.append(tool_name) | |
return tools | |
def _generate_tool_code(self, requirement: str, search_results: List[Dict[str, Any]]) -> str: | |
"""Generate Python code for a tool based on the requirement.""" | |
example_code = "" | |
for result in search_results: | |
if requirement.lower() in result["title"].lower(): | |
example_code = f"# Example code based on: {result['title']}\n" | |
example_code += f"# Source: {result['url']}\n" | |
break | |
tool_code = f""" | |
{example_code} | |
def {requirement.replace(' ', '_')}(): | |
print("Executing {requirement}...") | |
# Add your implementation here | |
if __name__ == "__main__": | |
{requirement.replace(' ', '_')}() | |
""" | |
return tool_code.strip() | |
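# Illustrative output of _generate_tool_code("data collection", []) with the
# template above:
#     def data_collection():
#         print('Executing data collection...')
#         # Add your implementation here
#
#     if __name__ == '__main__':
#         data_collection()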
async def _execute_tools(self, tools: List[str]) -> Dict[str, Any]: | |
"""Execute the built tools/agents.""" | |
execution_results = {} | |
for tool in tools: | |
tool_path = self.tools_dir / tool | |
try: | |
process = await asyncio.create_subprocess_exec( | |
"python", str(tool_path), | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE | |
) | |
stdout, stderr = await process.communicate() | |
execution_results[tool] = { | |
"status": "success" if process.returncode == 0 else "failed", | |
"stdout": stdout.decode(), | |
"stderr": stderr.decode(), | |
} | |
except Exception as e: | |
execution_results[tool] = { | |
"status": "error", | |
"error": str(e), | |
} | |
return execution_results | |
async def generate_tool(self, command: str): | |
"""Generate a tool based on the command.""" | |
tool_name = command.split(" ")[-1] # Extract tool name from command | |
tool_code = f"# Tool: {tool_name}\n\ndef {tool_name}():\n pass\n" # Placeholder code | |
tool_path = self.tools_dir / f"{tool_name}.py" | |
with open(tool_path, "w") as f: | |
f.write(tool_code) | |
logging.info(f"Generated tool: {tool_name}") | |
async def generate_agent(self, command: str): | |
"""Generate an agent based on the command.""" | |
agent_name = command.split(" ")[-1] # Extract agent name from command | |
agent_code = f"# Agent: {agent_name}\n\ndef {agent_name}():\n pass\n" # Placeholder code | |
agent_path = self.agents_dir / f"{agent_name}.py" | |
with open(agent_path, "w") as f: | |
f.write(agent_code) | |
logging.info(f"Generated agent: {agent_name}") | |
def stop(self): | |
"""Stop the autonomous agent.""" | |
self.running = False | |
logging.info("Autonomous agent stopped.") | |
class ToolManager: | |
"""Manages various tools used in the development pipeline.""" | |
def __init__(self): | |
self.tools = { | |
"requirements_analyzer": self._requirements_analyzer, | |
"task_breakdown": self._task_breakdown, | |
"code_generator": self._code_generator, | |
"code_quality_checker": self._code_quality_checker, | |
"test_generator": self._test_generator, | |
"test_runner": self._test_runner, | |
"coverage_analyzer": self._coverage_analyzer, | |
} | |
async def execute_tool(self, tool_name: str, input_data: Any) -> Dict[str, Any]: | |
"""Execute a tool with the given input data.""" | |
if tool_name in self.tools: | |
return await self.tools[tool_name](input_data) | |
else: | |
raise ValueError(f"Tool '{tool_name}' not found.") | |
async def _requirements_analyzer(self, requirements: str) -> Dict[str, Any]: | |
"""Analyze requirements and return a structured result.""" | |
# Placeholder implementation | |
return {"status": "success", "result": {"requirements": requirements}} | |
async def _task_breakdown(self, requirements: Dict[str, Any]) -> Dict[str, Any]: | |
"""Break down requirements into tasks.""" | |
# Placeholder implementation | |
return {"status": "success", "result": ["task1", "task2", "task3"]} | |
async def _code_generator(self, tasks: List[str]) -> Dict[str, Any]: | |
"""Generate code based on tasks.""" | |
# Placeholder implementation | |
return {"status": "success", "result": "generated_code"} | |
async def _code_quality_checker(self, code: str) -> Dict[str, Any]: | |
"""Check the quality of the generated code.""" | |
# Placeholder implementation | |
return {"status": "success", "result": {"quality_score": 0.9}} | |
async def _test_generator(self, code: str) -> Dict[str, Any]: | |
"""Generate tests for the code.""" | |
# Placeholder implementation | |
return {"status": "success", "result": ["test1", "test2", "test3"]} | |
async def _test_runner(self, tests: List[str]) -> Dict[str, Any]: | |
"""Run the generated tests.""" | |
# Placeholder implementation | |
return {"status": "success", "result": {"passed": 3, "failed": 0}} | |
async def _coverage_analyzer(self, test_results: Dict[str, Any]) -> Dict[str, Any]: | |
"""Analyze test coverage.""" | |
# Placeholder implementation | |
return {"status": "success", "result": {"coverage": 0.95}} | |
class DevelopmentPipeline: | |
"""Advanced development pipeline with stage management and monitoring.""" | |
class PipelineStage(Enum): | |
PLANNING = "planning" | |
DEVELOPMENT = "development" | |
TESTING = "testing" | |
DEPLOYMENT = "deployment" | |
MAINTENANCE = "maintenance" | |
ROLLBACK = "rollback" | |
def __init__(self, workspace_manager, tool_manager): | |
self.workspace_manager = workspace_manager | |
self.tool_manager = tool_manager | |
self.current_stage = None | |
self.stage_history = [] | |
self.stage_metrics = {} | |
self.logger = self._setup_logger() | |
def _setup_logger(self) -> logging.Logger: | |
"""Setup the pipeline logger.""" | |
logger = logging.getLogger("DevelopmentPipeline") | |
logger.setLevel(logging.DEBUG) | |
handler = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
return logger | |
async def execute_stage(self, stage: PipelineStage, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute a pipeline stage with monitoring and error handling.""" | |
self.logger.info(f"Starting stage: {stage.value}") | |
start_time = time.time() | |
try: | |
self.current_stage = stage | |
self.stage_history.append(stage) | |
# Execute stage-specific logic | |
result = await self._execute_stage_logic(stage, context) | |
# Validate the stage output | |
self._validate_stage_output(stage, result) | |
# Record stage metrics | |
execution_time = time.time() - start_time | |
self._record_stage_metrics(stage, execution_time, result) | |
self.logger.info(f"Stage {stage.value} completed successfully.") | |
return { | |
"status": "success", | |
"stage": stage.value, | |
"result": result, | |
"execution_time": execution_time, | |
"metrics": self.stage_metrics.get(stage, {}) | |
} | |
except Exception as e: | |
self.logger.error(f"Error in stage {stage.value}: {str(e)}") | |
await self._handle_stage_failure(stage, context, e) | |
return { | |
"status": "error", | |
"stage": stage.value, | |
"error": str(e), | |
"execution_time": time.time() - start_time | |
} | |
async def _execute_stage_logic(self, stage: PipelineStage, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute stage-specific logic.""" | |
if stage == self.PipelineStage.PLANNING: | |
return await self._execute_planning_stage(context) | |
elif stage == self.PipelineStage.DEVELOPMENT: | |
return await self._execute_development_stage(context) | |
elif stage == self.PipelineStage.TESTING: | |
return await self._execute_testing_stage(context) | |
elif stage == self.PipelineStage.DEPLOYMENT: | |
return await self._execute_deployment_stage(context) | |
elif stage == self.PipelineStage.MAINTENANCE: | |
return await self._execute_maintenance_stage(context) | |
elif stage == self.PipelineStage.ROLLBACK: | |
return await self._execute_rollback_stage(context) | |
else: | |
raise ValueError(f"Unknown pipeline stage: {stage}") | |
async def _execute_planning_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute planning stage with requirement analysis and task breakdown.""" | |
self.logger.info("Planning stage: Analyzing requirements and generating tasks...") | |
requirements = await self.tool_manager.execute_tool("requirements_analyzer", context.get("requirements", "")) | |
tasks = await self.tool_manager.execute_tool("task_breakdown", requirements["result"]) | |
project_structure = self.workspace_manager.create_project_structure( | |
context.get("project_name", "default_project"), tasks["result"] | |
) | |
return {"requirements": requirements["result"], "tasks": tasks["result"], "project_structure": project_structure} | |
async def _execute_development_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute development stage with code generation and quality checks.""" | |
self.logger.info("Development stage: Generating code and performing quality checks...") | |
code_generation = await self.tool_manager.execute_tool("code_generator", context.get("tasks", [])) | |
quality_check = await self.tool_manager.execute_tool("code_quality_checker", code_generation["result"]) | |
saved_files = self.workspace_manager.save_generated_code( | |
context.get("project_name", "default_project"), code_generation["result"] | |
) | |
return {"generated_code": code_generation["result"], "quality_check": quality_check["result"], "saved_files": saved_files} | |
async def _execute_testing_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute testing stage with comprehensive test suite.""" | |
self.logger.info("Testing stage: Generating and running tests...") | |
test_generation = await self.tool_manager.execute_tool("test_generator", context.get("generated_code", "")) | |
test_results = await self.tool_manager.execute_tool("test_runner", test_generation["result"]) | |
coverage_report = await self.tool_manager.execute_tool("coverage_analyzer", test_results["result"]) | |
return {"test_cases": test_generation["result"], "test_results": test_results["result"], "coverage_report": coverage_report["result"]} | |
async def _execute_deployment_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute deployment stage by deploying the application.""" | |
self.logger.info("Deployment stage: Deploying the application...") | |
deployment_result = await self.tool_manager.execute_tool("deployment_tool", context.get("deployment_package", "")) | |
return {"deployment_result": deployment_result} | |
async def _execute_maintenance_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute maintenance stage for updates and monitoring.""" | |
self.logger.info("Maintenance stage: Performing system updates and monitoring...") | |
monitoring_result = await self.tool_manager.execute_tool("monitoring_tool", context.get("system_status", "")) | |
return {"monitoring_result": monitoring_result} | |
async def _execute_rollback_stage(self, context: Dict[str, Any]) -> Dict[str, Any]: | |
"""Execute rollback stage to revert changes.""" | |
self.logger.info("Rollback stage: Reverting changes...") | |
rollback_result = await self.tool_manager.execute_tool("rollback_tool", context.get("rollback_point", "")) | |
return {"rollback_result": rollback_result} | |
def _validate_stage_output(self, stage: PipelineStage, result: Dict[str, Any]): | |
"""Validate the output of a stage.""" | |
if not result or "status" in result and result["status"] != "success": | |
raise ValueError(f"Stage {stage.value} failed validation with result: {result}") | |
def _record_stage_metrics(self, stage: PipelineStage, execution_time: float, result: Dict[str, Any]): | |
"""Record metrics for a stage.""" | |
if stage not in self.stage_metrics: | |
self.stage_metrics[stage] = { | |
"total_executions": 0, | |
"successful_executions": 0, | |
"failed_executions": 0, | |
"average_execution_time": 0, | |
"last_execution_time": 0, | |
"error_rate": 0.0 | |
} | |
metrics = self.stage_metrics[stage] | |
metrics["total_executions"] += 1 | |
metrics["last_execution_time"] = execution_time | |
if result.get("status") == "success": | |
metrics["successful_executions"] += 1 | |
else: | |
metrics["failed_executions"] += 1 | |
metrics["error_rate"] = metrics["failed_executions"] / metrics["total_executions"] | |
metrics["average_execution_time"] = ( | |
(metrics["average_execution_time"] * (metrics["total_executions"] - 1) + execution_time) | |
/ metrics["total_executions"] | |
) | |
async def _handle_stage_failure(self, stage: PipelineStage, context: Dict[str, Any], error: Exception): | |
"""Handle a failure during a pipeline stage.""" | |
self.logger.error(f"Handling failure for stage {stage.value}: {str(error)}") | |
if stage == self.PipelineStage.TESTING or stage == self.PipelineStage.DEPLOYMENT: | |
self.logger.info("Initiating rollback process...") | |
await self._execute_rollback_stage(context) | |
class CodeMetricsAnalyzer: | |
"""Analyzes code metrics using various tools""" | |
def __init__(self): | |
self.metrics_history = [] | |
def analyze_code_quality(self, file_path: str) -> Dict[str, Any]: | |
"""Analyzes code quality using multiple metrics""" | |
try: | |
# Pylint analysis | |
pylint_score = self._run_pylint(file_path) | |
# Complexity analysis | |
complexity_score = self._analyze_complexity(file_path) | |
# Test coverage analysis | |
coverage_score = self._analyze_test_coverage(file_path) | |
# Security analysis | |
security_score = self._analyze_security(file_path) | |
# Calculate overall quality score | |
quality_score = self._calculate_overall_score( | |
pylint_score, | |
complexity_score, | |
coverage_score, | |
security_score | |
) | |
metrics = { | |
"quality_score": quality_score, | |
"pylint_score": pylint_score, | |
"complexity_score": complexity_score, | |
"coverage_score": coverage_score, | |
"security_score": security_score, | |
"timestamp": datetime.now() | |
} | |
self.metrics_history.append(metrics) | |
return metrics | |
except Exception as e: | |
logging.error(f"Error analyzing code metrics: {str(e)}") | |
return { | |
"error": str(e), | |
"quality_score": 0.0, | |
"timestamp": datetime.now() | |
} | |
def _run_pylint(self, file_path: str) -> float:
"""Runs pylint analysis"""
try:
# The overall score lives on the linter's stats, not on the reporter:
# recent pylint versions expose it as `stats.global_note`, older ones as a
# dict entry; fall back to 0.0 if it is missing.
run = Run([file_path], reporter=JSONReporter(), do_exit=False)
stats = run.linter.stats
score = getattr(stats, 'global_note', None)
if score is None and isinstance(stats, dict):
score = stats.get('global_note', 0.0)
return float(score or 0.0) / 10.0  # Normalize the 0-10 pylint score to a 0-1 scale
except Exception as e:
logging.error(f"Pylint analysis error: {str(e)}")
return 0.0
def _analyze_complexity(self, file_path: str) -> float: | |
"""Analyzes code complexity""" | |
try: | |
with open(file_path, 'r') as file: | |
code = file.read() | |
# Calculate cyclomatic complexity | |
complexity = radon.complexity.cc_visit(code) | |
avg_complexity = sum(item.complexity for item in complexity) / len(complexity) if complexity else 0 | |
# Normalize complexity score (0-1 scale, lower is better) | |
normalized_score = 1.0 - min(avg_complexity / 10.0, 1.0) | |
return normalized_score | |
except Exception as e: | |
logging.error(f"Complexity analysis error: {str(e)}") | |
return 0.0 | |
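# The quality analysis above calls self._analyze_test_coverage(), which the
# original code never defines; a neutral placeholder is assumed here so the
# call does not raise. A real implementation would run the project's tests
# under coverage.py and read the resulting report.
def _analyze_test_coverage(self, file_path: str) -> float:
"""Placeholder coverage analysis (assumed stub; always returns 0.0)."""
return 0.0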
async def _analyze_current_state(self, project_name: str) -> Dict[str, Any]: | |
"""Analyze current project state with detailed metrics.""" | |
try: | |
self.logger.info(f"Analyzing current state for project: {project_name}") | |
# Collect code metrics | |
code_metrics = await self._collect_code_metrics(project_name) | |
self.logger.info("Code metrics collected successfully.") | |
# Analyze test coverage | |
test_coverage = await self._analyze_test_coverage(project_name) | |
self.logger.info("Test coverage analysis completed.") | |
# Check security vulnerabilities | |
security_analysis = await self._analyze_security(project_name) | |
self.logger.info("Security analysis completed.") | |
# Measure performance metrics | |
performance_metrics = await self._measure_performance(project_name) | |
self.logger.info("Performance metrics measured.") | |
# Determine if requirements are met | |
meets_requirements = await self._check_requirements( | |
code_metrics, | |
test_coverage, | |
security_analysis, | |
performance_metrics | |
) | |
self.logger.info("Requirements check completed.") | |
return { | |
"code_metrics": code_metrics, | |
"test_coverage": test_coverage, | |
"security_analysis": security_analysis, | |
"performance_metrics": performance_metrics, | |
"meets_requirements": meets_requirements, | |
"timestamp": datetime.now() | |
} | |
except Exception as e: | |
self.logger.error(f"Error analyzing current state: {str(e)}") | |
raise | |
def _analyze_security(self, file_path: str) -> float:
"""Analyzes code security using bandit"""
try:
from bandit.core.config import BanditConfig  # BanditManager requires a config and an aggregation type
mgr = manager.BanditManager(BanditConfig(), 'file')
mgr.discover_files([file_path])
mgr.run_tests()
issues = mgr.get_issue_list()
total_issues = len(issues)
# Bandit reports severities as labels; map them to numeric weights
# (a simple heuristic chosen here, not part of bandit itself).
severity_weights = {"LOW": 1, "MEDIUM": 2, "HIGH": 3}
max_severity = max((severity_weights.get(str(issue.severity).upper(), 0) for issue in issues), default=0)
# Normalize security score (0-1 scale, higher is better)
security_score = 1.0 - (total_issues * max_severity) / 10.0
return max(0.0, min(1.0, security_score))
except Exception as e:
logging.error(f"Security analysis error: {str(e)}")
return 0.0
def _calculate_overall_score(self, pylint_score: float, complexity_score: float, | |
coverage_score: float, security_score: float) -> float: | |
"""Calculates overall code quality score""" | |
weights = { | |
'pylint': 0.3, | |
'complexity': 0.2, | |
'coverage': 0.25, | |
'security': 0.25 | |
} | |
overall_score = ( | |
weights['pylint'] * pylint_score + | |
weights['complexity'] * complexity_score + | |
weights['coverage'] * coverage_score + | |
weights['security'] * security_score | |
) | |
return max(0.0, min(1.0, overall_score)) | |
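# Worked example: pylint=0.8, complexity=0.7, coverage=0.9, security=0.6 gives
# 0.3*0.8 + 0.2*0.7 + 0.25*0.9 + 0.25*0.6 = 0.24 + 0.14 + 0.225 + 0.15 = 0.755.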
def get_metrics_history(self) -> List[Dict[str, Any]]: | |
"""Returns the history of metrics measurements""" | |
return self.metrics_history | |
def get_trend_analysis(self) -> Dict[str, Any]: | |
"""Analyzes trends in metrics over time""" | |
if not self.metrics_history: | |
return {"status": "No metrics history available"} | |
trends = { | |
"quality_score": self._calculate_trend([m["quality_score"] for m in self.metrics_history]), | |
"coverage_score": self._calculate_trend([m["coverage_score"] for m in self.metrics_history]), | |
"security_score": self._calculate_trend([m["security_score"] for m in self.metrics_history]) | |
} | |
return trends | |
def _calculate_trend(self, values: List[float]) -> Dict[str, Any]: | |
"""Calculates trend statistics for a metric""" | |
if not values: | |
return {"trend": "unknown", "change": 0.0} | |
recent_values = values[-3:] # Look at last 3 measurements | |
if len(recent_values) < 2: | |
return {"trend": "insufficient data", "change": 0.0} | |
change = recent_values[-1] - recent_values[0] | |
trend = "improving" if change > 0 else "declining" if change < 0 else "stable" | |
return { | |
"trend": trend, | |
"change": change, | |
"current": recent_values[-1], | |
"previous": recent_values[0] | |
} | |
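# Example: _calculate_trend([0.70, 0.72, 0.80]) compares the last three values
# and returns {"trend": "improving", "change": 0.10, "current": 0.80, "previous": 0.70}.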
@dataclass
class QualityMetrics:
"""Advanced quality metrics tracking and analysis"""
code_quality_score: float = 0.0 | |
test_coverage: float = 0.0 | |
security_score: str = "unknown" | |
performance_score: float = 0.0 | |
metrics_analyzer: CodeMetricsAnalyzer = None | |
def __post_init__(self): | |
self.metrics_analyzer = CodeMetricsAnalyzer() | |
self.history = [] | |
self.thresholds = { | |
"code_quality": 0.85, | |
"test_coverage": 0.90, | |
"security": 0.85, | |
"performance": 0.80 | |
} | |
def analyze_code(self, project_name: str) -> Dict[str, Any]: | |
"""Comprehensive code analysis""" | |
try: | |
# Get all Python files in the project | |
project_files = self._get_project_files(project_name) | |
aggregated_metrics = { | |
"code_quality": 0.0, | |
"test_coverage": 0.0, | |
"security": 0.0, | |
"performance": 0.0, | |
"files_analyzed": len(project_files), | |
"detailed_metrics": [] | |
} | |
for file_path in project_files: | |
metrics = self.metrics_analyzer.analyze_code_quality(file_path) | |
aggregated_metrics["detailed_metrics"].append({ | |
"file": file_path, | |
"metrics": metrics | |
}) | |
# Update aggregated scores | |
aggregated_metrics["code_quality"] += metrics["quality_score"] | |
aggregated_metrics["test_coverage"] += metrics["coverage_score"] | |
aggregated_metrics["security"] += metrics["security_score"] | |
# Calculate averages | |
if project_files: | |
for key in ["code_quality", "test_coverage", "security"]: | |
aggregated_metrics[key] /= len(project_files) | |
# Update instance variables | |
self.code_quality_score = aggregated_metrics["code_quality"] | |
self.test_coverage = aggregated_metrics["test_coverage"] | |
self.security_score = str(aggregated_metrics["security"]) | |
# Add to history | |
self.history.append({ | |
"timestamp": datetime.now(), | |
"metrics": aggregated_metrics | |
}) | |
return aggregated_metrics | |
except Exception as e: | |
logging.error(f"Error in code analysis: {str(e)}") | |
return { | |
"error": str(e), | |
"code_quality": 0.0, | |
"test_coverage": 0.0, | |
"security": "error", | |
"performance": 0.0 | |
} | |
def _get_project_files(self, project_name: str) -> List[str]: | |
"""Get all Python files in the project""" | |
project_dir = os.path.join(os.getcwd(), project_name) | |
python_files = [] | |
for root, _, files in os.walk(project_dir): | |
for file in files: | |
if file.endswith('.py'): | |
python_files.append(os.path.join(root, file)) | |
return python_files | |
def get_improvement_suggestions(self) -> List[str]: | |
"""Generate improvement suggestions based on metrics""" | |
suggestions = [] | |
latest_metrics = self.history[-1]["metrics"] if self.history else None | |
if not latest_metrics: | |
return ["No metrics available for analysis"] | |
if latest_metrics["code_quality"] < self.thresholds["code_quality"]: | |
suggestions.append( | |
f"Code quality score ({latest_metrics['code_quality']:.2f}) is below threshold " | |
f"({self.thresholds['code_quality']}). Consider refactoring complex methods." | |
) | |
if latest_metrics["test_coverage"] < self.thresholds["test_coverage"]: | |
suggestions.append( | |
f"Test coverage ({latest_metrics['test_coverage']:.2f}) is below threshold " | |
f"({self.thresholds['test_coverage']}). Add more unit tests." | |
) | |
if float(latest_metrics["security"]) < self.thresholds["security"]: | |
suggestions.append( | |
f"Security score ({latest_metrics['security']}) is below threshold " | |
f"({self.thresholds['security']}). Address security vulnerabilities." | |
) | |
return suggestions | |
class ErrorTracker: | |
"""Enhanced error tracking and analysis""" | |
def __init__(self): | |
self.errors: List[Dict[str, Any]] = [] | |
self.error_patterns: Dict[str, int] = {} | |
self.critical_errors: List[Dict[str, Any]] = [] | |
def add_error(self, error_type: str, message: str, severity: str = "normal"): | |
"""Add an error with enhanced tracking""" | |
error_entry = { | |
"type": error_type, | |
"message": message, | |
"severity": severity, | |
"timestamp": datetime.now(), | |
"stack_trace": traceback.format_exc() | |
} | |
self.errors.append(error_entry) | |
# Track error patterns | |
if error_type in self.error_patterns: | |
self.error_patterns[error_type] += 1 | |
else: | |
self.error_patterns[error_type] = 1 | |
# Track critical errors | |
if severity == "critical": | |
self.critical_errors.append(error_entry) | |
self._notify_critical_error(error_entry) | |
def _notify_critical_error(self, error: Dict[str, Any]): | |
"""Handle critical error notification""" | |
logging.critical(f"Critical error detected: {error['message']}") | |
# Implement notification system here (e.g., email, Slack) | |
def get_error_analysis(self) -> Dict[str, Any]: | |
"""Generate comprehensive error analysis""" | |
return { | |
"total_errors": len(self.errors), | |
"error_patterns": self.error_patterns, | |
"critical_errors": len(self.critical_errors), | |
"most_common_error": max(self.error_patterns.items(), key=lambda x: x[1]) if self.error_patterns else None, | |
"error_trend": self._analyze_error_trend() | |
} | |
def _analyze_error_trend(self) -> Dict[str, Any]: | |
"""Analyze error trends over time""" | |
if not self.errors: | |
return {"trend": "no errors"} | |
# Group errors by hour | |
error_timeline = {} | |
for error in self.errors: | |
hour = error["timestamp"].replace(minute=0, second=0, microsecond=0) | |
if hour in error_timeline: | |
error_timeline[hour] += 1 | |
else: | |
error_timeline[hour] = 1 | |
# Calculate trend | |
timeline_values = list(error_timeline.values()) | |
if len(timeline_values) < 2: | |
return {"trend": "insufficient data"} | |
trend = "increasing" if timeline_values[-1] > timeline_values[0] else "decreasing" | |
return { | |
"trend": trend, | |
"current_rate": timeline_values[-1], | |
"initial_rate": timeline_values[0] | |
} | |
class ProjectAnalytics: | |
"""Enhanced project analytics and reporting""" | |
"""Enhanced project analytics and reporting""" | |
def __init__(self, workspace_manager): | |
self.workspace_manager = workspace_manager | |
self.metrics_analyzer = CodeMetricsAnalyzer() | |
self.analysis_history = [] | |
def generate_project_report(self, project_name: str) -> Dict[str, Any]: | |
"""Generate comprehensive project report""" | |
try: | |
current_analysis = { | |
"timestamp": datetime.now(), | |
"basic_metrics": self._get_basic_metrics(project_name), | |
"code_quality": self._get_code_quality_metrics(project_name), | |
"performance": self._get_performance_metrics(project_name), | |
"security": self._get_security_metrics(project_name), | |
"dependencies": self._analyze_dependencies(project_name) | |
} | |
self.analysis_history.append(current_analysis) | |
return { | |
"current_analysis": current_analysis, | |
"historical_trends": self._analyze_trends(), | |
"recommendations": self._generate_recommendations(current_analysis) | |
} | |
except Exception as e: | |
logging.error(f"Error generating project report: {str(e)}") | |
return {"error": str(e)} | |
class StreamlitInterface: | |
"""Streamlit UI integration for the Autonomous Agent system.""" | |
def __init__(self, app: AutonomousAgentApp): | |
self.app = app | |
def render_main_interface(self): | |
"""Render the main Streamlit interface.""" | |
st.title("Autonomous Agent System") | |
# Create tabs for different functionalities | |
tab_names = ["Autonomous Agent", "Workspace Management", "Settings"] | |
selected_tab = st.selectbox("Select a Tab", tab_names) | |
if selected_tab == "Autonomous Agent": | |
self.render_autonomous_agent_tab() | |
elif selected_tab == "Workspace Management": | |
self.render_workspace_management_tab() | |
elif selected_tab == "Settings": | |
self.render_settings_tab() | |
def render_autonomous_agent_tab(self): | |
"""Render the Autonomous Agent tab.""" | |
st.header("Autonomous Agent") | |
task = st.text_area("Enter a task for the autonomous agent:") | |
if st.button("Run Autonomous Agent"): | |
if task: | |
# Run the autonomous agent with the provided task | |
try: | |
result = asyncio.run(self.app.refinement_loop.run_refinement_cycle(task)) | |
st.success(f"Result: {result}") | |
except Exception as e: | |
st.error(f"An error occurred: {str(e)}") | |
def render_workspace_management_tab(self): | |
"""Render the Workspace Management tab with a workspace explorer.""" | |
st.header("Workspace Management") | |
# Workspace Explorer | |
st.subheader("Workspace Explorer") | |
workspace_tree = self.app.workspace_manager.get_workspace_tree() | |
self._render_tree(workspace_tree) | |
# File creation | |
st.subheader("Create a File") | |
new_filename = st.text_input("Enter filename:") | |
new_file_content = st.text_area("Enter file content:") | |
if st.button("Create File"): | |
if new_filename and new_file_content: | |
result = self.app.workspace_manager.create_file(new_filename, new_file_content) | |
st.success(result) | |
else: | |
st.error("Filename and content are required.") | |
# File deletion | |
st.subheader("Delete a File") | |
delete_filename = st.text_input("Enter filename to delete:") | |
if st.button("Delete File"): | |
if delete_filename: | |
result = self.app.workspace_manager.delete_file(delete_filename) | |
st.success(result) | |
else: | |
st.error("Filename is required.") | |
def _render_tree(self, tree: Dict[str, Any], level: int = 0): | |
"""Recursively render the workspace directory tree.""" | |
if tree["type"] == "file": | |
st.write(" " * level + f"📄 {tree['name']}") | |
elif tree["type"] == "directory": | |
st.write(" " * level + f"📁 {tree['name']}") | |
for child in tree["children"]: | |
self._render_tree(child, level + 1) | |
def render_settings_tab(self): | |
"""Render the Settings tab.""" | |
st.header("Application Settings") | |
# Section 1: Refinement Process Configuration | |
st.subheader("Refinement Process Settings") | |
# Adjust maximum refinement iterations | |
current_max_iter = self.app.refinement_loop.max_iterations | |
new_max_iter = st.number_input( | |
"Maximum Refinement Iterations", | |
min_value=1, | |
max_value=20, | |
value=current_max_iter, | |
help="Maximum number of refinement cycles to perform" | |
) | |
if new_max_iter != current_max_iter: | |
self.app.refinement_loop.max_iterations = new_max_iter | |
st.success(f"Updated maximum iterations to {new_max_iter}") | |
# Section 2: Quality Threshold Configuration | |
st.subheader("Quality Thresholds") | |
# Get current thresholds | |
thresholds = self.app.refinement_loop.quality_metrics.thresholds | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
new_code_quality = st.slider( | |
"Code Quality Threshold", | |
0.0, 1.0, thresholds["code_quality"], | |
help="Minimum acceptable code quality score" | |
) | |
with col2: | |
new_test_coverage = st.slider( | |
"Test Coverage Threshold", | |
0.0, 1.0, thresholds["test_coverage"], | |
help="Minimum required test coverage" | |
) | |
with col3: | |
new_security = st.slider( | |
"Security Threshold", | |
0.0, 1.0, thresholds["security"], | |
help="Minimum acceptable security score" | |
) | |
if st.button("Update Quality Thresholds"): | |
self.app.refinement_loop.quality_metrics.thresholds.update({ | |
"code_quality": new_code_quality, | |
"test_coverage": new_test_coverage, | |
"security": new_security | |
}) | |
st.success("Quality thresholds updated!") | |
# Section 3: Performance Configuration | |
st.subheader("Performance Settings") | |
# Concurrency settings | |
concurrency_level = st.selectbox( | |
"Max Concurrency", | |
options=[1, 2, 4, 8], | |
index=2, | |
help="Maximum parallel tasks for code analysis" | |
) | |
# Resource limits | |
mem_limit = st.slider( | |
"Memory Limit (GB)", | |
1, 16, 4, | |
help="Maximum memory allocation for pipeline operations" | |
) | |
# Section 4: Security Settings | |
st.subheader("Security Configuration") | |
# Security rules toggle | |
enable_security_scan = st.checkbox( | |
"Enable Real-time Security Scanning", | |
value=True, | |
help="Perform continuous security analysis during development" | |
) | |
# Severity level filtering | |
security_level = st.selectbox( | |
"Minimum Security Severity Level", | |
["Low", "Medium", "High", "Critical"], | |
index=1, | |
help="Minimum severity level to trigger security alerts" | |
) | |
# Section 5: Workspace Configuration | |
st.subheader("Workspace Settings") | |
current_workspace = self.app.workspace_manager.workspace_dir | |
st.write(f"Current Workspace: `{current_workspace}`") | |
# Workspace actions | |
if st.button("Clear Workspace Cache"): | |
self.app.workspace_manager.clean_cache() | |
st.success("Workspace cache cleared!") | |
# Section 6: Diagnostic Settings | |
st.subheader("Diagnostics") | |
# Logging controls | |
log_level = st.selectbox( | |
"Logging Level", | |
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], | |
index=1 | |
) | |
st.session_state.log_level = log_level # Store in session state | |
logging.getLogger().setLevel(log_level) | |
# Debug mode toggle | |
debug_mode = st.checkbox("Enable Debug Mode") | |
st.session_state.debug_mode = debug_mode # Store in session state | |
if debug_mode: | |
self.app.refinement_loop.logger.setLevel(logging.DEBUG) | |
else: | |
self.app.refinement_loop.logger.setLevel(logging.INFO) | |
# Section 7: System Information | |
st.subheader("System Info") | |
st.write(f"Python Version: {sys.version}") | |
st.write(f"Platform: {platform.platform()}") | |
st.write(f"Available Memory: {psutil.virtual_memory().available / (1024**3):.1f} GB free") | |
# Main entry point
def main(): | |
app = AutonomousAgentApp() | |
app.run() | |
if __name__ == "__main__": | |
main() | |
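# To launch the app (assuming this file is saved as app.py):
#     streamlit run app.py
# Optional environment variables read above: LOG_LEVEL, WORKSPACE_DIR, API_URL.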