Spaces:
Sleeping
Sleeping
import streamlit as st | |
from interpreter import interpreter | |
import os | |
from streamlit_extras.colored_header import colored_header | |
from streamlit_lottie import st_lottie | |
import json | |
import requests | |
import re | |
from datetime import datetime, timezone | |
from typing import Dict, Any | |
from streamlit.runtime.scriptrunner import get_script_run_ctx | |
import time | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
import shutil | |
from pathlib import Path | |
import hashlib | |
import streamlit_file_browser as sfb | |
# Add retry decorator for API calls | |
def load_lottieurl(url: str) -> Dict[str, Any]: | |
try: | |
r = requests.get(url, timeout=10) # Add timeout | |
r.raise_for_status() # Raise exception for bad status codes | |
return r.json() | |
except requests.exceptions.RequestException as e: | |
st.error(f"Failed to load animation: {str(e)}") | |
return None | |
# Add error handling for interpreter calls | |
def safe_interpreter_call(func, *args, **kwargs): | |
try: | |
return func(*args, **kwargs) | |
except Exception as e: | |
error_msg = str(e) | |
if "API key" in error_msg.lower(): | |
st.error("β API key error. Please check your API key in settings.") | |
elif "rate limit" in error_msg.lower(): | |
st.error("β³ Rate limit exceeded. Please wait a moment and try again.") | |
else: | |
st.error(f"β Error: {error_msg}") | |
return None | |
def get_session_id(): | |
ctx = get_script_run_ctx() | |
return ctx.session_id if ctx else None | |
def save_settings(): | |
"""Save current settings to session state and ensure they persist""" | |
settings = st.session_state.settings | |
session_id = get_session_id() | |
if session_id: | |
# Save all relevant state | |
st.session_state[f"persistent_settings_{session_id}"] = settings.copy() | |
st.session_state[f"persistent_model_{session_id}"] = st.session_state.selected_model | |
st.session_state[f"persistent_audio_{session_id}"] = st.session_state.selected_audio_track | |
def load_settings(): | |
"""Load settings from persistent storage""" | |
session_id = get_session_id() | |
if session_id: | |
# Load all saved state | |
if f"persistent_settings_{session_id}" in st.session_state: | |
st.session_state.settings = st.session_state[f"persistent_settings_{session_id}"].copy() | |
if f"persistent_model_{session_id}" in st.session_state: | |
st.session_state.selected_model = st.session_state[f"persistent_model_{session_id}"] | |
if f"persistent_audio_{session_id}" in st.session_state: | |
st.session_state.selected_audio_track = st.session_state[f"persistent_audio_{session_id}"] | |
def init_session_state(): | |
"""Initialize session state with proper error handling and defaults""" | |
try: | |
# Load persistent settings first | |
load_settings() | |
# Define all required session state keys and their defaults | |
default_state = { | |
"settings": { | |
"api_key": os.getenv("HF_API_KEY", ""), | |
"api_base": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-Coder-32B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"model": "huggingface/Qwen/Qwen2.5-Coder-32B-Instruct", | |
"auto_run": True, | |
"theme": "light", | |
"code_style": "monokai", | |
"custom_instructions": "", | |
"safe_mode": "off", | |
"conversation_history": False, | |
"os_mode": False, | |
"os_restricted_mode": False, | |
"allowed_paths": ["/"], | |
"use_workspace": True, | |
}, | |
"selected_model": None, # Will be set from settings["model"] | |
"selected_audio_track": "Ambient", | |
"uploaded_files": [], | |
"settings_open": False, | |
"messages": [], | |
"original_system_message": interpreter.system_message, | |
"session_dir": None, | |
"last_request_time": time.time() | |
} | |
# Initialize all required state variables | |
for key, default_value in default_state.items(): | |
if key not in st.session_state: | |
st.session_state[key] = default_value | |
# Ensure selected_model is set from settings if not already set | |
if not st.session_state.selected_model: | |
st.session_state.selected_model = st.session_state.settings["model"] | |
# Always reset interpreter for fresh session | |
interpreter.reset() | |
# Apply initial settings | |
apply_interpreter_settings() | |
except Exception as e: | |
st.error(f"Error initializing session state: {str(e)}") | |
# Provide fallback values for critical settings | |
if "settings" not in st.session_state: | |
st.session_state.settings = default_state["settings"] | |
def apply_interpreter_settings(): | |
"""Apply interpreter settings""" | |
interpreter.llm.api_key = st.session_state.settings["api_key"] | |
interpreter.llm.api_base = st.session_state.settings["api_base"] | |
interpreter.llm.model = st.session_state.settings["model"] | |
interpreter.auto_run = st.session_state.settings["auto_run"] | |
interpreter.safe_mode = "off" | |
interpreter.conversation_history = False # Force this to False | |
interpreter.os = st.session_state.settings["os_mode"] | |
# Set allowed paths | |
interpreter.computer.allowed_paths = [get_session_folder()] if st.session_state.settings["use_workspace"] else \ | |
st.session_state.settings["allowed_paths"] | |
# Update system message | |
interpreter.system_message = st.session_state.original_system_message | |
if st.session_state.settings["custom_instructions"]: | |
interpreter.system_message += f"\n\nAdditional Instructions:\n{st.session_state.settings['custom_instructions']}" | |
if st.session_state.settings["use_workspace"]: | |
workspace_path = get_session_folder() | |
interpreter.system_message += f"\n\nWorkspace Path: {workspace_path}\nYou can only access files in this workspace directory." | |
class OutputController: | |
def __init__(self): | |
self.loop_detection = { | |
'last_content': None, | |
'repeat_count': 0, | |
'last_timestamp': datetime.now(), | |
'number_pattern': re.compile(r'^[\d\s.]+$'), | |
'terminal_spam': re.compile(r'^[0-9\s.]{20,}$'), # Long number sequences | |
'max_repeats': 3, | |
'timeout_seconds': 2 | |
} | |
def is_loop_detected(self, content: str) -> bool: | |
now = datetime.now() | |
content = str(content).strip() | |
# Immediately skip terminal spam | |
if self.loop_detection['terminal_spam'].match(content): | |
return True | |
if (content != self.loop_detection['last_content'] or | |
(now - self.loop_detection['last_timestamp']).seconds > self.loop_detection['timeout_seconds']): | |
self.loop_detection.update({ | |
'repeat_count': 0, | |
'last_content': content, | |
'last_timestamp': now | |
}) | |
return False | |
# More aggressive number detection | |
if self.loop_detection['number_pattern'].match(content): | |
self.loop_detection['repeat_count'] += 1 | |
if self.loop_detection['repeat_count'] > 2: # Reduced tolerance | |
return True | |
return False | |
# Move clear_chat_history outside main function | |
def clear_chat_history(): | |
"""Completely reset the chat state""" | |
interpreter.messages = [] # Clear interpreter's message history | |
interpreter.reset() # Full reset of interpreter | |
st.session_state.messages = [] # Clear UI message history | |
save_settings() # Ensure settings persist after clear | |
st.success("Chat history cleared!") | |
st.rerun() | |
# Move update_model_settings outside main function | |
def update_model_settings(): | |
st.session_state.selected_model = st.session_state.model_select | |
st.session_state.settings.update({ | |
"model": st.session_state.model_select, | |
"api_base": model_options[st.session_state.model_select] | |
}) | |
save_settings() # Save settings after update | |
# Define audio_tracks at module level | |
audio_tracks = { | |
"Ambient": "https://cdn.pixabay.com/download/audio/2022/02/22/audio_d1718ab41b.mp3", | |
"Lo-Fi": "https://cdn.pixabay.com/download/audio/2022/03/10/audio_2d7b426f87.mp3", | |
"Focus": "https://cdn.pixabay.com/download/audio/2022/01/18/audio_d0c6bf3c0e.mp3" | |
} | |
# Move to module level (top of file with other constants) | |
model_options = { | |
# Default and recommended model | |
"huggingface/Qwen/Qwen2.5-Coder-32B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-Coder-32B-Instruct%26quot%3B%3C%2Fspan%3E%2C # Default | |
# Other Qwen 2.5 Coder Series | |
"huggingface/Qwen/Qwen2.5-Coder-14B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-Coder-14B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/Qwen/Qwen2.5-Coder-7B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-Coder-7B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
# Qwen 2.5 General Series | |
"huggingface/Qwen/Qwen2.5-72B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-72B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/Qwen/Qwen2.5-32B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-32B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/Qwen/Qwen2.5-7B-Instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2FQwen%2FQwen2.5-7B-Instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
# Other verified top performers | |
"huggingface/mistralai/Mixtral-8x7B-Instruct-v0.1": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fmistralai%2FMixtral-8x7B-Instruct-v0.1%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/mistralai/Mistral-7B-Instruct-v0.2": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fmistralai%2FMistral-7B-Instruct-v0.2%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/codellama/CodeLlama-34b-Instruct-hf": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fcodellama%2FCodeLlama-34b-Instruct-hf%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/codellama/CodeLlama-13b-Instruct-hf": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fcodellama%2FCodeLlama-13b-Instruct-hf%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/deepseek-ai/deepseek-coder-6.7b-instruct": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fdeepseek-ai%2Fdeepseek-coder-6.7b-instruct%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/microsoft/phi-2": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fmicrosoft%2Fphi-2%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
"huggingface/bigcode/starcoder2-15b": "/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2Fbigcode%2Fstarcoder2-15b%26quot%3B%3C%2Fspan%3E%2C%3C!-- HTML_TAG_END --> | |
} | |
def get_theme_styles(theme: str = "light") -> str: | |
"""Get theme styles based on Streamlit's native theming""" | |
return """ | |
/* Base styles */ | |
body { | |
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif; | |
line-height: 1.6; | |
} | |
/* Message container styles */ | |
.stChatMessage { | |
margin: 1rem 0; | |
padding: 1rem; | |
border-radius: 10px; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
} | |
/* Code block styles */ | |
pre { | |
border-radius: 8px !important; | |
padding: 1rem !important; | |
margin: 1rem 0 !important; | |
border: 1px solid rgba(128, 128, 128, 0.2) !important; | |
overflow-x: auto !important; | |
} | |
code { | |
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace !important; | |
font-size: 0.9em !important; | |
padding: 0.2em 0.4em !important; | |
border-radius: 3px !important; | |
} | |
/* Output block styles */ | |
.output-block { | |
border-radius: 8px; | |
padding: 1rem; | |
margin: 0.5rem 0; | |
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace; | |
font-size: 0.9em; | |
border-left: 4px solid #FF4B4B; | |
opacity: 0.9; | |
} | |
/* Message spacing */ | |
.stMarkdown { | |
line-height: 1.6; | |
margin: 0.5rem 0; | |
} | |
.stMarkdown p { | |
margin: 0.75rem 0; | |
} | |
/* Button styles */ | |
.stButton button { | |
border-radius: 20px !important; | |
padding: 0.4rem 1rem !important; | |
border: 1px solid rgba(128, 128, 128, 0.2) !important; | |
font-weight: 500 !important; | |
transition: all 0.3s ease !important; | |
} | |
/* Header styles */ | |
.stMarkdown h1, .stMarkdown h2, .stMarkdown h3 { | |
margin: 1.5rem 0 1rem 0; | |
font-weight: 600; | |
} | |
/* Input field styles */ | |
.stTextInput > div > div > input { | |
border-radius: 10px !important; | |
border: 1px solid rgba(128, 128, 128, 0.2) !important; | |
padding: 0.75rem 1rem !important; | |
font-size: 1rem !important; | |
} | |
/* Chat message styles */ | |
.chat-message { | |
padding: 1.25rem; | |
border-radius: 12px; | |
margin: 1rem 0; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
box-shadow: 0 2px 4px rgba(0,0,0,0.05); | |
} | |
/* User message specific styles */ | |
.user-message { | |
margin-left: auto; | |
max-width: 80%; | |
} | |
/* Assistant message specific styles */ | |
.assistant-message { | |
margin-right: auto; | |
max-width: 80%; | |
} | |
/* Error and warning styles */ | |
.error-message { | |
border-left: 4px solid #DC2626; | |
padding: 1rem; | |
margin: 1rem 0; | |
border-radius: 8px; | |
opacity: 0.9; | |
} | |
.warning-message { | |
border-left: 4px solid #F59E0B; | |
padding: 1rem; | |
margin: 1rem 0; | |
border-radius: 8px; | |
opacity: 0.9; | |
} | |
/* Success message styles */ | |
.success-message { | |
border-left: 4px solid #059669; | |
padding: 1rem; | |
margin: 1rem 0; | |
border-radius: 8px; | |
opacity: 0.9; | |
} | |
/* Floating audio player styles */ | |
.floating-audio { | |
position: fixed; | |
bottom: 20px; | |
right: 20px; | |
z-index: 9999; | |
padding: 1rem; | |
border-radius: 12px; | |
box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
backdrop-filter: blur(10px); | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
max-width: 300px; | |
transition: all 0.3s ease; | |
} | |
.floating-audio:hover { | |
transform: translateY(-2px); | |
box-shadow: 0 6px 8px rgba(0,0,0,0.15); | |
} | |
.floating-audio audio { | |
width: 250px; | |
height: 40px; | |
opacity: 0.9; | |
border-radius: 8px; | |
margin-bottom: 0.5rem; | |
} | |
.floating-audio select { | |
width: 100%; | |
padding: 0.5rem; | |
border-radius: 8px; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
font-size: 0.9rem; | |
cursor: pointer; | |
} | |
/* File browser styles */ | |
.file-browser { | |
border-radius: 12px; | |
padding: 1rem; | |
margin: 1rem 0; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
} | |
.file-item { | |
display: flex; | |
align-items: center; | |
padding: 0.75rem; | |
border-bottom: 1px solid rgba(128, 128, 128, 0.2); | |
transition: all 0.2s ease; | |
} | |
.file-item:hover { | |
opacity: 0.8; | |
} | |
/* Settings panel styles */ | |
.settings-panel { | |
border-radius: 12px; | |
padding: 1.5rem; | |
margin: 1rem 0; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
} | |
/* Tab styles */ | |
.stTabs { | |
border-radius: 8px; | |
padding: 0.5rem; | |
} | |
.stTab { | |
border-radius: 8px !important; | |
padding: 0.5rem 1rem !important; | |
} | |
""" | |
class ChatMessage: | |
def __init__(self, content="", message_type="text", role="assistant"): | |
self.content = content | |
self.type = message_type | |
self.role = role | |
self.timestamp = datetime.now(timezone.utc) | |
self.id = hashlib.md5(f"{self.timestamp.isoformat()}-{content}".encode()).hexdigest() | |
self._processed_content = None # Cache for processed content | |
def to_dict(self): | |
return { | |
"content": self.content, | |
"type": self.type, | |
"role": self.role, | |
"timestamp": self.timestamp.isoformat(), | |
"id": self.id | |
} | |
def from_dict(cls, data): | |
msg = cls( | |
content=data["content"], | |
message_type=data["type"], | |
role=data["role"] | |
) | |
msg.timestamp = datetime.fromisoformat(data["timestamp"]) | |
msg.id = data["id"] | |
return msg | |
def get_formatted_content(self, force_refresh=False): | |
"""Get formatted content with caching""" | |
if self._processed_content is None or force_refresh: | |
self._processed_content = format_message(self) | |
return self._processed_content | |
def format_message(message: ChatMessage) -> str: | |
"""Format message with improved markdown and syntax highlighting for large responses""" | |
try: | |
if message.type == "code": | |
# Default to Python for code blocks | |
lang = "python" | |
content = message.content.strip() | |
# Enhanced language detection | |
if content.startswith("```"): | |
first_line = content.split("\n")[0] | |
lang = first_line.replace("```", "").strip() or "python" | |
content = "\n".join(content.split("\n")[1:]) | |
if content.endswith("```"): | |
content = content[:-3] | |
# Extended language detection | |
if "." in content: | |
ext_match = re.search(r'\.(py|js|html|css|json|md|sql|sh|bash|yaml|yml|java|cpp|c|go|rs|ts)$', content.lower()) | |
if ext_match: | |
lang_map = { | |
'py': 'python', | |
'js': 'javascript', | |
'html': 'html', | |
'css': 'css', | |
'json': 'json', | |
'md': 'markdown', | |
'sql': 'sql', | |
'sh': 'bash', | |
'bash': 'bash', | |
'yaml': 'yaml', | |
'yml': 'yaml', | |
'java': 'java', | |
'cpp': 'cpp', | |
'c': 'c', | |
'go': 'go', | |
'rs': 'rust', | |
'ts': 'typescript' | |
} | |
lang = lang_map.get(ext_match.group(1), lang) | |
# Format code with proper spacing and syntax | |
formatted_content = f"```{lang}\n{content.strip()}\n```" | |
# Add visual separator for multiple code blocks | |
if "\n\n```" in message.content: | |
formatted_content = f"\n{formatted_content}\n" | |
return formatted_content | |
elif message.type == "error": | |
return f'<div class="error-message">β **Error:** {message.content}</div>' | |
elif message.type == "warning": | |
return f'<div class="warning-message">β οΈ **Warning:** {message.content}</div>' | |
elif message.type == "success": | |
return f'<div class="success-message">β {message.content}</div>' | |
else: | |
# Clean and format regular text | |
content = message.content.strip() | |
# Handle inline code with better spacing | |
content = re.sub(r'(?<!`)`([^`]+)`(?!`)', r' <code>\1</code> ', content) | |
content = re.sub(r'\s+', ' ', content) # Normalize spaces | |
# Handle markdown links with proper spacing | |
content = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'<a href="\2" target="_blank">\1</a>', content) | |
# Improve list formatting | |
content = re.sub(r'(\n\s*[-*]\s+[^\n]+)(?=\n\s*[^-*]|\Z)', r'\1\n', content) | |
# Enhanced console output formatting | |
if "$ " in content or "%" in content: | |
lines = content.split("\n") | |
formatted_lines = [] | |
in_output = False | |
output_buffer = [] | |
for line in lines: | |
if line.strip().startswith(("$ ", "%")): | |
# Format collected output | |
if output_buffer: | |
formatted_lines.append(f'<div class="output-block">{"".join(output_buffer)}</div>') | |
output_buffer = [] | |
# Format command with proper styling | |
formatted_lines.append(f'<code class="command">{line}</code>') | |
in_output = True | |
elif in_output: | |
# Collect output lines | |
output_buffer.append(line + "\n") | |
else: | |
# Regular text line | |
formatted_lines.append(line) | |
# Handle any remaining output | |
if output_buffer: | |
formatted_lines.append(f'<div class="output-block">{"".join(output_buffer)}</div>') | |
content = "\n".join(formatted_lines) | |
# Clean up excessive newlines | |
content = re.sub(r'\n{3,}', '\n\n', content) | |
return content | |
except Exception as e: | |
st.error(f"Error formatting message: {str(e)}") | |
return message.content | |
def handle_user_input(user_input: str): | |
"""Handle user input with improved streaming and chunking for large responses""" | |
if not user_input.strip(): | |
return | |
# Rate limiting with exponential backoff | |
current_time = time.time() | |
if hasattr(st.session_state, 'last_request_time'): | |
time_since_last = current_time - st.session_state.last_request_time | |
min_interval = 1.0 # Base interval in seconds | |
if hasattr(st.session_state, 'request_count'): | |
st.session_state.request_count += 1 | |
if st.session_state.request_count > 5: | |
min_interval = min(5.0, min_interval * 1.5) | |
else: | |
st.session_state.request_count = 1 | |
if time_since_last < min_interval: | |
st.warning(f"Please wait {min_interval - time_since_last:.1f} seconds before sending another message...") | |
time.sleep(min_interval - time_since_last) | |
st.session_state.last_request_time = current_time | |
st.session_state.request_count = 1 | |
# Add user message | |
user_message = ChatMessage(user_input, "text", "user") | |
st.session_state.messages.append(user_message) | |
with st.chat_message("user", avatar="π§βπ»"): | |
st.markdown(user_message.get_formatted_content()) | |
# Process with interpreter | |
try: | |
with st.chat_message("assistant", avatar="π€"): | |
message_container = st.container() | |
with st.spinner("Thinking..."): | |
# Initialize buffers and state | |
message_buffer = [] | |
code_buffer = [] | |
current_chunk = { | |
'type': 'message', | |
'content': '', | |
'language': None | |
} | |
# Create placeholder for streaming updates | |
with message_container: | |
response_placeholder = st.empty() | |
# Enhanced streaming with chunking | |
for chunk in interpreter.chat(user_input, stream=True, display=False): | |
if isinstance(chunk, dict): | |
content = str(chunk.get('content', '')) # Convert content to string | |
chunk_type = chunk.get('type', 'message') | |
# Skip empty chunks | |
if not content: | |
continue | |
# Handle different chunk types | |
if chunk_type == 'message': | |
# Flush code buffer if exists | |
if code_buffer: | |
code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string | |
if code_text.strip(): | |
message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") | |
code_buffer = [] | |
# Add message content | |
message_buffer.append(content) | |
current_chunk = {'type': 'message', 'content': content} | |
elif chunk_type in ['code', 'console']: | |
# Start new code block if needed | |
if current_chunk['type'] != 'code': | |
if code_buffer: # Flush previous code buffer | |
code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string | |
if code_text.strip(): | |
message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") | |
code_buffer = [] | |
current_chunk = { | |
'type': 'code', | |
'language': chunk.get('format', 'python') | |
} | |
# Accumulate code content | |
code_buffer.append(content) | |
# Update display with proper chunking | |
try: | |
display_content = ''.join(str(item) for item in message_buffer) # Convert each item to string | |
if code_buffer: # Add current code buffer if exists | |
code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string | |
if code_text.strip(): | |
display_content += f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```" | |
# Use markdown for display with proper formatting | |
response_placeholder.markdown(display_content) | |
except Exception as e: | |
st.error(f"Error updating display: {str(e)}") | |
# Final cleanup and display | |
try: | |
# Handle any remaining code buffer | |
if code_buffer: | |
code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string | |
if code_text.strip(): | |
message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") | |
# Prepare final response | |
final_response = ''.join(str(item) for item in message_buffer) # Convert each item to string | |
# Create and store assistant message | |
assistant_message = ChatMessage(final_response, "text", "assistant") | |
st.session_state.messages.append(assistant_message) | |
# Final display update | |
response_placeholder.markdown(assistant_message.get_formatted_content()) | |
except Exception as e: | |
st.error(f"Error in final display update: {str(e)}") | |
response_placeholder.markdown(final_response) | |
except Exception as e: | |
error_msg = ChatMessage(str(e), "error", "assistant") | |
st.session_state.messages.append(error_msg) | |
st.error(error_msg.get_formatted_content()) | |
# Add file handling functions | |
def get_session_folder() -> str: | |
"""Get or create the session folder for file uploads with proper error handling""" | |
if not st.session_state.settings["use_workspace"]: | |
return "/" | |
try: | |
# If a custom workspace path is set, use it | |
if st.session_state.get("session_dir"): | |
workspace_dir = Path(st.session_state.session_dir) | |
else: | |
# Use default workspace directory | |
workspace_dir = Path("autointerpreter-workspace") | |
# Create directory if it doesn't exist | |
workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o755) | |
# If no session directory is set, create one | |
if not st.session_state.get("session_dir"): | |
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H_%M") | |
session_dir = workspace_dir / f"session-{timestamp}" | |
session_dir.mkdir(exist_ok=True, mode=0o755) | |
st.session_state.session_dir = str(session_dir) | |
st.session_state.uploaded_files = [] | |
update_custom_instructions() | |
# Verify the workspace directory exists and is accessible | |
if not workspace_dir.exists(): | |
st.error(f"Workspace directory not found: {workspace_dir}") | |
# Create a new default workspace | |
workspace_dir = Path("autointerpreter-workspace") | |
workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o755) | |
st.session_state.session_dir = str(workspace_dir) | |
st.session_state.uploaded_files = [] | |
update_custom_instructions() | |
return st.session_state.session_dir | |
except Exception as e: | |
st.error(f"Error managing workspace: {str(e)}") | |
return "/" | |
def handle_file_upload(uploaded_files): | |
"""Enhanced file upload handling with better error handling and validation""" | |
if not uploaded_files: | |
return | |
try: | |
session_dir = Path(get_session_folder()) | |
if str(session_dir) == "/": | |
st.error("Invalid workspace configuration!") | |
return | |
if not session_dir.exists(): | |
st.error("Session directory does not exist!") | |
try: | |
session_dir.mkdir(parents=True, exist_ok=True, mode=0o755) | |
st.success("Created new session directory.") | |
except Exception as e: | |
st.error(f"Failed to create session directory: {str(e)}") | |
return | |
for uploaded_file in uploaded_files: | |
try: | |
# Validate file | |
if uploaded_file.size == 0: | |
st.warning(f"Skipping empty file: {uploaded_file.name}") | |
continue | |
# Sanitize filename | |
safe_filename = Path(uploaded_file.name).name | |
safe_filename = re.sub(r'[^a-zA-Z0-9._-]', '_', safe_filename) | |
# Validate extension | |
allowed_extensions = {'.txt', '.py', '.js', '.html', '.css', '.json', '.md', '.csv', '.yml', '.yaml'} | |
file_ext = Path(safe_filename).suffix.lower() | |
if file_ext not in allowed_extensions: | |
st.warning(f"Unsupported file type: {file_ext}. Skipping {safe_filename}") | |
continue | |
file_path = session_dir / safe_filename | |
# Handle file conflicts | |
if file_path.exists(): | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
name_parts = safe_filename.rsplit('.', 1) | |
safe_filename = f"{name_parts[0]}_{timestamp}.{name_parts[1]}" if len(name_parts) > 1 else f"{safe_filename}_{timestamp}" | |
file_path = session_dir / safe_filename | |
# Check file size | |
file_size = len(uploaded_file.getvalue()) | |
if file_size > 100 * 1024 * 1024: # 100MB limit | |
st.warning(f"File {safe_filename} is too large (>{file_size/(1024*1024):.1f}MB). Skipping...") | |
continue | |
# Save file with error handling | |
try: | |
with open(file_path, "wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
# Add to session state | |
file_info = { | |
"name": safe_filename, | |
"path": str(file_path), | |
"size": uploaded_file.size, | |
"type": uploaded_file.type or "application/octet-stream", | |
"timestamp": datetime.now(timezone.utc) | |
} | |
if "uploaded_files" not in st.session_state: | |
st.session_state.uploaded_files = [] | |
# Update or append file info | |
existing_file = next((f for f in st.session_state.uploaded_files if f["path"] == str(file_path)), None) | |
if existing_file: | |
existing_file.update(file_info) | |
else: | |
st.session_state.uploaded_files.append(file_info) | |
st.success(f"Successfully uploaded: {safe_filename}") | |
except Exception as e: | |
st.error(f"Error saving {safe_filename}: {str(e)}") | |
if file_path.exists(): | |
try: | |
file_path.unlink() | |
except Exception as cleanup_error: | |
st.error(f"Error cleaning up partial file: {str(cleanup_error)}") | |
continue | |
except Exception as e: | |
st.error(f"Error processing upload {uploaded_file.name}: {str(e)}") | |
continue | |
# Update instructions after successful uploads | |
update_custom_instructions() | |
except Exception as e: | |
st.error(f"Error handling file uploads: {str(e)}") | |
def update_custom_instructions(): | |
"""Update custom instructions with workspace info""" | |
workspace_path = get_session_folder() | |
files_info = "" | |
if st.session_state.get("uploaded_files"): | |
files = [f"{info['name']} ({info['size'] / 1024:.1f} KB)" | |
for info in st.session_state.uploaded_files] | |
files_info = "\nFiles: " + ", ".join(files) | |
workspace_info = f"\nWorking Space: {workspace_path}{files_info}" | |
# Update instructions | |
current_instructions = st.session_state.settings.get("custom_instructions", "").strip() | |
current_instructions = re.sub(r'\nWorking Space:.*?(?=\n|$)', '', current_instructions, flags=re.MULTILINE) | |
current_instructions = re.sub(r'\nFiles:.*?(?=\n|$)', '', current_instructions, flags=re.MULTILINE) | |
new_instructions = current_instructions + ("\n" if current_instructions else "") + workspace_info | |
st.session_state.settings["custom_instructions"] = new_instructions | |
apply_interpreter_settings() | |
def get_file_icon(file_type: str) -> str: | |
"""Get appropriate icon for file type""" | |
if file_type.startswith('image/'): | |
return "πΌοΈ" | |
elif file_type.startswith('text/'): | |
return "π" | |
elif file_type.startswith('application/pdf'): | |
return "π" | |
elif file_type.startswith('application/json'): | |
return "οΏ½οΏ½οΏ½οΏ½" | |
elif 'python' in file_type.lower(): | |
return "π" | |
elif 'javascript' in file_type.lower(): | |
return "" | |
elif 'spreadsheet' in file_type.lower(): | |
return "π" | |
else: | |
return "π" | |
# Add audio track selection handler | |
def update_audio_track(): | |
"""Update the selected audio track""" | |
st.session_state.selected_audio_track = st.session_state.audio_select | |
def create_audio_player(): | |
return f""" | |
<div class="floating-audio"> | |
<audio id="audio-player" controls loop style="width: 250px; height: 40px;" | |
onerror="handleAudioError()" | |
onloadeddata="handleAudioLoaded()"> | |
<source id="audio-source" src="{audio_tracks[st.session_state.selected_audio_track]}" type="audio/mpeg"> | |
Your browser does not support the audio element. | |
</audio> | |
<select id="audio-select" onchange="updateAudio(this.value)" | |
style="width: 100%; padding: 5px; border-radius: 5px; border: 1px solid var(--border-color);"> | |
{' '.join([f'<option value="{url}" {"selected" if name == st.session_state.selected_audio_track else ""}>{name}</option>' | |
for name, url in audio_tracks.items()])} | |
</select> | |
</div> | |
<script> | |
function handleAudioError() {{ | |
console.error('Audio playback error'); | |
const player = document.getElementById('audio-player'); | |
player.style.opacity = '0.5'; | |
player.title = 'Error loading audio'; | |
}} | |
function handleAudioLoaded() {{ | |
const player = document.getElementById('audio-player'); | |
player.style.opacity = '1'; | |
player.title = ''; | |
}} | |
function updateAudio(url) {{ | |
try {{ | |
const audioPlayer = document.getElementById('audio-player'); | |
const audioSource = document.getElementById('audio-source'); | |
const wasPlaying = !audioPlayer.paused; | |
audioSource.src = url; | |
audioPlayer.load(); | |
if (wasPlaying) {{ | |
const playPromise = audioPlayer.play(); | |
if (playPromise !== undefined) {{ | |
playPromise.catch(error => {{ | |
console.error('Error playing audio:', error); | |
}}); | |
}} | |
}} catch (error) {{ | |
console.error('Error updating audio:', error); | |
}} | |
}} | |
// Add auto-retry for failed audio loads | |
document.addEventListener('DOMContentLoaded', function() {{ | |
const audioPlayer = document.getElementById('audio-player'); | |
let retryCount = 0; | |
const maxRetries = 3; | |
audioPlayer.addEventListener('error', function() {{ | |
if (retryCount < maxRetries) {{ | |
setTimeout(() => {{ | |
console.log('Retrying audio load...'); | |
audioPlayer.load(); | |
retryCount++; | |
}}, 1000 * retryCount); | |
}} | |
}}); | |
}}); | |
</script> | |
""" | |
def cleanup_old_sessions(): | |
"""Clean up old session directories with improved error handling and format detection""" | |
try: | |
workspace_dir = Path("autointerpreter-workspace") | |
if not workspace_dir.exists(): | |
return | |
current_time = datetime.now(timezone.utc) | |
current_session = st.session_state.get("session_dir") | |
# Define supported timestamp formats | |
timestamp_formats = [ | |
"%Y-%m-%d_%H_%M", # Standard format: 2024-01-20_14_30 | |
"%Y%m%d_%H%M%S", # Compact format: 20240120_143000 | |
] | |
for session_dir in workspace_dir.glob("session-*"): | |
try: | |
if str(session_dir) == current_session: | |
continue | |
dir_name = session_dir.name | |
timestamp_str = None | |
session_time = None | |
# Try to extract timestamp based on different patterns | |
if dir_name.startswith("session-"): | |
# Try standard timestamp format first | |
timestamp_str = dir_name.replace("session-", "") | |
# Try each supported format | |
for fmt in timestamp_formats: | |
try: | |
session_time = datetime.strptime(timestamp_str, fmt).replace(tzinfo=timezone.utc) | |
break | |
except ValueError: | |
continue | |
# If no supported format matches, check for UUID-like format | |
if not session_time and ( | |
len(timestamp_str) == 36 or # Standard UUID | |
len(timestamp_str) == 32 or # Compact UUID | |
'-' in timestamp_str # Any UUID-like string | |
): | |
# For UUID-based sessions, use file modification time | |
try: | |
mtime = session_dir.stat().st_mtime | |
session_time = datetime.fromtimestamp(mtime, tz=timezone.utc) | |
except Exception: | |
continue | |
if not session_time: | |
# Skip without warning for unrecognized formats | |
continue | |
# If older than 24 hours | |
if (current_time - session_time).days >= 1: | |
try: | |
if session_dir.exists(): # Double check existence | |
# Check if directory is empty | |
has_files = any(session_dir.iterdir()) | |
if has_files: | |
# Move to archive instead of deleting if contains files | |
archive_dir = workspace_dir / "archived_sessions" | |
archive_dir.mkdir(exist_ok=True) | |
new_name = f"archived_{dir_name}_{current_time.strftime('%Y%m%d_%H%M%S')}" | |
shutil.move(str(session_dir), str(archive_dir / new_name)) | |
print(f"Archived session with files: {session_dir}") | |
else: | |
# Delete if empty | |
shutil.rmtree(session_dir) | |
print(f"Cleaned up empty session: {session_dir}") | |
except Exception as e: | |
print(f"Error processing session directory {session_dir}: {str(e)}") | |
except Exception as e: | |
print(f"Error processing session directory {session_dir}: {str(e)}") | |
continue | |
except Exception as e: | |
print(f"Error in cleanup: {str(e)}") | |
def main(): | |
# 1. Initialize session state | |
init_session_state() | |
# 2. Page config (should be at top) | |
st.set_page_config( | |
page_title="AutoInterpreter", | |
layout="wide", | |
initial_sidebar_state="collapsed", | |
menu_items={ | |
'Get Help': 'https://github.com/samihalawa/autointerpreter', | |
'Report a bug': "https://github.com/samihalawa/autointerpreter/issues", | |
'About': "# AutoInterpreter\nThe Final AI Coding Experience" | |
} | |
) | |
# 3. Load and apply styles | |
st.markdown(""" | |
<style> | |
.stButton button { | |
border-radius: 20px; | |
padding: 0.2rem 0.5rem; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
transition: all 0.3s ease; | |
} | |
.stButton button:hover { | |
opacity: 0.8; | |
} | |
.header-buttons { | |
display: flex; | |
justify-content: flex-end; | |
gap: 0.5rem; | |
} | |
.header-buttons button { | |
min-width: 0 !important; | |
height: auto !important; | |
padding: 0.2rem !important; | |
} | |
.stTextInput > div > div > input { | |
border-radius: 10px; | |
} | |
/* Add floating audio player styles */ | |
.floating-audio { | |
position: fixed; | |
bottom: 20px; | |
right: 20px; | |
z-index: 9999; | |
padding: 10px; | |
border-radius: 10px; | |
box-shadow: 0 2px 5px rgba(0,0,0,0.1); | |
backdrop-filter: blur(5px); | |
display: flex; | |
flex-direction: column; | |
gap: 5px; | |
max-width: 300px; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
} | |
.floating-audio audio { | |
width: 250px; | |
height: 40px; | |
opacity: 0.8; | |
} | |
.floating-audio select { | |
width: 100%; | |
padding: 5px; | |
border-radius: 5px; | |
border: 1px solid rgba(128, 128, 128, 0.2); | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# 4. Setup UI components | |
lottie_coding = load_lottieurl('https://assets5.lottiefiles.com/packages/lf20_fcfjwiyb.json') | |
# 5. Create header and settings UI | |
col1, col2, col3 = st.columns([0.2, 0.6, 0.2]) | |
with col2: | |
st_lottie(lottie_coding, height=200, key="coding") | |
colored_header( | |
label="AutoInterpreter", | |
description="Run Any Code. The Final AI Coding Experience.", | |
color_name="red-70" | |
) | |
with col3: | |
st.markdown('<div class="header-buttons">', unsafe_allow_html=True) | |
btn_col1, btn_col2 = st.columns([1, 1]) | |
with btn_col1: | |
# Toggle settings when button is clicked | |
if st.button("βοΈ", help="Configure AutoInterpreter", key="settings_btn"): | |
st.session_state.settings_open = not st.session_state.settings_open | |
with btn_col2: | |
if st.button("β", help="Clear chat history", key="clear_btn"): | |
if st.button("β", help="Confirm clear", key="confirm_btn"): | |
clear_chat_history() | |
st.markdown('</div>', unsafe_allow_html=True) | |
# Move theme application before settings panel | |
theme = st.session_state.settings.get("theme", "light") | |
st.markdown(f"<style>{get_theme_styles(theme)}</style>", unsafe_allow_html=True) | |
# Enhanced Settings modal with tabs | |
if st.session_state.settings_open: | |
with st.expander("Settings Panel", expanded=True): | |
# Add close button to top-right of settings panel | |
col1, col2 = st.columns([0.9, 0.1]) | |
with col2: | |
if st.button("βοΈ", help="Close settings", key="close_settings"): | |
st.session_state.settings_open = False | |
save_settings() # Save settings before closing | |
st.rerun() | |
# Settings tabs | |
tab1, tab2, tab3 = st.tabs(["API & Model", "Code Settings", "Assistant Settings"]) | |
with tab1: | |
# API Settings | |
st.text_input( | |
"API Key", | |
value=st.session_state.settings["api_key"], | |
type="password", | |
key="api_key", | |
help="Enter your HuggingFace API key" | |
) | |
st.markdown("---") | |
st.markdown("### π€ Model Selection") | |
# Current model display | |
current_model = st.session_state.selected_model.split('/')[-1] | |
st.info(f"Current Model: **{current_model}**", icon="π€") | |
# Model Selection with categories | |
model_category = st.radio( | |
"Model Category", | |
["Qwen Coder Series", "Qwen General Series", "Other Models"], | |
help="Select model category" | |
) | |
filtered_models = { | |
k: v for k, v in model_options.items() | |
if ( | |
(model_category == "Qwen Coder Series" and "Qwen2.5-Coder" in k) or | |
(model_category == "Qwen General Series" and "Qwen2.5-" in k and "Coder" not in k) or | |
(model_category == "Other Models" and "Qwen" not in k) | |
) | |
} | |
selected_model = st.selectbox( | |
"Select Model", | |
options=list(filtered_models.keys()), | |
format_func=lambda x: x.split('/')[-1], | |
key="model_select", | |
help="Choose your preferred model" | |
) | |
# Theme Selection | |
st.markdown("---") | |
st.markdown("### π¨ Theme") | |
theme_selection = st.selectbox( | |
"UI Theme", | |
options=["light", "dark"], | |
index=0 if theme == "light" else 1, | |
key="theme_select", | |
help="Select the UI theme" | |
) | |
# Save Settings Button - More prominent | |
st.markdown("---") | |
col1, col2 = st.columns([0.7, 0.3]) | |
with col2: | |
if st.button("πΎ Save Changes", type="primary", use_container_width=True): | |
# Update settings | |
st.session_state.settings.update({ | |
"api_key": st.session_state.api_key, | |
"model": st.session_state.model_select, | |
"api_base": model_options[st.session_state.model_select], | |
"theme": st.session_state.theme_select | |
}) | |
st.session_state.selected_model = st.session_state.model_select | |
save_settings() | |
st.session_state.settings_open = False | |
st.success("β Settings saved successfully!") | |
time.sleep(0.5) | |
st.rerun() | |
with tab2: | |
# Code Execution Settings | |
col1, col2 = st.columns(2) | |
with col1: | |
st.toggle( | |
"Auto Run Code", | |
value=st.session_state.settings["auto_run"], | |
key="auto_run", | |
help="Automatically execute code without confirmation", | |
on_change=lambda: st.session_state.settings.update({"auto_run": st.session_state.auto_run}) | |
) | |
st.selectbox( | |
"Code Style", | |
options=["monokai", "github", "dracula"], | |
index=0, | |
key="code_style", | |
help="Code highlighting theme", | |
on_change=lambda: st.session_state.settings.update({"code_style": st.session_state.code_style}) | |
) | |
with col2: | |
st.selectbox( | |
"Safe Mode", | |
options=["off", "ask", "auto"], | |
index=0, | |
key="safe_mode", | |
help="Code execution safety level", | |
on_change=lambda: st.session_state.settings.update({"safe_mode": st.session_state.safe_mode}) | |
) | |
with tab3: | |
# Assistant Behavior Settings | |
""" | |
st.toggle( | |
"Save Chat History", | |
value=st.session_state.settings["conversation_history"], | |
key="conversation_history", | |
help="Preserve conversation between sessions", | |
on_change=lambda: st.session_state.settings.update({"conversation_history": st.session_state.conversation_history}) | |
) | |
""" | |
# System Message Settings | |
st.text_area( | |
"Default System Message", | |
value=interpreter.system_message, | |
disabled=True, | |
help="Base instructions for the AI assistant", | |
height=100 | |
) | |
st.text_area( | |
"Custom Instructions", | |
value=st.session_state.settings["custom_instructions"], | |
key="custom_instructions", | |
help="Additional instructions for the assistant", | |
height=100, | |
on_change=lambda: st.session_state.settings.update({"custom_instructions": st.session_state.custom_instructions}) | |
) | |
# OS Control Settings | |
st.markdown("---") | |
st.markdown("### System Access Settings") | |
st.markdown("**Warning**: OS mode enables system control (mouse, keyboard, screen access)") | |
st.toggle( | |
"Enable System Control", | |
value=st.session_state.settings["os_mode"], | |
key="os_mode", | |
help="Allow assistant to control system", | |
on_change=lambda: st.session_state.settings.update({"os_mode": st.session_state.os_mode}) | |
) | |
if st.session_state.settings["os_mode"]: | |
st.toggle( | |
"Restricted Access", | |
value=st.session_state.settings["os_restricted_mode"], | |
key="os_restricted_mode", | |
help="Limit system access to specific paths", | |
on_change=lambda: st.session_state.settings.update({"os_restricted_mode": st.session_state.os_restricted_mode}) | |
) | |
if st.session_state.settings["os_restricted_mode"]: | |
st.text_area( | |
"Allowed Paths", | |
value="\n".join(st.session_state.settings["allowed_paths"]), | |
help="One path per line", | |
key="allowed_paths", | |
on_change=lambda: st.session_state.settings.update({ | |
"allowed_paths": [p.strip() for p in st.session_state.allowed_paths.split("\n") if p.strip()] | |
}) | |
) | |
# Add current model display after header | |
col1, col2, col3 = st.columns([0.2, 0.6, 0.2]) | |
with col3: | |
st.markdown( | |
f"""<div style="text-align: right; font-size: 0.8em; color: var(--text-color); opacity: 0.8;"> | |
π€ Model: {st.session_state.selected_model.split('/')[-1]} | |
</div>""", | |
unsafe_allow_html=True | |
) | |
# Initialize selected track in session state if not exists | |
if "selected_audio_track" not in st.session_state: | |
st.session_state.selected_audio_track = "Ambient" | |
# Create floating audio player | |
st.markdown(create_audio_player(), unsafe_allow_html=True) | |
# Simplified workspace control in sidebar | |
with st.sidebar: | |
st.markdown("### ποΈ Workspace Control") | |
# Single workspace toggle in sidebar | |
st.toggle( | |
"Use Workspace", | |
value=st.session_state.settings["use_workspace"], | |
key="use_workspace", | |
help="Restrict file access to workspace directory only", | |
on_change=lambda: ( | |
st.session_state.settings.update({"use_workspace": st.session_state.use_workspace}), | |
apply_interpreter_settings() | |
) | |
) | |
if st.session_state.settings["use_workspace"]: | |
st.markdown("### π Workspace") | |
# Add workspace path input | |
workspace_path = st.text_input( | |
"Workspace Path", | |
value=str(Path(st.session_state.get("session_dir", "autointerpreter-workspace")).resolve()), | |
help="Enter the full path to your workspace directory", | |
key="workspace_path_input" | |
) | |
# Add Set Path button | |
if st.button("π Set Workspace Path", use_container_width=True): | |
try: | |
workspace_dir = Path(workspace_path) | |
workspace_dir.mkdir(parents=True, exist_ok=True) | |
st.session_state.session_dir = str(workspace_dir) | |
apply_interpreter_settings() | |
st.success(f"β Workspace set to: {workspace_path}") | |
st.rerun() | |
except Exception as e: | |
st.error(f"β Error setting workspace path: {str(e)}") | |
# File browser for workspace management | |
event = sfb.st_file_browser( | |
path=st.session_state.get("session_dir", "autointerpreter-workspace"), | |
key="file_browser", | |
show_choose_file=True, | |
show_delete_file=True, | |
show_new_folder=True, | |
show_upload_file=True, | |
show_preview=True | |
) | |
if event: | |
if event.get("type") == "file_selected": | |
st.session_state.session_dir = event["path"] | |
st.code(f"Current workspace: {event['path']}", language="bash") | |
apply_interpreter_settings() | |
elif event.get("type") == "folder_created": | |
st.success(f"Created folder: {event['path']}") | |
elif event.get("type") == "file_deleted": | |
st.warning(f"Deleted: {event['path']}") | |
if str(event['path']) == st.session_state.get('session_dir'): | |
st.session_state.pop('session_dir', None) | |
apply_interpreter_settings() | |
# After settings panel and before chat history display | |
# Apply interpreter settings | |
apply_interpreter_settings() | |
# Display chat history with enhanced formatting | |
for message in st.session_state.messages: | |
with st.chat_message(message.role, avatar="π§βπ»" if message.role == "user" else "π€"): | |
st.markdown(message.get_formatted_content()) | |
# Handle new user input | |
user_input = st.chat_input("Ask me anything about coding...", key="chat_input") | |
if user_input: | |
handle_user_input(user_input) | |
# Add ARIA labels to main UI components | |
st.markdown(""" | |
<div role="main" aria-label="AI Code Assistant Interface"> | |
<div role="complementary" aria-label="Settings Panel"> | |
<!-- Settings content --> | |
</div> | |
<div role="log" aria-label="Chat History"> | |
<!-- Chat messages --> | |
</div> | |
<div role="form" aria-label="Chat Input"> | |
<!-- Input field --> | |
</div> | |
</div> | |
""", unsafe_allow_html=True) | |
# Add cleanup of old sessions at the end | |
if st.session_state.settings["use_workspace"]: | |
cleanup_old_sessions() | |
if __name__ == "__main__": | |
main() |