|
import re |
|
import signal |
|
from contextlib import contextmanager, redirect_stdout |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
from io import StringIO |
|
from typing import Optional, Type |
|
|
|
from ..schema import ActionReturn, ActionStatusCode |
|
from .base_action import AsyncActionMixin, BaseAction, tool_api |
|
from .parser import BaseParser, JsonParser |
|
|
|
|
|
class Status(str, Enum): |
|
"""Execution status.""" |
|
SUCCESS = 'success' |
|
FAILURE = 'failure' |
|
|
|
|
|
@dataclass |
|
class ExecutionResult: |
|
"""Execution result.""" |
|
status: Status |
|
value: Optional[str] = None |
|
msg: Optional[str] = None |
|
|
|
|
|
@contextmanager |
|
def _raise_timeout(timeout): |
|
|
|
def _handler(signum, frame): |
|
raise TimeoutError() |
|
|
|
signal.signal(signal.SIGALRM, _handler) |
|
signal.alarm(timeout) |
|
|
|
try: |
|
yield |
|
finally: |
|
signal.alarm(0) |
|
|
|
|
|
class IPythonInteractive(BaseAction): |
|
"""An interactive IPython shell for code execution. |
|
|
|
Args: |
|
timeout (int): Upper bound of waiting time for Python script execution. |
|
Defaults to ``20``. |
|
max_out_len (int): maximum output length. No truncation occurs if negative. |
|
Defaults to ``2048``. |
|
use_signals (bool): whether signals should be used for timing function out |
|
or the multiprocessing. Set to ``False`` when not running in the main |
|
thread, e.g. web applications. Defaults to ``True`` |
|
description (dict): The description of the action. Defaults to ``None``. |
|
parser (Type[BaseParser]): The parser class to process the |
|
action's inputs and outputs. Defaults to :class:`JsonParser`. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
timeout: int = 30, |
|
max_out_len: int = 8192, |
|
use_signals: bool = True, |
|
description: Optional[dict] = None, |
|
parser: Type[BaseParser] = JsonParser, |
|
): |
|
super().__init__(description, parser) |
|
self.timeout = timeout |
|
self._executor = self.create_shell() |
|
self._highlighting = re.compile( |
|
r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') |
|
self._max_out_len = max_out_len if max_out_len >= 0 else None |
|
self._use_signals = use_signals |
|
|
|
def reset(self): |
|
"""Clear the context.""" |
|
self._executor.reset() |
|
|
|
@tool_api |
|
def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn: |
|
"""Launch an IPython Interactive Shell to execute code. |
|
|
|
Args: |
|
command (:class:`str`): Python code snippet |
|
timeout (:class:`Optional[int]`): timeout for execution. |
|
This argument only works in the main thread. Defaults to ``None``. |
|
""" |
|
from timeout_decorator import timeout as timer |
|
tool_return = ActionReturn(args={'text': command}, type=self.name) |
|
ret = ( |
|
timer(timeout or self.timeout)(self.exec)(command) |
|
if self._use_signals else self.exec(command)) |
|
if ret.status is Status.SUCCESS: |
|
tool_return.result = [{'type': 'text', 'content': ret.value}] |
|
tool_return.state = ActionStatusCode.SUCCESS |
|
else: |
|
tool_return.errmsg = ret.msg |
|
tool_return.state = ActionStatusCode.API_ERROR |
|
return tool_return |
|
|
|
def exec(self, code: str) -> ExecutionResult: |
|
"""Run Python scripts in IPython shell. |
|
|
|
Args: |
|
code (:class:`str`): code block |
|
|
|
Returns: |
|
:py:class:`ExecutionResult`: execution result |
|
""" |
|
with StringIO() as io: |
|
with redirect_stdout(io): |
|
ret = self._executor.run_cell(self.extract_code(code)) |
|
result = ret.result |
|
if result is not None: |
|
return ExecutionResult(Status.SUCCESS, |
|
str(result)[:self._max_out_len]) |
|
outs = io.getvalue().strip().split('\n') |
|
if not outs: |
|
return ExecutionResult(Status.SUCCESS, '') |
|
for i, out in enumerate(outs): |
|
if re.search('Error|Traceback', out, re.S): |
|
if 'TimeoutError' in out: |
|
return ExecutionResult( |
|
Status.FAILURE, |
|
msg=('The code interpreter encountered ' |
|
'a timeout error.')) |
|
err_idx = i |
|
break |
|
else: |
|
return ExecutionResult(Status.SUCCESS, |
|
outs[-1].strip()[:self._max_out_len]) |
|
return ExecutionResult( |
|
Status.FAILURE, |
|
msg=self._highlighting.sub( |
|
'', '\n'.join(outs[err_idx:])[:self._max_out_len]), |
|
) |
|
|
|
@staticmethod |
|
def create_shell(): |
|
from IPython import InteractiveShell |
|
from traitlets.config import Config |
|
|
|
c = Config() |
|
c.HistoryManager.enabled = False |
|
c.HistoryManager.hist_file = ':memory:' |
|
return InteractiveShell( |
|
user_ns={'_raise_timeout': _raise_timeout}, config=c) |
|
|
|
@staticmethod |
|
def extract_code(text: str) -> str: |
|
"""Extract Python code from markup languages. |
|
|
|
Args: |
|
text (:class:`str`): Markdown-formatted text |
|
|
|
Returns: |
|
:class:`str`: Python code |
|
""" |
|
import json5 |
|
|
|
|
|
triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL) |
|
|
|
single_match = re.search(r'`([^`]*)`', text, re.DOTALL) |
|
if triple_match: |
|
text = triple_match.group(1) |
|
elif single_match: |
|
text = single_match.group(1) |
|
else: |
|
try: |
|
text = json5.loads(text)['code'] |
|
except Exception: |
|
pass |
|
|
|
return text |
|
|
|
@staticmethod |
|
def wrap_code_with_timeout(code: str, timeout: int) -> str: |
|
if not code.strip(): |
|
return code |
|
code = code.strip('\n').rstrip() |
|
indent = len(code) - len(code.lstrip()) |
|
handle = ' ' * indent + f'with _raise_timeout({timeout}):\n' |
|
block = '\n'.join([' ' + line for line in code.split('\n')]) |
|
wrapped_code = handle + block |
|
last_line = code.split('\n')[-1] |
|
is_expression = True |
|
try: |
|
compile(last_line.lstrip(), '<stdin>', 'eval') |
|
except SyntaxError: |
|
is_expression = False |
|
if is_expression: |
|
wrapped_code += '\n' * 5 + last_line |
|
return wrapped_code |
|
|
|
|
|
class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive): |
|
"""An interactive IPython shell for code execution. |
|
|
|
Args: |
|
timeout (int): Upper bound of waiting time for Python script execution. |
|
Defaults to ``20``. |
|
max_out_len (int): maximum output length. No truncation occurs if negative. |
|
Defaults to ``2048``. |
|
use_signals (bool): whether signals should be used for timing function out |
|
or the multiprocessing. Set to ``False`` when not running in the main |
|
thread, e.g. web applications. Defaults to ``True`` |
|
description (dict): The description of the action. Defaults to ``None``. |
|
parser (Type[BaseParser]): The parser class to process the |
|
action's inputs and outputs. Defaults to :class:`JsonParser`. |
|
""" |
|
|
|
@tool_api |
|
async def run(self, |
|
command: str, |
|
timeout: Optional[int] = None) -> ActionReturn: |
|
"""Launch an IPython Interactive Shell to execute code. |
|
|
|
Args: |
|
command (:class:`str`): Python code snippet |
|
timeout (:class:`Optional[int]`): timeout for execution. |
|
This argument only works in the main thread. Defaults to ``None``. |
|
""" |
|
tool_return = ActionReturn(args={'text': command}, type=self.name) |
|
ret = await self.exec(command, timeout) |
|
if ret.status is Status.SUCCESS: |
|
tool_return.result = [{'type': 'text', 'content': ret.value}] |
|
tool_return.state = ActionStatusCode.SUCCESS |
|
else: |
|
tool_return.errmsg = ret.msg |
|
tool_return.state = ActionStatusCode.API_ERROR |
|
return tool_return |
|
|
|
async def exec(self, code: str, timeout: int = None) -> ExecutionResult: |
|
"""Asynchronously run Python scripts in IPython shell. |
|
|
|
Args: |
|
code (:class:`str`): code block |
|
timeout (:class:`int`): max waiting time for code execution |
|
|
|
Returns: |
|
:py:class:`ExecutionResult`: execution result |
|
""" |
|
with StringIO() as io: |
|
with redirect_stdout(io): |
|
ret = await self._executor.run_cell_async( |
|
|
|
self.wrap_code_with_timeout( |
|
self.extract_code(code), timeout or self.timeout)) |
|
result = ret.result |
|
if result is not None: |
|
return ExecutionResult(Status.SUCCESS, |
|
str(result)[:self._max_out_len]) |
|
outs = io.getvalue().strip().split('\n') |
|
if not outs: |
|
return ExecutionResult(Status.SUCCESS, '') |
|
for i, out in enumerate(outs): |
|
if re.search('Error|Traceback', out, re.S): |
|
if 'TimeoutError' in out: |
|
return ExecutionResult( |
|
Status.FAILURE, |
|
msg=('The code interpreter encountered a ' |
|
'timeout error.')) |
|
err_idx = i |
|
break |
|
else: |
|
return ExecutionResult(Status.SUCCESS, |
|
outs[-1].strip()[:self._max_out_len]) |
|
return ExecutionResult( |
|
Status.FAILURE, |
|
msg=self._highlighting.sub( |
|
'', '\n'.join(outs[err_idx:])[:self._max_out_len]), |
|
) |
|
|