bluesix / autogpt /browse.py
soulwax
Push stable to huggingface
923696f
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
from autogpt.config import Config
from autogpt.llm_utils import create_chat_completion
from autogpt.memory import get_memory
# Module-wide configuration object; the functions below read their settings from it.
cfg = Config()
# Memory backend built from the configuration; summarize_text() adds page
# content and summaries to it.
memory = get_memory(cfg)
# A single requests session reused by get_response() for every fetch, sending
# the configured User-Agent header with each request.
session = requests.Session()
session.headers.update({"User-Agent": cfg.user_agent})
def is_valid_url(url):
    """Report whether *url* parses with both a scheme and a network location.

    Returns:
        True when ``urlparse`` yields a non-empty scheme and netloc; False
        when either part is missing or when parsing raises ``ValueError``.
    """
    try:
        parts = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unclosed IPv6 bracket).
        return False
    return bool(parts.scheme and parts.netloc)
def sanitize_url(url):
    """Return *url* reduced to its scheme, host and path.

    The path component is re-joined onto the original URL, which drops the
    query string and fragment whenever the path is non-empty.

    Args:
        url: The URL string to sanitize.

    Returns:
        The sanitized URL string. The host is always the host of *url*.

    Raises:
        ValueError: Propagated from ``urlparse`` for malformed URLs.
    """
    parsed = urlparse(url)
    if parsed.path.startswith("//"):
        # urljoin() would read a path beginning with "//" as a network-path
        # reference and swap in a different host
        # ("http://a.com//localhost/x" -> "http://localhost/x").
        # Rebuild the URL from its own parts so the host cannot change.
        return parsed._replace(params="", query="", fragment="").geturl()
    return urljoin(url, parsed.path)
def check_local_file_access(url):
    """Report whether *url* points at the local machine.

    The comparison is a case-insensitive prefix match, because URL schemes
    and host names are case-insensitive ("HTTP://LOCALHOST" is the same
    target as "http://localhost").

    Args:
        url: The URL string to inspect.

    Returns:
        True if *url* starts with a ``file://`` prefix, a ``localhost`` host,
        or one of the listed loopback/unspecified IP literals; otherwise False.
    """
    local_prefixes = (
        "file:///",
        "file://localhost",
        "http://localhost",
        "https://localhost",
        # Loopback / unspecified address literals that also reach this host.
        "http://127.0.0.1",
        "https://127.0.0.1",
        "http://0.0.0.0",
        "https://0.0.0.0",
        "http://[::1]",
        "https://[::1]",
    )
    # str.startswith accepts a tuple, so one call covers every prefix.
    return url.lower().startswith(local_prefixes)
def get_response(url, timeout=10):
    """Fetch *url* with the shared session and report the outcome as a pair.

    Args:
        url: The http(s) URL to request.
        timeout: Timeout passed to ``session.get``.

    Returns:
        ``(response, None)`` on success, or ``(None, "Error: ...")`` when the
        URL is rejected, the request fails, or the status code is 400 or
        higher. No exception from validation or the request escapes.
    """
    try:
        # Restrict access to local files
        if check_local_file_access(url):
            raise ValueError("Access to local files is restricted")

        # Most basic check if the URL is valid:
        if not url.startswith(("http://", "https://")):
            raise ValueError("Invalid URL format")

        sanitized_url = sanitize_url(url)

        # The request is sent to the sanitized URL, not the raw one, so the
        # local-access restriction must hold for it as well.
        if check_local_file_access(sanitized_url):
            raise ValueError("Access to local files is restricted")

        response = session.get(sanitized_url, timeout=timeout)

        # Check if the response contains an HTTP error
        if response.status_code >= 400:
            return None, f"Error: HTTP {response.status_code} error"

        return response, None
    except ValueError as ve:
        # Handle invalid URL format
        return None, "Error: " + str(ve)
    except requests.exceptions.RequestException as exc:
        # Handle exceptions related to the HTTP request
        # (e.g., connection errors, timeouts, etc.)
        return None, "Error: " + str(exc)
def scrape_text(url):
    """Scrape the visible text from a webpage.

    Args:
        url: The page to fetch via ``get_response``.

    Returns:
        The page text with ``<script>``/``<style>`` content removed, one
        non-empty phrase per line; or an ``"Error: ..."`` string when the
        page could not be fetched.
    """
    response, error_message = get_response(url)
    if error_message:
        return error_message
    if not response:
        return "Error: Could not get response"

    soup = BeautifulSoup(response.text, "html.parser")

    # Script and style bodies are not readable page text.
    for script in soup(["script", "style"]):
        script.extract()

    text = soup.get_text()
    # Trim leading/trailing whitespace on every line.
    lines = (line.strip() for line in text.splitlines())
    # Break multi-headline lines apart on runs of two spaces. Splitting on a
    # single space would put every word on its own line.
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop the blank entries left over from the splitting.
    return "\n".join(chunk for chunk in chunks if chunk)
def extract_hyperlinks(soup):
    """Collect ``(link text, href)`` pairs for every ``<a>`` tag that has an href.

    Args:
        soup: An object exposing ``find_all`` (a BeautifulSoup document).

    Returns:
        A list of ``(text, href)`` tuples in the order ``find_all`` returns them.
    """
    return [
        (anchor.text, anchor["href"])
        for anchor in soup.find_all("a", href=True)
    ]
def format_hyperlinks(hyperlinks):
    """Render ``(text, url)`` pairs as ``"text (url)"`` strings.

    Args:
        hyperlinks: An iterable of two-item ``(link_text, link_url)`` pairs.

    Returns:
        A list with one formatted string per pair, in input order.
    """
    return [f"{label} ({target})" for label, target in hyperlinks]
def scrape_links(url):
    """Scrape the hyperlinks from a webpage.

    Args:
        url: The page to fetch via ``get_response``.

    Returns:
        A list of ``"text (href)"`` strings for the page's links, or an
        ``"Error: ..."`` string when the page could not be fetched.
    """
    page, failure = get_response(url)
    if failure:
        return failure
    if not page:
        return "Error: Could not get response"

    document = BeautifulSoup(page.text, "html.parser")

    # Strip script and style elements before collecting anchors.
    for tag in document(["script", "style"]):
        tag.extract()

    return format_hyperlinks(extract_hyperlinks(document))
def split_text(text, max_length=None):
    """Split text into newline-joined chunks of at most *max_length* characters.

    Paragraphs (lines separated by a newline) are packed greedily into a
    chunk; each paragraph is counted as its length plus one for the newline.
    A paragraph is never cut, so a single paragraph longer than *max_length*
    is yielded as its own over-long chunk.

    Args:
        text: The text to split.
        max_length: Chunk budget in characters. When None (the default),
            ``cfg.browse_chunk_max_length`` is read at call time, so the
            default is not frozen when the function is defined.

    Yields:
        The chunks, in order. No empty chunk is emitted ahead of a paragraph
        that does not fit the budget.
    """
    if max_length is None:
        max_length = cfg.browse_chunk_max_length

    current_chunk = []
    current_length = 0

    for paragraph in text.split("\n"):
        paragraph_length = len(paragraph) + 1  # +1 for the joining newline
        if current_length + paragraph_length <= max_length:
            current_chunk.append(paragraph)
            current_length += paragraph_length
            continue

        # Flush what has been collected so far. Skip the flush when nothing
        # has been collected (an over-long first paragraph), which would
        # otherwise yield an empty chunk.
        if current_chunk:
            yield "\n".join(current_chunk)
        current_chunk = [paragraph]
        current_length = paragraph_length

    if current_chunk:
        yield "\n".join(current_chunk)
def create_message(chunk, question):
    """Build the user message asking the model to answer *question* from *chunk*.

    Args:
        chunk: The text the model should use.
        question: The question to answer from that text.

    Returns:
        A dict with ``"role"`` set to ``"user"`` and ``"content"`` holding the
        prompt, which falls back to requesting a summary when the question
        cannot be answered.
    """
    prompt = (
        f'"""{chunk}""" Using the above text, please answer the following '
        f'question: "{question}" -- if the question cannot be answered using the '
        "text, please summarize the text."
    )
    return {"role": "user", "content": prompt}
def summarize_text(url, text, question):
    """Summarize *text* chunk by chunk with the LLM, then summarize the summaries.

    Each chunk and each chunk summary is also added to the module's memory,
    tagged with the source *url*. Progress is printed along the way.

    Args:
        url: Source of the text; used only in the memory entries.
        text: The text to summarize.
        question: The question passed to ``create_message`` for every prompt.

    Returns:
        The final completion over the combined chunk summaries, or
        ``"Error: No text to summarize"`` when *text* is empty.
    """
    if not text:
        return "Error: No text to summarize"

    print(f"Text length: {len(text)} characters")

    pieces = list(split_text(text))
    total = len(pieces)
    partial_summaries = []

    for part_no, piece in enumerate(pieces, start=1):
        # Store the raw chunk first, then its summary once it is available.
        print(f"Adding chunk {part_no} / {total} to memory")
        memory.add(f"Source: {url}\nRaw content part#{part_no}: {piece}")

        print(f"Summarizing chunk {part_no} / {total}")
        prompt = [create_message(piece, question)]
        piece_summary = create_chat_completion(
            model=cfg.fast_llm_model,
            messages=prompt,
            max_tokens=cfg.browse_summary_max_token,
        )
        partial_summaries.append(piece_summary)

        print(f"Added chunk {part_no} summary to memory")
        memory.add(f"Source: {url}\nContent summary part#{part_no}: {piece_summary}")

    print(f"Summarized {total} chunks.")

    # Second pass: run the same prompt over the joined per-chunk summaries.
    final_prompt = [create_message("\n".join(partial_summaries), question)]
    return create_chat_completion(
        model=cfg.fast_llm_model,
        messages=final_prompt,
        max_tokens=cfg.browse_summary_max_token,
    )