project1 / kernel.py
teamalphabmsit's picture
Upload folder using huggingface_hub
44504f7 verified
import queue
# import streamlit as st
import base64
from io import BytesIO
import re
import os
import jupyter_client
from PIL import Image
from subprocess import PIPE
from pprint import pprint
import nbformat as nbf
IPYKERNEL = os.environ.get('IPYKERNEL', 'dsagent')
# client = get_client()
class CodeKernel(object):
def __init__(self,
kernel_name='kernel',
kernel_id=None,
kernel_config_path="",
python_path=None,
ipython_path=None,
init_file_path="./startup.py",
verbose=1,
max_exe_time=6000):
self.kernel_name = kernel_name
self.kernel_id = kernel_id
self.kernel_config_path = kernel_config_path
self.python_path = python_path
self.ipython_path = ipython_path
self.init_file_path = init_file_path
self.executed_cells = []
self.verbose = verbose
self.interrupt_signal = False
self.max_exe_time = max_exe_time
if python_path is None and ipython_path is None:
env = None
else:
env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path}
# Initialize the backend kernel
self.kernel_manager = jupyter_client.KernelManager(kernel_name=IPYKERNEL,
connection_file=self.kernel_config_path,
exec_files=[self.init_file_path],
env=env)
if self.kernel_config_path:
self.kernel_manager.load_connection_file()
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_config_path))
else:
self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
print("Backend kernel started with the configuration: {}".format(
self.kernel_manager.connection_file))
if verbose:
pprint(self.kernel_manager.get_connection_info())
# Initialize the code kernel
self.kernel = self.kernel_manager.blocking_client()
# self.kernel.load_connection_file()
self.kernel.start_channels()
print("Code kernel started.")
def execute(self, code):
self.kernel.execute(code)
try:
shell_msg = self.kernel.get_shell_msg(timeout=self.max_exe_time)
io_msg_content = self.kernel.get_iopub_msg(timeout=self.max_exe_time)['content']
msg_out_list = []
while True:
msg_out = io_msg_content
if 'name' in msg_out and msg_out['name'] != 'stderr':
msg_out_list.append(msg_out)
elif 'data' in msg_out and 'image/png' in msg_out['data']:
msg_out_list.append(msg_out)
### Poll the message
try:
io_msg_content = self.kernel.get_iopub_msg(timeout=self.max_exe_time)['content']
if 'execution_state' in io_msg_content and io_msg_content['execution_state'] == 'idle':
msg_out_list.append(msg_out)
break
except queue.Empty:
break
# msg_out = '\n'.join(msg_out_list)
return shell_msg, msg_out_list
except Exception as e:
print(e)
return None
def execute_code_(self, code):
msg_id = self.kernel.execute(code)
# Get the output of the code
msg_list = []
while True:
try:
iopub_msg = self.kernel.get_iopub_msg(timeout=self.max_exe_time)
msg_list.append(iopub_msg)
if iopub_msg['msg_type'] == 'status' and iopub_msg['content'].get('execution_state') == 'idle':
break
except:
if self.interrupt_signal:
self.kernel_manager.interrupt_kernel()
self.interrupt_signal = False
continue
all_output = []
for iopub_msg in msg_list:
if iopub_msg['msg_type'] == 'stream':
if iopub_msg['content'].get('name') == 'stdout':
output = iopub_msg['content']['text']
all_output.append(('stdout', output))
elif iopub_msg['msg_type'] == 'execute_result':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('execute_result_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('execute_result_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('execute_result_png', output))
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('execute_result_jpeg', output))
elif iopub_msg['msg_type'] == 'display_data':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('display_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('display_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('display_png', output))
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('display_jpeg', output))
elif iopub_msg['msg_type'] == 'error':
if 'traceback' in iopub_msg['content']:
output = '\n'.join(iopub_msg['content']['traceback'])
all_output.append(('error', output))
return all_output
def export(self, file_path):
nb = nbf.v4.new_notebook()
nb.cells = self.executed_cells
print("cell: ",nb.cells)
with open(file_path, 'w', encoding='utf-8') as f:
nbf.write(nb, f)
print(f"Notebook exported to {file_path}")
def execute_interactive(self, code, verbose=False):
shell_msg = self.kernel.execute_interactive(code)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def inspect(self, code, verbose=False):
msg_id = self.kernel.inspect(code)
shell_msg = self.kernel.get_shell_msg(timeout=30)
if shell_msg is queue.Empty:
if verbose:
print("Timeout waiting for shell message.")
self.check_msg(shell_msg, verbose=verbose)
return shell_msg
def get_error_msg(self, msg, verbose=False) -> str | None:
if msg['content']['status'] == 'error':
try:
error_msg = msg['content']['traceback']
except:
try:
error_msg = msg['content']['traceback'][-1].strip()
except:
error_msg = "Traceback Error"
if verbose:
print("Error: ", error_msg)
return error_msg
return None
def check_msg(self, msg, verbose=False):
status = msg['content']['status']
if status == 'ok':
if verbose:
print("Execution succeeded.")
elif status == 'error':
for line in msg['content']['traceback']:
if verbose:
print(line)
def shutdown(self):
# Shutdown the backend kernel
self.kernel_manager.shutdown_kernel(now=True)
print("Backend kernel shutdown.")
# Shutdown the code kernel
self.kernel.shutdown()
print("Code kernel shutdown.")
def restart(self):
# Restart the backend kernel
self.kernel_manager.restart_kernel()
print("Backend kernel restarted.")
def start(self):
# Initialize the code kernel
self.kernel = self.kernel_manager.blocking_client()
# self.kernel.load_connection_file()
self.kernel.start_channels()
print("Code kernel started.")
def interrupt(self):
# Interrupt the backend kernel
self.kernel_manager.interrupt_kernel()
print("Backend kernel interrupted.")
def is_alive(self):
return self.kernel.is_alive()
def b64_2_img(data):
buff = BytesIO(base64.b64decode(data))
return Image.open(buff)
def clean_ansi_codes(input_string):
ansi_escape = re.compile(r'(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', input_string)
def execute(code, kernel: CodeKernel) -> tuple[str, str | Image.Image]:
res = ""
res_type = None
# code = code.replace("<|observation|>", "")
# code = code.replace("<|assistant|>interpreter", "")
# code = code.replace("<|assistant|>", "")
# code = code.replace("<|user|>", "")
# code = code.replace("<|system|>", "")
msg, output_list = kernel.execute(code)
if msg['metadata']['status'] == "timeout":
return res_type, 'Timed out'
elif msg['metadata']['status'] == 'error':
return res_type, clean_ansi_codes('\n'.join(kernel.get_error_msg(msg, verbose=True))) #todo: change the res_type to error
if len(output_list) > 1:
res_list = []
for output in output_list:
if 'text' in output:
res_type = "text"
res = output['text']
elif 'data' in output:
for key in output['data']:
if 'text/plain' in key:
res_type = "text"
res = output['data'][key]
elif 'image/png' in key:
res_type = "image"
res = output['data'][key]
break
if res_type == "image":
#return res_type, b64_2_img(res)
res_list.append(b64_2_img(res))
elif res_type == "text" or res_type == "traceback": #traceback is error?
#res = res
res_list.append(res)
if len(res_list) > 1 and res_list[-1] == res_list[-2]:
res_list.pop()
res = '\n'.join(res_list)
return res_type, res
else:
output = output_list[0]
if 'text' in output: #output is a dict, what situation is 'text' in output?
res_type = "text"
res = output['text']
elif 'data' in output:
for key in output['data']:
if 'text/plain' in key: #key = 'text/plain'
res_type = "text"
res = output['data'][key]
elif 'image/png' in key: # image will be present as html file?
res_type = "image"
res = output['data'][key]
break
if res_type == "image":
return res_type, b64_2_img(res)
elif res_type == "text" or res_type == "traceback": # traceback is error?
res = res
return res_type, res
# @st.cache_resource
def get_kernel():
kernel = CodeKernel()
return kernel
if __name__ == '__main__':
kernel = CodeKernel()
table_code = """
import pandas as pd
data = pd.read_csv('/Users/stephensun/Desktop/pypro/LAMBDA/cache/conv_cache/c1829f26-f6b3-4cbc-96a7-88aef2c33df2-2024-08-13/wine.csv')
data.head()
"""
img_code = """
import matplotlib.pyplot as plt
x = [1, 2, 3, 4, 5]
y = [2, 3, 5, 7, 6]
plt.plot(x, y)
plt.title('Simple Line Plot')
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.show()
"""
res_type, res = execute(img_code, kernel)
print(res_type,res)
# kernel.export("my_notebook.ipynb")