|
from urllib.parse import urljoin, urlparse |
|
|
|
import requests |
|
from bs4 import BeautifulSoup |
|
|
|
from autogpt.config import Config |
|
from autogpt.llm_utils import create_chat_completion |
|
from autogpt.memory import get_memory |
|
|
|
# Module-wide configuration and the memory object obtained from get_memory().
cfg = Config()
memory = get_memory(cfg)

# A single HTTP session shared by every request in this module; it sends the
# configured user agent with each call.
session = requests.Session()
session.headers["User-Agent"] = cfg.user_agent
|
|
|
|
|
|
|
def is_valid_url(url):
    """Return True when *url* parses with both a scheme and a network location.

    A URL that ``urlparse`` rejects with ``ValueError`` is reported as invalid
    instead of raising.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return False
    return bool(parsed.scheme) and bool(parsed.netloc)
|
|
|
|
|
|
|
def sanitize_url(url):
    """Rebuild *url* from its own path component.

    The path parsed out of *url* is joined back onto *url*, which drops the
    query string and fragment whenever the path is non-empty. A URL with an
    empty path is returned unchanged by ``urljoin``.
    """
    path_only = urlparse(url).path
    return urljoin(url, path_only)
|
|
|
|
|
|
|
def check_local_file_access(url):
    """Return True when *url* points at the local machine.

    Two checks are applied:

    1. The original prefix list (``file:///``, ``file://localhost``,
       ``http://localhost``, ``https://localhost``), now compared
       case-insensitively because URL schemes and host names are
       case-insensitive.
    2. The parsed host name, so that loopback targets which do not start with
       one of those prefixes are caught as well: ``user@localhost``,
       ``127.0.0.1`` (or any other loopback address), ``[::1]``, the
       unspecified address ``0.0.0.0`` and the bare-integer spelling of an
       IP address such as ``2130706433``.

    Args:
        url: The URL string to inspect.

    Returns:
        bool: True if the URL is considered local, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    local_prefixes = (
        "file:///",
        "file://localhost",
        "http://localhost",
        "https://localhost",
    )
    lowered = url.lower()
    if lowered.startswith(local_prefixes):
        return True

    try:
        host = urlparse(lowered).hostname
    except ValueError:
        # Unparseable authority (e.g. unbalanced IPv6 brackets): no host to test.
        return False
    if not host:
        return False

    # "localhost." with a trailing root dot names the same host.
    host = host.rstrip(".")
    if host == "localhost":
        return True

    try:
        # A host made only of decimal digits is the integer form of an address.
        address = ipaddress.ip_address(int(host) if host.isdecimal() else host)
    except ValueError:
        # Ordinary DNS name (or out-of-range number): not a literal local IP.
        return False
    return address.is_loopback or address.is_unspecified
|
|
|
|
|
def get_response(url, timeout=10):
    """Fetch *url* with the shared session and report the outcome as a pair.

    Args:
        url: Address to fetch; it must start with ``http://`` or ``https://``
            and must not be flagged by ``check_local_file_access``.
        timeout: Timeout passed to ``session.get``.

    Returns:
        ``(response, None)`` on success, or ``(None, message)`` where
        *message* starts with ``"Error: "`` when validation fails, the request
        raises, or the server answers with a status code of 400 or above.
    """
    try:
        if check_local_file_access(url):
            raise ValueError("Access to local files is restricted")
        if not url.startswith(("http://", "https://")):
            raise ValueError("Invalid URL format")
        reply = session.get(sanitize_url(url), timeout=timeout)
    except (ValueError, requests.exceptions.RequestException) as exc:
        # Validation failures and transport errors share one message format.
        return None, f"Error: {exc}"

    if reply.status_code >= 400:
        return None, f"Error: HTTP {reply.status_code} error"
    return reply, None
|
|
|
|
|
def scrape_text(url):
    """Scrape the visible text from a webpage.

    Args:
        url: Address of the page to fetch via ``get_response``.

    Returns:
        str: The page text with ``<script>``/``<style>`` content removed and
        one non-empty phrase per line, or an ``"Error: ..."`` message when the
        page could not be fetched.
    """
    response, error_message = get_response(url)
    if error_message:
        return error_message
    if not response:
        return "Error: Could not get response"

    soup = BeautifulSoup(response.text, "html.parser")

    # Script and style bodies are not readable page content.
    for tag in soup(["script", "style"]):
        tag.extract()

    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    # Break a line only where two consecutive spaces separate phrases.
    # Splitting on a single space would put every word on its own line and
    # destroy sentence structure for the downstream chunking/summarising.
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    return "\n".join(chunk for chunk in chunks if chunk)
|
|
|
|
|
def extract_hyperlinks(soup):
    """Collect ``(link text, href)`` pairs for every ``<a>`` tag that has an href.

    Args:
        soup: Parsed document exposing ``find_all``.

    Returns:
        list: One ``(text, href)`` tuple per matching anchor, in the order
        ``find_all`` returns them.
    """
    return [(anchor.text, anchor["href"]) for anchor in soup.find_all("a", href=True)]
|
|
|
|
|
def format_hyperlinks(hyperlinks):
    """Render ``(text, url)`` pairs as ``"text (url)"`` strings.

    Args:
        hyperlinks: Iterable of two-item ``(link_text, link_url)`` pairs.

    Returns:
        list: One formatted string per pair, in input order.
    """
    return [f"{label} ({target})" for label, target in hyperlinks]
|
|
|
|
|
def scrape_links(url):
    """Scrape the hyperlinks from a webpage.

    Args:
        url: Address of the page to fetch via ``get_response``.

    Returns:
        A list of ``"text (href)"`` strings for the page's anchors, or an
        ``"Error: ..."`` string when the page could not be fetched.
    """
    page, failure = get_response(url)
    if failure:
        return failure
    if not page:
        return "Error: Could not get response"

    document = BeautifulSoup(page.text, "html.parser")

    # Drop script and style elements before collecting anchors.
    for unwanted in document(["script", "style"]):
        unwanted.extract()

    return format_hyperlinks(extract_hyperlinks(document))
|
|
|
|
|
def split_text(text, max_length=None):
    """Yield chunks of *text* built from whole lines, each within a length budget.

    Args:
        text: Text to split; it is broken on newline characters.
        max_length: Character budget per chunk, where every line costs its
            length plus one for the separator. When omitted, the value of
            ``cfg.browse_chunk_max_length`` is read at call time, so later
            changes to the configuration are honoured.

    Yields:
        str: Consecutive lines joined with newlines. A single line longer than
        the budget is emitted as its own oversized chunk; it is not cut.
    """
    if max_length is None:
        max_length = cfg.browse_chunk_max_length

    current_length = 0
    current_chunk = []

    for paragraph in text.split("\n"):
        cost = len(paragraph) + 1
        if current_length + cost <= max_length:
            current_chunk.append(paragraph)
            current_length += cost
            continue

        # Flush only when something has been collected. Without this guard an
        # over-long first line would emit a spurious empty chunk.
        if current_chunk:
            yield "\n".join(current_chunk)
        current_chunk = [paragraph]
        current_length = cost

    if current_chunk:
        yield "\n".join(current_chunk)
|
|
|
|
|
def create_message(chunk, question):
    """Build the user message asking the model to answer *question* from *chunk*.

    Args:
        chunk: Text to quote inside triple double quotes at the start of the prompt.
        question: Question the model should answer from that text.

    Returns:
        dict: A chat message with ``"role"`` set to ``"user"`` and the prompt
        under ``"content"``; the prompt tells the model to summarize the text
        if the question cannot be answered from it.
    """
    quoted_chunk = f'"""{chunk}"""'
    prompt = (
        f"{quoted_chunk} Using the above text, please answer the following"
        f' question: "{question}" -- if the question cannot be answered using the'
        " text, please summarize the text."
    )
    return {"role": "user", "content": prompt}
|
|
|
|
|
def summarize_text(url, text, question):
    """Summarize *text* chunk by chunk with the LLM, then summarize the summaries.

    For each chunk produced by ``split_text`` the raw chunk is added to
    memory, a summary is requested from the fast LLM model, and that summary
    is added to memory as well. The per-chunk summaries are then joined with
    newlines and summarized once more.

    Args:
        url: Source address recorded alongside each memory entry.
        text: Text to summarize.
        question: Question the summaries should try to answer.

    Returns:
        The final completion returned by ``create_chat_completion``, or
        ``"Error: No text to summarize"`` when *text* is empty.
    """
    if not text:
        return "Error: No text to summarize"

    print(f"Text length: {len(text)} characters")

    pieces = list(split_text(text))
    total = len(pieces)
    partial_summaries = []

    for number, piece in enumerate(pieces, start=1):
        # Store the raw chunk before summarizing it.
        print(f"Adding chunk {number} / {total} to memory")
        memory.add(f"Source: {url}\nRaw content part#{number}: {piece}")

        print(f"Summarizing chunk {number} / {total}")
        partial = create_chat_completion(
            model=cfg.fast_llm_model,
            messages=[create_message(piece, question)],
            max_tokens=cfg.browse_summary_max_token,
        )
        partial_summaries.append(partial)

        # Store the summary next to the raw content it came from.
        print(f"Added chunk {number} summary to memory")
        memory.add(f"Source: {url}\nContent summary part#{number}: {partial}")

    print(f"Summarized {total} chunks.")

    # Second pass: condense the per-chunk summaries into one answer.
    return create_chat_completion(
        model=cfg.fast_llm_model,
        messages=[create_message("\n".join(partial_summaries), question)],
        max_tokens=cfg.browse_summary_max_token,
    )
|
|