File size: 10,288 Bytes
375afca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
""" Command and Control """
import json
from typing import List, NoReturn, Union
from autogpt.agent.agent_manager import AgentManager
from autogpt.commands.evaluate_code import evaluate_code
from autogpt.commands.google_search import google_official_search, google_search
from autogpt.commands.improve_code import improve_code
from autogpt.commands.write_tests import write_tests
from autogpt.config import Config
from autogpt.commands.image_gen import generate_image
from autogpt.commands.web_requests import scrape_links, scrape_text
from autogpt.commands.execute_code import execute_python_file, execute_shell
from autogpt.commands.file_operations import (
append_to_file,
delete_file,
read_file,
search_files,
write_to_file,
)
from autogpt.json_fixes.parsing import fix_and_parse_json
from autogpt.memory import get_memory
from autogpt.processing.text import summarize_text
from autogpt.speech import say_text
from autogpt.commands.web_selenium import browse_website
from autogpt.commands.git_operations import clone_repository
CFG = Config()
AGENT_MANAGER = AgentManager()
def is_valid_int(value: str) -> bool:
"""Check if the value is a valid integer
Args:
value (str): The value to check
Returns:
bool: True if the value is a valid integer, False otherwise
"""
try:
int(value)
return True
except ValueError:
return False
def get_command(response: str):
"""Parse the response and return the command name and arguments
Args:
response (str): The response from the user
Returns:
tuple: The command name and arguments
Raises:
json.decoder.JSONDecodeError: If the response is not valid JSON
Exception: If any other error occurs
"""
try:
response_json = fix_and_parse_json(response)
if "command" not in response_json:
return "Error:", "Missing 'command' object in JSON"
if not isinstance(response_json, dict):
return "Error:", f"'response_json' object is not dictionary {response_json}"
command = response_json["command"]
if not isinstance(command, dict):
return "Error:", "'command' object is not a dictionary"
if "name" not in command:
return "Error:", "Missing 'name' field in 'command' object"
command_name = command["name"]
# Use an empty dictionary if 'args' field is not present in 'command' object
arguments = command.get("args", {})
return command_name, arguments
except json.decoder.JSONDecodeError:
return "Error:", "Invalid JSON"
# All other errors, return "Error: + error message"
except Exception as e:
return "Error:", str(e)
def map_command_synonyms(command_name: str):
"""Takes the original command name given by the AI, and checks if the
string matches a list of common/known hallucinations
"""
synonyms = [
("write_file", "write_to_file"),
("create_file", "write_to_file"),
("search", "google"),
]
for seen_command, actual_command_name in synonyms:
if command_name == seen_command:
return actual_command_name
return command_name
def execute_command(command_name: str, arguments):
"""Execute the command and return the result
Args:
command_name (str): The name of the command to execute
arguments (dict): The arguments for the command
Returns:
str: The result of the command"""
memory = get_memory(CFG)
try:
command_name = map_command_synonyms(command_name)
if command_name == "google":
# Check if the Google API key is set and use the official search method
# If the API key is not set or has only whitespaces, use the unofficial
# search method
key = CFG.google_api_key
if key and key.strip() and key != "your-google-api-key":
google_result = google_official_search(arguments["input"])
else:
google_result = google_search(arguments["input"])
safe_message = google_result.encode("utf-8", "ignore")
return str(safe_message)
elif command_name == "memory_add":
return memory.add(arguments["string"])
elif command_name == "start_agent":
return start_agent(
arguments["name"], arguments["task"], arguments["prompt"]
)
elif command_name == "message_agent":
return message_agent(arguments["key"], arguments["message"])
elif command_name == "list_agents":
return list_agents()
elif command_name == "delete_agent":
return delete_agent(arguments["key"])
elif command_name == "get_text_summary":
return get_text_summary(arguments["url"], arguments["question"])
elif command_name == "get_hyperlinks":
return get_hyperlinks(arguments["url"])
elif command_name == "clone_repository":
return clone_repository(
arguments["repository_url"], arguments["clone_path"]
)
elif command_name == "read_file":
return read_file(arguments["file"])
elif command_name == "write_to_file":
return write_to_file(arguments["file"], arguments["text"])
elif command_name == "append_to_file":
return append_to_file(arguments["file"], arguments["text"])
elif command_name == "delete_file":
return delete_file(arguments["file"])
elif command_name == "search_files":
return search_files(arguments["directory"])
elif command_name == "browse_website":
return browse_website(arguments["url"], arguments["question"])
# TODO: Change these to take in a file rather than pasted code, if
# non-file is given, return instructions "Input should be a python
# filepath, write your code to file and try again"
elif command_name == "evaluate_code":
return evaluate_code(arguments["code"])
elif command_name == "improve_code":
return improve_code(arguments["suggestions"], arguments["code"])
elif command_name == "write_tests":
return write_tests(arguments["code"], arguments.get("focus"))
elif command_name == "execute_python_file": # Add this command
return execute_python_file(arguments["file"])
elif command_name == "execute_shell":
if CFG.execute_local_commands:
return execute_shell(arguments["command_line"])
else:
return (
"You are not allowed to run local shell commands. To execute"
" shell commands, EXECUTE_LOCAL_COMMANDS must be set to 'True' "
"in your config. Do not attempt to bypass the restriction."
)
elif command_name == "generate_image":
return generate_image(arguments["prompt"])
elif command_name == "do_nothing":
return "No action performed."
elif command_name == "task_complete":
shutdown()
else:
return (
f"Unknown command '{command_name}'. Please refer to the 'COMMANDS'"
" list for available commands and only respond in the specified JSON"
" format."
)
except Exception as e:
return f"Error: {str(e)}"
def get_text_summary(url: str, question: str) -> str:
"""Return the results of a google search
Args:
url (str): The url to scrape
question (str): The question to summarize the text for
Returns:
str: The summary of the text
"""
text = scrape_text(url)
summary = summarize_text(url, text, question)
return f""" "Result" : {summary}"""
def get_hyperlinks(url: str) -> Union[str, List[str]]:
"""Return the results of a google search
Args:
url (str): The url to scrape
Returns:
str or list: The hyperlinks on the page
"""
return scrape_links(url)
def shutdown() -> NoReturn:
"""Shut down the program"""
print("Shutting down...")
quit()
def start_agent(name: str, task: str, prompt: str, model=CFG.fast_llm_model) -> str:
"""Start an agent with a given name, task, and prompt
Args:
name (str): The name of the agent
task (str): The task of the agent
prompt (str): The prompt for the agent
model (str): The model to use for the agent
Returns:
str: The response of the agent
"""
# Remove underscores from name
voice_name = name.replace("_", " ")
first_message = f"""You are {name}. Respond with: "Acknowledged"."""
agent_intro = f"{voice_name} here, Reporting for duty!"
# Create agent
if CFG.speak_mode:
say_text(agent_intro, 1)
key, ack = AGENT_MANAGER.create_agent(task, first_message, model)
if CFG.speak_mode:
say_text(f"Hello {voice_name}. Your task is as follows. {task}.")
# Assign task (prompt), get response
agent_response = AGENT_MANAGER.message_agent(key, prompt)
return f"Agent {name} created with key {key}. First response: {agent_response}"
def message_agent(key: str, message: str) -> str:
"""Message an agent with a given key and message"""
# Check if the key is a valid integer
if is_valid_int(key):
agent_response = AGENT_MANAGER.message_agent(int(key), message)
else:
return "Invalid key, must be an integer."
# Speak response
if CFG.speak_mode:
say_text(agent_response, 1)
return agent_response
def list_agents():
"""List all agents
Returns:
str: A list of all agents
"""
return "List of agents:\n" + "\n".join(
[str(x[0]) + ": " + x[1] for x in AGENT_MANAGER.list_agents()]
)
def delete_agent(key: str) -> str:
"""Delete an agent with a given key
Args:
key (str): The key of the agent to delete
Returns:
str: A message indicating whether the agent was deleted or not
"""
result = AGENT_MANAGER.delete_agent(key)
return f"Agent {key} deleted." if result else f"Agent {key} does not exist."
|