"""Gradio front-end for the P-MSQ chat API.

The module keeps per-API-key chat sessions in memory, streams replies from the
P-MSQ endpoint and renders the conversation as sanitized HTML.
"""

import html
import json
import os
import re

import bleach
import gradio as gr
import markdown
import markdown.extensions.fenced_code
import requests
from markdown.extensions.codehilite import CodeHiliteExtension
from markdown.extensions.fenced_code import FencedCodeExtension

ALLOWED_TAGS = [
    'b', 'i', 'strong', 'em', 'a', 'p', 'ul', 'ol', 'li', 'br', 'hr',
    'blockquote', 'code', 'pre', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'img',
]

ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title', 'target'],
    'img': ['src', 'alt', 'title', 'width', 'height'],
}

API_URL = "https://host.palple.polrambora.com/pmsq"
USER_EXISTENCE_URL = 'https://host.palple.polrambora.com/userexistence'
CHECK_KEY_URL = "https://host.palple.polrambora.com/check_key_impv"

# Seconds to wait for the short JSON endpoints.
REQUEST_TIMEOUT = 10
# (connect, read) timeouts for the streaming chat endpoint; the read timeout
# applies between received chunks, not to the whole reply.
STREAM_TIMEOUT = (10, 120)

# In-memory sessions keyed by API key. Each value holds "history", "headers",
# "avatar" and "system_message".
sessions = {}

PRIMARY_SYSTEM_INSTRUCTIONS = "You are P-MSQ (Messaging Service Query), a friendly AI Chatbot that can help in any situations"

ASSISTANT_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/API.png"
USER_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/usr.png"

_ALLOWED_TAG_SET = frozenset(ALLOWED_TAGS)
_ANY_FENCE_RE = re.compile(r'```(.*?)```', re.DOTALL)
_CODE_BLOCK_RE = re.compile(r"```(\w+)?\n(.*?)```", re.DOTALL)

# NOTE(review): the original HTML templates were not recoverable from the
# source (the markup had been stripped); the markup below is a reconstruction
# that uses the ".chatbox" class defined in `css`. Confirm against the design.
_CHAT_OPEN = '<div class="chatbox"><div id="messages">'
_CHAT_CLOSE = '</div></div>'
_MESSAGE_TEMPLATE = (
    "<div style='display: flex; align-items: flex-start; margin-bottom: 10px;'>"
    "<img src='{pic}' alt='{name}' "
    "style='width: 40px; height: 40px; border-radius: 50%; margin-right: 10px;'>"
    "<div><strong>{name}</strong><div>{body}</div></div>"
    "</div>"
)


def render_avatars(userid):
    """Return the avatar link reported for *userid*, or None.

    None is returned when the request fails, the status is not 200, or the
    response does not contain ``avatar.link``.
    """
    try:
        response = requests.post(
            USER_EXISTENCE_URL,
            json={'userid': userid},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        return response.json()["avatar"]["link"]
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
        # Best effort: the caller falls back to the default user picture.
        return None


def authorize(user, api_key, system_message):
    """Validate *user*/*api_key* and create the in-memory session.

    Returns:
        True on success, the integer 403 when the key check answers 403, and
        False for any other status or failure. Note that 403 is truthy, so
        callers must compare against it explicitly.
    """
    credentials = {"user": user, "key": api_key}
    try:
        response = requests.post(
            CHECK_KEY_URL,
            json=credentials,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.exceptions.RequestException:
        return False

    if response.status_code == 403:
        return 403
    if response.status_code != 200:
        return False

    try:
        # The payload itself is unused, but a non-JSON body is treated as a
        # failed check (as the original code did).
        response.json()
    except ValueError:
        return False

    if api_key not in sessions:
        sessions[api_key] = {
            "history": [],
            "headers": {
                "authorization": api_key,
                "Content-Type": 'application/json',
            },
            "avatar": render_avatars(user) or USER_PIC_PATH,
            "system_message": system_message,
        }
    return True


def _stream_delta(raw_line):
    """Return the text carried by one streamed line, or None to skip it.

    Lines may carry a ``data:`` prefix. Lines that are blank or not valid
    JSON objects are skipped.
    """
    text = raw_line.decode('utf-8').strip()
    if text.startswith("data:"):
        text = text[5:].strip()
    if not text:
        return None
    try:
        chunk = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(chunk, dict):
        return None
    delta = chunk.get("delta")
    if not isinstance(delta, dict):
        return ""
    return delta.get("content") or ""


def respond(message, api_key, max_tokens, top_p, temperature):
    """Stream the assistant reply for *message*.

    Yields the accumulated reply after every received chunk. On an HTTP or
    transport error a single ``"Error: ..."`` string is yielded instead and
    the exchange is not stored. On success the exchange is appended to the
    session history.
    """
    session = sessions.get(api_key)
    if session is None:
        yield "Error: session not found, please authorize again."
        return

    system_message = session.get("system_message", PRIMARY_SYSTEM_INSTRUCTIONS)

    # Both sides of every stored exchange are sent back, in order.
    conversation = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg, *_ in session["history"]:
        if user_msg:
            conversation.append({"role": "user", "content": user_msg})
        if assistant_msg:
            conversation.append({"role": "assistant", "content": assistant_msg})
    conversation.append({"role": "user", "content": message})

    payload = {
        "preferences": {
            "max_char": max_tokens,
            "temperature": temperature,
            "top_p": top_p,
            "system_message": system_message,
        },
        "conversation_history": conversation,
        "input": message,
        "stream": True,
    }

    assistant_reply = ""
    try:
        with requests.post(
            API_URL,
            headers=session["headers"],
            data=json.dumps(payload),
            stream=True,
            timeout=STREAM_TIMEOUT,
        ) as response:
            if response.status_code != 200:
                yield f"Error: {response.status_code} - {response.text}"
                return
            for raw_line in response.iter_lines():
                delta = _stream_delta(raw_line)
                if delta is None:
                    continue
                assistant_reply += delta
                yield assistant_reply  # Stream the response incrementally
    except requests.exceptions.RequestException as e:
        yield f"Error: {str(e)}"
        return

    session["history"].append(
        (message, assistant_reply, "You", "P-ALPLE", session["avatar"], ASSISTANT_PIC_PATH)
    )


def format_code_blocks(text):
    """Replace every triple-backtick span in *text* with escaped HTML code."""
    # NOTE(review): the wrapping tags are reconstructed; the original markup
    # was not recoverable from the source.
    return _ANY_FENCE_RE.sub(
        lambda match: f"<pre><code>{html.escape(match.group(1))}</code></pre>",
        text,
    )


def render_markdown(text):
    """Render *text* as HTML with fenced-code support."""
    return markdown.markdown(text, extensions=[FencedCodeExtension()])


def _render_bubble(body_html, name, picture):
    """Return the HTML for one chat message; *body_html* is already safe."""
    return _MESSAGE_TEMPLATE.format(
        pic=html.escape(str(picture), quote=True),
        name=html.escape(str(name), quote=True),
        body=body_html,
    )


def render_message(history):
    """Render *history* as the HTML of the chat box.

    Each history entry is a 6-tuple ``(user_message, assistant_message,
    user_profile, assistant_profile, user_pic, assistant_pic)``. Empty
    messages are skipped; repeated messages are rendered every time.
    """
    bubbles = []
    for user_message, assistant_message, user_profile, assistant_profile, user_pic, assistant_pic in history:
        if user_message:
            bubbles.append(_render_bubble(process_message(user_message), user_profile, user_pic))
        if assistant_message:
            bubbles.append(
                _render_bubble(process_message(assistant_message), assistant_profile, assistant_pic)
            )
    return _CHAT_OPEN + "".join(bubbles) + _CHAT_CLOSE


def process_message(message):
    """Convert one chat message to safe HTML.

    Fenced code blocks are emitted as escaped ``<pre><code>`` elements; the
    text between them is escaped and rendered as markdown.
    """
    # With two capture groups, re.split yields
    # [text, lang, code, text, lang, code, ..., text]; lang is None when the
    # fence has no language tag.
    parts = _CODE_BLOCK_RE.split(message)
    rendered = []
    for index in range(0, len(parts), 3):
        rendered.append(render_inline_markdown(escape_html(parts[index])))
        if index + 2 < len(parts):
            lang = parts[index + 1] or ""
            class_attr = f' class="language-{escape_html(lang)}"' if lang else ""
            # NOTE(review): tag structure reconstructed, see note above.
            rendered.append(
                f"<pre><code{class_attr}>{escape_html(parts[index + 2])}</code></pre>"
            )
    return "".join(rendered)


def escape_html(text):
    """Escape ``&``, ``<``, ``>`` and both quote characters in *text*."""
    return html.escape(text, quote=True)


def render_inline_markdown(text):
    """Render *text* as markdown and strip everything outside the allow-list."""
    rendered = markdown.markdown(text, output_format='xhtml')
    # Markdown links/images can carry unsafe URLs (e.g. javascript:), so the
    # output is sanitized against ALLOWED_TAGS / ALLOWED_ATTRIBUTES.
    return bleach.clean(rendered, tags=_ALLOWED_TAG_SET, attributes=ALLOWED_ATTRIBUTES)


def user_interaction(message, history, api_key, max_tokens, top_p, temperature):
    """Handle a submitted message.

    Yields ``(chat_html, history, input_value)`` triples: first a
    "Loading..." placeholder, then one update per streamed chunk, then the
    final state.
    """
    session = sessions.get(api_key)
    if session is None:
        gr.Warning("Your session has been expired, please refresh the page and login again.")
        yield render_message(history), history, message
        return
    if not message or not message.strip():
        # Nothing to send; leave the conversation untouched.
        yield render_message(history), history, ""
        return

    def with_reply(reply):
        return history + [
            (message, reply, "You", "P-ALPLE", session["avatar"], ASSISTANT_PIC_PATH)
        ]

    loading_message = with_reply("Loading...")
    yield render_message(loading_message), loading_message, ""

    assistant_response = ""
    for partial_reply in respond(message, api_key, max_tokens, top_p, temperature):
        assistant_response = partial_reply  # respond() yields the accumulated text
        partial_history = with_reply(assistant_response)
        yield render_message(partial_history), partial_history, ""

    # Also covers the case where the stream produced no chunk at all.
    final_history = with_reply(assistant_response)
    yield render_message(final_history), final_history, ""


def regenerate_response(history, last_message, max_tokens, top_p, temperature):
    """Placeholder handler; returns an empty chat and an empty history."""
    return "", []


def clear_history(api_key):
    """Drop the stored history for *api_key* and reset the chat view."""
    if api_key in sessions:
        sessions[api_key]["history"] = []
    return "", []


def load_conversation(api_key):
    """Return ``(chat_html, history)`` for the session of *api_key*."""
    session = sessions.get(api_key, {})
    history = session.get("history", [])
    return render_message(history), history


def authorize_and_proceed(user, api_key):
    """Authorize and pick the view to show.

    Returns values for ``(auth_view, chat_view, blacklist_view,
    chatbot_output, history_state, auth_status)``.
    """
    # Call once: authorize() performs network requests, and its 403 result is
    # truthy, so it must be checked before the generic success test.
    result = authorize(user, api_key, PRIMARY_SYSTEM_INSTRUCTIONS)

    if result == 403:
        return (
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=True),
            "",
            [],
            "",
        )
    if result:
        gr.Info("Loading, please wait.")
        messages_html, history = load_conversation(api_key)
        return (
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=False),
            messages_html,
            history,
            "",
        )

    gr.Warning("Incorrect userid/token")
    return (
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        "",
        [],
        "Invalid userid/token",
    )


def save_custom_instructions(api_key, custom_instructions):
    """Store *custom_instructions* as the session's system message.

    Returns ``(status_text, update_for_instructions_input)``.
    """
    if api_key in sessions:
        gr.Info("Instructions updated, we recommend to start the new conversation to make it more efficient.")
        sessions[api_key]["system_message"] = custom_instructions
        return "Instructions updated!", gr.update(value="")
    gr.Warning("Your session has been expired, please refresh the page and login again.")
    return "Session not found.", gr.update(value="")


css = """
.chatbox {height: 400px; overflow: auto; border: 1px solid #262626; padding: 10px; background-color: #171717; display: flex; flex-direction: column-reverse;}
"""

with gr.Blocks(css=css) as demo:
    with gr.Column(visible=True) as auth_view:
        gr.Markdown("## P-MSQ Authorization")
        gr.Markdown("P-MSQ is in closed alpha test! The model, api and more are subject to change.")
        api_user_input = gr.Textbox(placeholder="snowflake", label="UserID", type='email')
        api_key_input = gr.Textbox(placeholder="Enter your API key", label="Token", type='password')
        auth_button = gr.Button("Authorize")
        auth_status = gr.Textbox(label="Authorization Status", interactive=False)

    with gr.Column(visible=False) as chat_view:
        # NOTE(review): the original markup passed here was not recoverable
        # from the source; restore it if it carried styles or scripts.
        gr.HTML(""" """)
        gr.Markdown("## P-MSQ Chat Interface")
        chatbot_output = gr.HTML(elem_id="chatbox-container")

        msg_input = gr.Text(
            show_label=False,
            placeholder="Type your message and press Shift+Enter...",
            lines=2,
            elem_id="input-text",
        )

        send_btn = gr.Button("Send")
        regen_btn = gr.Button("Clear")

        system_instructions_input = gr.Textbox(
            placeholder="Enter custom instructions (optional)",
            label="Custom System Instructions",
            lines=2,
        )
        save_instructions_btn = gr.Button("Save Instructions")

        gr.Markdown("### Settings")
        max_tokens = gr.Slider(minimum=1, maximum=2048, value=1024, step=1, label="Max new tokens")
        top_p = gr.Slider(minimum=0, maximum=2, value=0.8, step=0.1, label="Top P")
        temperature = gr.Slider(minimum=0.1, maximum=1, value=0.7, step=0.1, label="Temperature")

        history_state = gr.State([])
        last_message_state = gr.State("")

    with gr.Column(visible=False) as blacklist_view:
        gr.Markdown("## P-MSQ Authorization")
        gr.Markdown("Your linked ID appears to be blacklisted, and your API Key is pending on removal, if you believe this is a mistake, please try reaching us out.")

    chat_inputs = [msg_input, history_state, api_key_input, max_tokens, top_p, temperature]
    chat_outputs = [chatbot_output, history_state, msg_input]

    msg_input.submit(user_interaction, inputs=chat_inputs, outputs=chat_outputs)
    send_btn.click(user_interaction, inputs=chat_inputs, outputs=chat_outputs)

    regen_btn.click(clear_history, inputs=[api_key_input], outputs=[chatbot_output, history_state])

    auth_button.click(
        authorize_and_proceed,
        inputs=[api_user_input, api_key_input],
        outputs=[auth_view, chat_view, blacklist_view, chatbot_output, history_state, auth_status],
    )
    save_instructions_btn.click(
        save_custom_instructions,
        inputs=[api_key_input, system_instructions_input],
        outputs=[auth_status, system_instructions_input],
    )

if __name__ == "__main__":
    # Streaming (generator) handlers rely on Gradio's queue, so it is left
    # at its default instead of being overwritten.
    demo.launch(show_api=True)