|
import concurrent.futures |
|
import gradio as gr |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
import traceback |
|
import importlib.util |
|
import time |
|
|
|
class CodeExecutor: |
|
def __init__(self): |
|
self.temp_dir = tempfile.TemporaryDirectory() |
|
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) |
|
|
|
def _install_packages(self, packages): |
|
for package in packages: |
|
try: |
|
spec = importlib.util.find_spec(package) |
|
if spec is None: |
|
subprocess.check_call(["pip", "install", package], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=10) |
|
except subprocess.CalledProcessError as e: |
|
raise Exception(f"Error installing package {package}: {e}") |
|
except subprocess.TimeoutExpired: |
|
raise Exception(f"Timed out installing package {package}") |
|
|
|
def _execute_code(self, code, inputs): |
|
temp_file = f"{self.temp_dir.name}/temp.py" |
|
with open(temp_file, "w") as f: |
|
f.write(code) |
|
|
|
output_file = f"{self.temp_dir.name}/output.txt" |
|
error_file = f"{self.temp_dir.name}/error.txt" |
|
with open(output_file, "w") as output, open(error_file, "w") as error: |
|
try: |
|
process = subprocess.Popen(["python", temp_file], stdin=subprocess.PIPE, stdout=output, stderr=error) |
|
if inputs: |
|
for input_value in inputs: |
|
process.stdin.write(input_value.encode()) |
|
process.stdin.write(b"\n") |
|
process.stdin.close() |
|
process.wait(timeout=10) |
|
except subprocess.TimeoutExpired: |
|
raise Exception("Timed out executing code") |
|
except Exception as e: |
|
error.write(traceback.format_exc()) |
|
|
|
with open(output_file, "r") as output, open(error_file, "r") as error: |
|
output_text = output.read() |
|
error_text = error.read() |
|
if error_text: |
|
return f"User Code:\n{code}\n\nError:\n{error_text}" |
|
return f"User Code:\n{code}\n\nInputs:\n{', '.join(inputs)}\n\nOutput:\n{output_text}" |
|
|
|
def execute(self, code, inputs, packages): |
|
future = self.executor.submit(self._execute_code, code, inputs) |
|
try: |
|
output = future.result(timeout=10) |
|
except concurrent.futures.TimeoutError: |
|
raise Exception("Timed out waiting for code execution") |
|
except Exception as e: |
|
raise Exception(f"Error: {str(e)}") |
|
finally: |
|
if packages: |
|
self._install_packages(packages.split(",")) |
|
return output |
|
|
|
def __del__(self): |
|
self.executor.shutdown(wait=False) |
|
shutil.rmtree(self.temp_dir.name) |
|
|
|
def wrapper_execute(code, inputs, packages): |
|
executor = CodeExecutor() |
|
return executor.execute(code, inputs.split(","), packages) |
|
|
|
def create_interface(): |
|
with gr.Blocks() as demo: |
|
gr.Markdown("# Code Executor") |
|
gr.Markdown("Execute Python code with inputs and packages") |
|
|
|
gr.Markdown("### Code") |
|
code_input = gr.Textbox(label="Code", lines=20) |
|
|
|
gr.Markdown("### Inputs") |
|
inputs_input = gr.Textbox(label="Inputs (comma-separated)", lines=1) |
|
|
|
gr.Markdown("### Packages") |
|
packages_input = gr.Textbox(label="Packages (comma-separated)", lines=1) |
|
|
|
gr.Markdown("### Output") |
|
output_text = gr.Text(label="Output", lines=20) |
|
|
|
run_button = gr.Button("Run") |
|
run_button.click( |
|
wrapper_execute, |
|
inputs=[code_input, inputs_input, packages_input], |
|
outputs=output_text |
|
) |
|
return demo |
|
|
|
if __name__ == "__main__": |
|
demo = create_interface() |
|
demo.launch() |