Spaces:
Running
Running
import gradio as gr | |
from lavague.ActionEngine import ActionEngine | |
from lavague.defaults import DefaultLocalLLM, DefaultLLM | |
from llama_index.llms.huggingface import HuggingFaceInferenceAPI | |
import base64 | |
import requests | |
import uuid | |
import re | |
# Character budget handed to ``get_formatted_sources`` when rendering the
# retrieved nodes in the Debug tab.
MAX_CHARS = 1500

# Random identifier generated once at import time; it is sent as the
# ``X-User-ID`` header on every request to the remote server.
USER_ID = str(uuid.uuid4())

# Base address of the remote service exposing /execute_req, /get_html and
# /screenshot.
SERVER_URL = "https://lavague.mithrilsecurity.io"

# HTML banner rendered at the top of the main tab.
title = """
<div align="center">
<h1>🌊 Welcome to LaVague</h1>
<p>Redefining internet surfing by transforming natural language instructions into seamless browser interactions.</p>
</div>
"""

# Engine used to turn an instruction plus page HTML into code.
# Alternatives kept for reference:
#   local inference:      action_engine = ActionEngine(llm=DefaultLocalLLM())
#   custom llm/embedder:  action_engine = ActionEngine(llm, embedder)
action_engine = ActionEngine()
def exec_code_req(url, code, timeout=60):
    """Ask the remote server to execute generated code against a page.

    Sends a POST to ``SERVER_URL + "/execute_req"`` with the JSON payload
    ``{"url": url, "requests": code}`` and the ``X-User-ID`` header set to
    the module-level ``USER_ID``.

    Args:
        url: Address of the page the code should run against.
        code: Source code to execute remotely.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            UI handler if the server stalls. ``None`` restores the old
            unbounded wait.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        Otherwise a dict of the form ``{"error": <message>}``; this function
        never raises for network, timeout, HTTP-status or JSON-decoding
        failures.
    """
    headers = {
        "X-User-ID": USER_ID  # Include the X-User-ID header for authentication
    }
    try:
        response = requests.post(
            SERVER_URL + "/execute_req",
            json={"url": url, "requests": code},
            headers=headers,
            timeout=timeout,
        )
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed with status code {response.status_code}"}
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON 200 body on requests releases where the
        # JSON decoding error is not a RequestException subclass.
        return {"error": str(e)}
def get_html(url, timeout=60):
    """Fetch a page's HTML through the remote server.

    Sends a POST to ``SERVER_URL + "/get_html"`` with the JSON payload
    ``{"url": url}`` and the ``X-User-ID`` header set to the module-level
    ``USER_ID``.

    Args:
        url: Address of the page whose HTML is requested.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            UI handler if the server stalls. ``None`` restores the old
            unbounded wait.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        Otherwise a dict of the form ``{"error": <message>}``; this function
        never raises for network, timeout, HTTP-status or JSON-decoding
        failures.
    """
    headers = {
        "X-User-ID": USER_ID  # Include the X-User-ID header for authentication
    }
    try:
        response = requests.post(
            SERVER_URL + "/get_html",
            json={"url": url},
            headers=headers,
            timeout=timeout,
        )
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed with status code {response.status_code}"}
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON 200 body on requests releases where the
        # JSON decoding error is not a RequestException subclass.
        return {"error": str(e)}
def send_request(url, timeout=60):
    """Request a screenshot of a page from the remote server.

    Sends a GET to ``SERVER_URL + "/screenshot"`` with ``url`` as a query
    parameter and the ``X-User-ID`` header set to the module-level
    ``USER_ID``.

    Args:
        url: Address of the page to capture.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            UI handler if the server stalls. ``None`` restores the old
            unbounded wait.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        Otherwise a dict of the form ``{"error": <message>}``; this function
        never raises for network, timeout, HTTP-status or JSON-decoding
        failures.
    """
    headers = {
        "X-User-ID": USER_ID  # Include the X-User-ID header for authentication
    }
    try:
        response = requests.get(
            SERVER_URL + "/screenshot",
            params={"url": url},
            headers=headers,
            timeout=timeout,
        )
        if response.status_code == 200:
            return response.json()
        return {"error": f"Failed with status code {response.status_code}"}
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON 200 body on requests releases where the
        # JSON decoding error is not a RequestException subclass.
        return {"error": str(e)}
def process_url(url):
    """Fetch a screenshot of ``url`` and store it as ``screenshot.png``.

    Calls ``send_request(url)``, base64-decodes the ``"result"`` entry of the
    reply and writes the bytes to ``screenshot.png`` in the current working
    directory.

    Args:
        url: Address of the page to capture.

    Returns:
        The string ``"screenshot.png"``, i.e. the path of the written file.

    Raises:
        RuntimeError: If the reply has no ``"result"`` entry. The message
            carries the reply's ``"error"`` text when one is present.
    """
    r = send_request(url)
    if "result" not in r:
        # send_request reports failures as {"error": ...}; surface that text
        # instead of an opaque KeyError.
        raise RuntimeError(
            f"Could not get a screenshot of {url}: "
            f"{r.get('error', 'response has no result field')}"
        )
    # Decode before opening the file so a bad payload does not truncate an
    # existing screenshot.
    scr = base64.b64decode(r["result"])
    # The context manager closes (and flushes) the file before the path is
    # handed back to the caller.
    with open("screenshot.png", "wb") as f:
        f.write(scr)
    return "screenshot.png"
def process_instruction(query, url_input):
    """Generate code for an instruction using the current page's HTML.

    Fetches the HTML of ``url_input`` with ``get_html``, builds a query engine
    from it via ``action_engine.get_query_engine`` and runs ``query`` through
    that engine.

    Args:
        query: Natural-language instruction entered by the user.
        url_input: Address of the page the instruction applies to.

    Returns:
        A 2-tuple ``(response.response, source_nodes)`` where ``source_nodes``
        is ``response.get_formatted_sources(MAX_CHARS)``.

    Raises:
        RuntimeError: If the ``get_html`` reply has no ``"html"`` entry. The
            message carries the reply's ``"error"`` text when one is present.
    """
    r = get_html(url_input)
    if "html" not in r:
        # get_html reports failures as {"error": ...}; surface that text
        # instead of an opaque KeyError.
        raise RuntimeError(
            f"Could not get the HTML of {url_input}: "
            f"{r.get('error', 'response has no html field')}"
        )
    state = r["html"]
    query_engine = action_engine.get_query_engine(state)
    response = query_engine.query(query)
    source_nodes = response.get_formatted_sources(MAX_CHARS)
    return response.response, source_nodes
def extract_first_python_code(markdown_text):
    """Return the body of the first ```python fenced block in the text.

    Args:
        markdown_text: Markdown string that may contain fenced code blocks.

    Returns:
        The text between the first "```python" opener and the next "```",
        with surrounding whitespace stripped, or ``None`` when no such block
        exists.
    """
    # re.S lets '.' span newlines so multi-line blocks are captured; the lazy
    # quantifier stops at the first closing fence.
    found = re.search(r"```python(.*?)```", markdown_text, flags=re.S)
    if found is None:
        return None
    body = found.group(1)
    return body.strip()
def exec_code(code, source_nodes, full_code, url):
    """Run the generated code remotely and report the outcome.

    Extracts the first ```python block from ``code`` with
    ``extract_first_python_code``, sends it to the server through
    ``exec_code_req`` and, on success, appends it to ``full_code``.

    Args:
        code: Markdown text expected to contain a fenced Python block.
        source_nodes: Unused here; accepted because the UI wiring passes it.
        full_code: Code accumulated from previous successful runs (may be
            empty).
        url: Address of the page the code should run against.

    Returns:
        A 6-tuple ``(output, code, html, status, full_code, url)``:
        ``output`` is a plain-text log line, ``code`` the extracted snippet
        (``None`` when no fenced block was found), ``html`` the page HTML
        returned by the server (``""`` on failure), ``status`` an HTML
        success/failure banner, ``full_code`` the accumulated code and
        ``url`` the page address reported by the server (the input ``url`` on
        failure).

    Any exception raised while executing is caught and reported through
    ``output`` and ``status`` rather than propagated.
    """
    code = extract_first_python_code(code)
    html = ""
    try:
        if code is None:
            # Nothing to run: do not send an empty request to the server.
            raise ValueError("no ```python``` code block found in the generated response")
        r = exec_code_req(url, code)
        if "url" not in r or "html" not in r:
            # exec_code_req reports failures as {"error": ...}; show that text
            # instead of a bare KeyError.
            raise RuntimeError(r.get("error", "server response is missing 'url' or 'html'"))
        url = r["url"]
        html = r["html"]
        output = "Successful code execution"
        status = """<p style="color: green; font-size: 20px; font-weight: bold;">Success!</p>"""
        # The snippet is stripped, so separate it from earlier snippets with a
        # newline; otherwise consecutive statements end up on the same line.
        full_code = f"{full_code}\n{code}" if full_code else code
    except Exception as e:
        output = f"Error in code execution: {str(e)}"
        status = """<p style="color: red; font-size: 20px; font-weight: bold;">Failure! Open the Debug tab for more information</p>"""
    return output, code, html, status, full_code, url
def update_image_display(img, url):
    """Refresh ``screenshot.png`` with a new capture of ``url``.

    Calls ``send_request(url)``, base64-decodes the ``"result"`` entry of the
    reply and writes the bytes to ``screenshot.png`` in the current working
    directory.

    Args:
        img: Unused here; accepted because the UI wiring passes the current
            image value.
        url: Address of the page to capture.

    Returns:
        A 2-tuple ``("screenshot.png", url)``.

    Raises:
        RuntimeError: If the reply has no ``"result"`` entry. The message
            carries the reply's ``"error"`` text when one is present.
    """
    r = send_request(url)
    if "result" not in r:
        # send_request reports failures as {"error": ...}; surface that text
        # instead of an opaque KeyError.
        raise RuntimeError(
            f"Could not get a screenshot of {url}: "
            f"{r.get('error', 'response has no result field')}"
        )
    # Decode before opening the file so a bad payload does not truncate an
    # existing screenshot.
    scr = base64.b64decode(r["result"])
    # The context manager closes (and flushes) the file before the path is
    # handed back to the caller.
    with open("screenshot.png", "wb") as f:
        f.write(scr)
    return "screenshot.png", url
def show_processing_message():
    """Return the placeholder text shown while an instruction is handled."""
    placeholder = "Processing..."
    return placeholder
def create_demo(base_url, instructions):
    """Build the Gradio interface, wire its events and launch it.

    The UI has two tabs. "LaVague" holds the URL box, the browser image, the
    generated / accumulated code panels, a status banner and the instruction
    box with clickable examples. "Debug" holds the execution log, the
    retrieved nodes and the full page HTML.

    Args:
        base_url: Initial value of the URL textbox.
        instructions: Example instructions offered below the instruction box.

    Returns:
        None. The function calls ``demo.launch(share=True, debug=True)``.
    """
    with gr.Blocks() as demo:
        # ---- Main tab -----------------------------------------------------
        with gr.Tab("LaVague"):
            with gr.Row():
                gr.HTML(title)
            with gr.Row():
                url_box = gr.Textbox(
                    value=base_url,
                    label="Enter URL and press 'Enter' to load the page.",
                )
            with gr.Row():
                with gr.Column(scale=7):
                    browser_view = gr.Image(label="Browser", interactive=False)
                with gr.Column(scale=3):
                    with gr.Accordion(label="Full code", open=False):
                        code_history = gr.Code(
                            value="", language="python", interactive=False
                        )
                    generated_code = gr.Code(
                        label="Generated code",
                        language="python",
                        lines=5,
                        interactive=True,
                    )
                    status_banner = gr.HTML()
            with gr.Row():
                with gr.Column(scale=8):
                    instruction_box = gr.Textbox(
                        label="Enter instructions and press 'Enter' to generate code."
                    )
                    gr.Examples(examples=instructions, inputs=instruction_box)

        # ---- Debug tab ----------------------------------------------------
        with gr.Tab("Debug"):
            with gr.Row():
                with gr.Column():
                    log_box = gr.Textbox(interactive=False, lines=20)
                with gr.Column():
                    nodes_view = gr.Code(
                        language="html",
                        label="Retrieved nodes",
                        interactive=False,
                        lines=20,
                    )
            with gr.Row():
                with gr.Accordion(label="Full HTML", open=False):
                    page_html = gr.Code(
                        language="html",
                        label="Full HTML",
                        interactive=False,
                        lines=20,
                    )

        # ---- Event wiring -------------------------------------------------
        # Submitting the URL box loads a screenshot of the page.
        url_box.submit(process_url, inputs=url_box, outputs=browser_view)

        # Submitting an instruction runs a chain of steps linked with .then():
        # placeholder -> generate code -> execute it -> refresh screenshot.
        pipeline = instruction_box.submit(
            show_processing_message, outputs=[status_banner]
        )
        pipeline = pipeline.then(
            process_instruction,
            inputs=[instruction_box, url_box],
            outputs=[generated_code, nodes_view],
        )
        pipeline = pipeline.then(
            exec_code,
            inputs=[generated_code, nodes_view, code_history, url_box],
            outputs=[
                log_box,
                generated_code,
                page_html,
                status_banner,
                code_history,
                url_box,
            ],
        )
        pipeline.then(
            update_image_display,
            inputs=[browser_view, url_box],
            outputs=[browser_view, url_box],
        )
    demo.launch(share=True, debug=True)
# Page preloaded in the URL box when the demo starts.
base_url = "https://huggingface.co/"

# Example instructions offered to the user under the instruction box.
instructions = [
    "Click on the Datasets item on the menu, between Models and Spaces",
    "Click on the search bar 'Filter by name', type 'The Stack', and press 'Enter'",
    "Scroll by 500 pixels",
]

# Build the interface and start serving it.
create_demo(base_url, instructions)