# Repository page header captured along with the file (commit metadata, not code):
#   xd11yggy's picture
#   fix queue
#   32d380b verified
import concurrent.futures
import importlib.util
import shutil
import subprocess
import sys
import tempfile
import time
import traceback

import gradio as gr
class CodeExecutor:
    """Run a snippet of Python source in a child interpreter and report its output.

    Each instance owns a private temporary directory (where the snippet is
    written) and a single-worker thread pool that runs the child process off
    the calling thread.

    NOTE(security): the submitted code is executed as-is with the privileges
    of this process, and the package names are handed to pip. Only expose
    this to trusted users, or run the whole application inside a sandbox.
    """

    # Extra seconds the caller waits on the worker beyond the snippet timeout,
    # so the inner (process-level) timeout always fires first and the child is
    # killed before the caller gives up.
    _RESULT_GRACE_SECONDS = 5

    def __init__(self, timeout=10, install_timeout=10):
        """Create the scratch directory and the worker thread.

        Args:
            timeout: Seconds the snippet may run before it is killed.
            install_timeout: Seconds allowed for each ``pip install`` call.
        """
        self.timeout = timeout
        self.install_timeout = install_timeout
        self.temp_dir = tempfile.TemporaryDirectory()
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def _install_packages(self, packages):
        """Install every package in *packages* that is not already importable.

        Blank entries (e.g. from a trailing comma) are skipped and surrounding
        whitespace is ignored.

        Args:
            packages: Iterable of package names.

        Raises:
            Exception: If pip exits with an error or exceeds the install timeout.
        """
        for raw_name in packages:
            package = raw_name.strip()
            if not package:
                continue
            try:
                already_present = importlib.util.find_spec(package) is not None
            except (ImportError, ValueError):
                # Dotted names with a missing parent, or relative names, make
                # find_spec raise instead of returning None.
                already_present = False
            if already_present:
                continue
            try:
                # "python -m pip" targets the interpreter that will run the
                # snippet, rather than whichever "pip" happens to be on PATH.
                subprocess.check_call(
                    [sys.executable, "-m", "pip", "install", package],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=self.install_timeout,
                )
            except subprocess.CalledProcessError as e:
                raise Exception(f"Error installing package {package}: {e}") from e
            except subprocess.TimeoutExpired as e:
                raise Exception(f"Timed out installing package {package}") from e

    def _execute_code(self, code, inputs):
        """Run *code* in a child interpreter, feeding *inputs* on stdin.

        Args:
            code: Python source text to execute.
            inputs: Values written to the child's stdin, one per line. May be
                empty or ``None``.

        Returns:
            A report string containing the code and either the captured
            stderr (when anything was written to it) or the inputs and the
            captured stdout.

        Raises:
            Exception: If the child does not finish within ``self.timeout``.
        """
        input_values = [str(value) for value in (inputs or [])]
        script_path = f"{self.temp_dir.name}/temp.py"
        # Python reads source files as UTF-8 by default, so write it that way
        # regardless of the locale encoding.
        with open(script_path, "w", encoding="utf-8") as script:
            script.write(code)

        stdin_text = "".join(f"{value}\n" for value in input_values)
        try:
            # subprocess.run kills and reaps the child on timeout, and its
            # communicate()-based stdin handling tolerates a child that exits
            # without reading its input.
            completed = subprocess.run(
                [sys.executable, script_path],
                input=stdin_text,
                capture_output=True,
                text=True,
                errors="replace",
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired as e:
            raise Exception("Timed out executing code") from e
        except OSError:
            # Failure to launch the interpreter is reported like a user error.
            return f"User Code:\n{code}\n\nError:\n{traceback.format_exc()}"

        if completed.stderr:
            return f"User Code:\n{code}\n\nError:\n{completed.stderr}"
        return (
            f"User Code:\n{code}\n\nInputs:\n{', '.join(input_values)}"
            f"\n\nOutput:\n{completed.stdout}"
        )

    def execute(self, code, inputs, packages):
        """Install the requested packages, then run *code* and return its report.

        Args:
            code: Python source text to execute.
            inputs: Values to feed to the program's stdin, one per line.
            packages: Comma-separated package names, or an empty value for none.

        Returns:
            The report string produced by ``_execute_code``.

        Raises:
            Exception: On installation failure, timeout, or any error raised
                while running the code.
        """
        # Packages must be present before the snippet runs, not afterwards.
        if packages:
            self._install_packages(packages.split(","))

        future = self.executor.submit(self._execute_code, code, inputs)
        try:
            return future.result(timeout=self.timeout + self._RESULT_GRACE_SECONDS)
        except concurrent.futures.TimeoutError as e:
            raise Exception("Timed out waiting for code execution") from e
        except Exception as e:
            raise Exception(f"Error: {str(e)}") from e

    def close(self):
        """Stop the worker pool and delete the scratch directory."""
        # getattr guards keep this safe when __init__ failed part-way through.
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=False)
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir is not None:
            # Let TemporaryDirectory remove itself so its own finalizer does
            # not later trip over a directory deleted behind its back.
            temp_dir.cleanup()

    def __del__(self):
        """Release resources when the instance is garbage-collected."""
        self.close()
def wrapper_execute(code, inputs, packages):
    """Run *code* through a fresh ``CodeExecutor`` and return its report.

    Args:
        code: Python source text to execute.
        inputs: Comma-separated values to feed to the program's stdin; an
            empty value means the program receives no input.
        packages: Comma-separated package names, passed through unchanged.

    Returns:
        The string returned by ``CodeExecutor.execute``.
    """
    # "".split(",") yields [""], which would feed the program a spurious
    # blank line; treat an empty (or missing) field as "no inputs" instead.
    input_values = inputs.split(",") if inputs else []
    executor = CodeExecutor()
    return executor.execute(code, input_values, packages)
def create_interface():
    """Assemble the Gradio Blocks page for the code executor.

    The page holds three text fields (code, inputs, packages), an output
    field, and a Run button wired to ``wrapper_execute``.

    Returns:
        The constructed ``gr.Blocks`` object, not yet launched.
    """
    with gr.Blocks() as demo:
        # Page title and subtitle.
        gr.Markdown("# Code Executor")
        gr.Markdown("Execute Python code with inputs and packages")

        gr.Markdown("### Code")
        code_box = gr.Textbox(label="Code", lines=20)

        gr.Markdown("### Inputs")
        stdin_box = gr.Textbox(label="Inputs (comma-separated)", lines=1)

        gr.Markdown("### Packages")
        packages_box = gr.Textbox(label="Packages (comma-separated)", lines=1)

        gr.Markdown("### Output")
        result_box = gr.Text(label="Output", lines=20)

        # Clicking Run sends the three fields to wrapper_execute and shows
        # the returned report in the output field.
        run_btn = gr.Button("Run")
        field_boxes = [code_box, stdin_box, packages_box]
        run_btn.click(fn=wrapper_execute, inputs=field_boxes, outputs=result_box)
    return demo
if __name__ == "__main__":
    # Build the page and start the Gradio server only when run as a script;
    # importing this module has no side effects.
    demo = create_interface()
    demo.launch()