# Spaces: Sleeping (page-status residue from the hosting site; not part of the program)
import bleach | |
import gradio as gr | |
import requests | |
import json | |
import os | |
import markdown | |
from markdown.extensions.codehilite import CodeHiliteExtension | |
from markdown.extensions.fenced_code import FencedCodeExtension | |
import markdown.extensions.fenced_code | |
import html | |
import re | |
# HTML sanitisation whitelist: tag names.
# NOTE(review): presumably intended for bleach.clean(); nothing in the visible
# code references these two constants -- confirm before relying on them.
ALLOWED_TAGS = [
    # inline formatting and links
    'b', 'i', 'strong', 'em', 'a',
    # block-level structure
    'p', 'ul', 'ol', 'li', 'br', 'hr', 'blockquote',
    # code
    'code', 'pre',
    # headings h1 .. h6
    *(f'h{level}' for level in range(1, 7)),
    # images
    'img',
]

# HTML sanitisation whitelist: attributes permitted per tag.
ALLOWED_ATTRIBUTES = dict(
    a=['href', 'title', 'target'],
    img=['src', 'alt', 'title', 'width', 'height'],
)

# Chat-completion endpoint used by respond().
API_URL = "https://host.palple.polrambora.com/pmsq"

# In-memory session store, keyed by API key (populated by authorize()).
sessions = {}

# System prompt used when a session has no custom instructions.
PRIMARY_SYSTEM_INSTRUCTIONS = "You are P-MSQ (Messaging Service Query), a friendly AI Chatbot that can help in any situations"

# Default avatar images for the assistant and for users without their own avatar.
ASSISTANT_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/API.png"
USER_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/usr.png"
def render_avatars(userid):
    """Look up the avatar URL for ``userid``.

    Posts ``{"userid": userid}`` to the user-existence endpoint and returns
    the ``avatar.link`` field of the JSON reply.

    Args:
        userid: User identifier forwarded to the service as-is.

    Returns:
        The avatar link from the response, or ``None`` when the request
        fails, times out, answers with a non-200 status, or the body does
        not contain the expected ``avatar.link`` structure.
    """
    try:
        response = requests.post(
            'https://host.palple.polrambora.com/userexistence',
            json={'userid': userid},
            timeout=10,
        )
        if response.status_code != 200:
            return None
        return response.json()["avatar"]["link"]
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
        # RequestException covers timeouts and connection errors; ValueError
        # covers a non-JSON body; KeyError/TypeError cover an unexpected
        # payload shape. The lookup is best-effort, so all map to None.
        return None
def authorize(user, api_key, system_message):
    """Validate a user/API-key pair and create the in-memory session.

    On a 200 reply a session is registered in ``sessions`` under ``api_key``
    unless one already exists; an existing session is left untouched.

    Args:
        user: User identifier sent to the key-check endpoint.
        api_key: API key; also used as the session key and as the
            ``authorization`` header for later chat requests.
        system_message: System prompt stored in a newly created session.

    Returns:
        ``True`` when the key is accepted, the integer ``403`` when the
        service answers 403, and ``False`` for any other status or on a
        request/parsing failure. Note that ``403`` is truthy, so callers
        must compare against it explicitly before testing truthiness.
    """
    test_data = {
        "user": user,
        "key": api_key,
    }
    try:
        response = requests.post(
            "https://host.palple.polrambora.com/check_key_impv",
            json=test_data,
            timeout=10,  # same bound as render_avatars; avoids hanging the UI
        )
        if response.status_code == 200:
            # The payload is not used, but a body that is not valid JSON is
            # treated as a failed check (it raises and is handled below).
            response.json()
            if api_key not in sessions:
                # Only fetch the avatar when a session is actually created.
                avatar = render_avatars(user) or USER_PIC_PATH
                sessions[api_key] = {
                    "history": [],
                    "headers": {
                        "authorization": api_key,
                        "Content-Type": 'application/json',
                    },
                    "avatar": avatar,
                    "system_message": system_message,
                }
            return True
        if response.status_code == 403:
            return 403
        return False
    except (requests.exceptions.RequestException, ValueError):
        # Network failure, timeout, or undecodable body.
        return False
def respond(message, api_key, max_tokens, top_p, temperature):
    """Stream the assistant's reply to ``message`` for the given session.

    Builds the conversation from the session's stored history, posts it to
    ``API_URL`` with streaming enabled, and yields the reply accumulated so
    far after every decoded chunk. When the stream ends, the finished turn
    is appended to the session history.

    Args:
        message: The new user message.
        api_key: Key identifying the session in ``sessions``.
        max_tokens: Sent to the service as ``preferences.max_char``.
        top_p: Sent as ``preferences.top_p``.
        temperature: Sent as ``preferences.temperature``.

    Yields:
        The cumulative assistant reply (a string), or a single
        ``"Error: ..."`` string when the request fails or the service
        returns a non-200 status.
    """
    session = sessions.get(api_key, {})
    headers = session.get("headers", {})
    system_message = session.get("system_message", PRIMARY_SYSTEM_INSTRUCTIONS)

    # Each history entry holds one full turn. Both the user message and the
    # assistant reply must be replayed, otherwise the model never sees its
    # own earlier answers.
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg, *_ in session.get("history", []):
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})

    data = {
        "preferences": {
            "max_char": max_tokens,
            "temperature": temperature,
            "top_p": top_p,
            "system_message": system_message,
        },
        "conversation_history": messages,
        "input": message,
        "stream": True,
    }

    assistant_reply = ""
    try:
        # The context manager releases the streamed connection even if the
        # consumer stops iterating early. Timeout is (connect, read) seconds.
        with requests.post(
            API_URL,
            headers=headers,
            data=json.dumps(data),
            stream=True,
            timeout=(10, 120),
        ) as response:
            if response.status_code != 200:
                yield f"Error: {response.status_code} - {response.text}"
                return

            for raw_line in response.iter_lines():
                line = raw_line.decode('utf-8').strip()
                if line.startswith("data:"):
                    line = line[5:].strip()
                if not line:
                    continue
                try:
                    chunk = json.loads(line)
                except json.JSONDecodeError:
                    # Non-JSON lines (keep-alives, end markers) are skipped.
                    continue
                if not isinstance(chunk, dict):
                    continue
                delta = chunk.get("delta") or {}
                # A null/missing content must not break the accumulation.
                assistant_reply += delta.get("content") or ""
                yield assistant_reply  # Stream the response incrementally

        # Persist the turn only when a real session exists; an unknown key
        # must not replace the streamed reply with a KeyError message.
        if "history" in session:
            session["history"].append((
                message,
                assistant_reply,
                "You",
                "P-ALPLE",
                session.get("avatar", USER_PIC_PATH),
                ASSISTANT_PIC_PATH,
            ))
    except Exception as e:
        # UI boundary: surface any failure as a chat message, not a crash.
        yield f"Error: {str(e)}"
def format_code_blocks(text):
    """Replace every triple-backtick span in ``text`` with an escaped
    ``<pre><code>`` element; text outside the fences is returned unchanged.
    """
    def _as_pre(found):
        # Escape the fenced content so it is shown literally.
        escaped = html.escape(found.group(1))
        return f"<pre><code>{escaped}</code></pre>"

    return re.sub(r'```(.*?)```', _as_pre, text, flags=re.DOTALL)
def render_markdown(text):
    """Convert Markdown ``text`` to HTML with fenced-code-block support."""
    fenced_code = FencedCodeExtension()
    rendered = markdown.markdown(text, extensions=[fenced_code])
    return rendered
import re | |
def render_message(history):
    """Render the chat history as one HTML fragment for the chat box.

    Args:
        history: Iterable of 6-tuples ``(user_message, assistant_message,
            user_profile, assistant_profile, user_pic, assistant_pic)``.
            The two profile fields are not used for rendering.

    Returns:
        An HTML string: a scrollable container holding one avatar + text
        block per non-empty message. Message bodies are rendered by
        ``process_message``; avatar URLs are HTML-escaped before being
        placed in the ``src`` attribute.
    """
    def _message_block(pic, body_html):
        # The URL goes into a single-quoted attribute, so quotes must be
        # escaped as well; the avatar link can come from a remote service.
        safe_pic = html.escape(str(pic), quote=True)
        return (
            "\n"
            "<div style='display: flex; flex-direction: column; align-items: flex-start; margin-bottom: 10px;'>\n"
            f"<img src='{safe_pic}' style='width: 40px; height: 40px; border-radius: 50%; margin-bottom: 5px;'>\n"
            f"<div style='color: white; white-space: pre-wrap;'>{body_html}</div>\n"
            "</div>"
        )

    pieces = [
        "\n"
        '<div id="chatbox-container" class="chatbox" style="height: 400px; overflow: auto; border: 1px solid #262626; padding: 10px; background-color: #171717; display: flex; flex-direction: column-reverse;">\n'
        '<div id="messages" style="display: block; margin-bottom: 10px;">'
    ]

    # NOTE(review): an identical (role, text) pair is rendered only once, so
    # a message repeated verbatim later in the chat is hidden. Kept as in
    # the original; confirm this de-duplication is intended.
    seen_messages = set()
    for user_message, assistant_message, _, _, user_pic, assistant_pic in history:
        if user_message and ("user", user_message) not in seen_messages:
            seen_messages.add(("user", user_message))
            pieces.append(_message_block(user_pic, process_message(user_message)))
        if assistant_message and ("assistant", assistant_message) not in seen_messages:
            seen_messages.add(("assistant", assistant_message))
            pieces.append(_message_block(assistant_pic, process_message(assistant_message)))

    pieces.append("</div></div>")
    return "".join(pieces)
def process_message(message):
    """Convert one chat message to HTML.

    Fenced code blocks (three backticks, an optional language word, a
    newline, the code, three backticks) become
    ``<pre><code class='language-LANG'>`` elements with escaped content.
    Everything outside the fences is HTML-escaped and then passed through
    ``render_inline_markdown``.

    Args:
        message: Raw message text.

    Returns:
        The HTML string for the message.
    """
    code_block_pattern = re.compile(r"```(\w+)?\n(.*?)```", re.DOTALL)
    # With two capture groups, split() returns
    #   [text, lang, code, text, lang, code, ..., text]
    # where lang is None for a fence without a language.
    parts = code_block_pattern.split(message)

    rendered = []
    for start in range(0, len(parts), 3):
        rendered.append(render_inline_markdown(escape_html(parts[start])))
        if start + 2 < len(parts):
            # The language capture is consumed here and never emitted as
            # text. Leaving it in the list would either leak the language
            # name into the output or, when it is None, make join() raise.
            lang_class = parts[start + 1] or ""
            code = parts[start + 2]
            rendered.append(
                f"<pre><code class='language-{lang_class}'>{escape_html(code)}</code></pre>"
            )
    return "".join(rendered)
def escape_html(text):
    """Escape ``text`` for safe inclusion in HTML content or attributes.

    Replaces ``&``, ``<``, ``>``, ``"`` and ``'`` with character references.
    ``&`` is handled first so already-produced references are not
    double-escaped.

    Args:
        text: The string to escape.

    Returns:
        The escaped string.
    """
    # html.escape avoids entity literals in the source, which are what got
    # corrupted here (the replacement targets had been decoded back to the
    # raw characters, making every replace a no-op or a syntax error).
    return html.escape(text, quote=True)
def render_inline_markdown(text):
    """Render ``text`` as Markdown and return the result as XHTML."""
    return markdown.markdown(text, output_format='xhtml')
# Stylesheet handed to gr.Blocks: styles the scrollable chat box.
css = (
    "\n"
    ".chatbox {height: 400px; overflow: auto; border: 1px solid #262626; padding: 10px; background-color: #171717; display: flex; flex-direction: column-reverse;}"
    "\n"
)
# Gradio UI: three mutually exclusive columns (authorization, chat, and the
# sunset notice), the event handlers, and their wiring.
# NOTE(review): the original indentation was lost; the nesting of components
# inside each column below is reconstructed from the statement order.
with gr.Blocks(css=css) as demo:
    # --- Authorization view (hidden by default) ---------------------------
    with gr.Column(visible=False) as auth_view:
        gr.Markdown("## P-MSQ Authorization")
        gr.Markdown("P-MSQ is in closed alpha test! The model, api and more are subject to change.")
        api_user_input = gr.Textbox(placeholder="snowflake", label="UserID", type='email')
        api_key_input = gr.Textbox(placeholder="Enter your API key", label="Token", type='password')
        auth_button = gr.Button("Authorize")
        auth_status = gr.Textbox(label="Authorization Status", interactive=False)

    # --- Chat view (hidden until authorization succeeds) ------------------
    with gr.Column(visible=False) as chat_view:
        # NOTE(review): in this non-raw Python string '\\(' reaches the page
        # as '\(' , which JavaScript reads as a plain '(' -- confirm the
        # intended MathJax delimiters. '\\[' ... '\\]' is also normally a
        # display-math delimiter rather than an inline one.
        gr.HTML("""
            <link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.css" rel="stylesheet">
            <script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
            <script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>
            <script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-javascript.min.js"></script>
            <script>
            MathJax = {
            tex: { inlineMath: [['\\(', '\\)'], ['\\[', '\\]']] },
            svg: { scale: 1.2 }
            };
            </script>
            <script src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/3.2.2/es5/tex-svg.min.js"></script>
        """)
        gr.Markdown("## P-MSQ Chat Interface")
        chatbot_output = gr.HTML(elem_id="chatbox-container")
        # NOTE(review): this empty Markdown reuses the elem_id of the HTML
        # component above (and of the container render_message emits).
        gr.Markdown(elem_id="chatbox-container")
        msg_input = gr.Text(
            show_label=False,
            placeholder="Type your message and press Shift+Enter...",
            lines=2,
            elem_id="input-text",
        )
        send_btn = gr.Button("Send")
        regen_btn = gr.Button("Clear")  # despite its name, this clears the chat
        system_instructions_input = gr.Textbox(
            placeholder="Enter custom instructions (optional)",
            label="Custom System Instructions",
            lines=2,
        )
        save_instructions_btn = gr.Button("Save Instructions")
        gr.Markdown("### Settings")
        max_tokens = gr.Slider(minimum=1, maximum=2048, value=1024, step=1, label="Max new tokens")
        top_p = gr.Slider(minimum=0, maximum=2, value=0.8, step=0.1, label="Top P")
        temperature = gr.Slider(minimum=0.1, maximum=1, value=0.7, step=0.1, label="Temperature")
        history_state = gr.State([])
        last_message_state = gr.State("")

    # --- Sunset notice (the only view visible on load) --------------------
    with gr.Column(visible=True) as blacklist_view:
        gr.Markdown("## Sunsetting gradio P-MSQ interface")
        gr.Markdown("P-MSQ Gradio Interface is moving to another - [Next.js powered PMSQ](https://pmsq.polrambora.com)")

    # --- Event handlers ---------------------------------------------------
    def user_interaction(message, history, api_key, max_tokens, top_p, temperature):
        """Send ``message`` and stream the rendered conversation.

        Yields ``(chat_html, history, input_value)`` triples: first with a
        "Loading..." placeholder, then once per streamed partial reply, and
        finally with the completed reply. The input box is cleared.
        """
        session = sessions.get(api_key)
        if session is None:
            # No session for this key: report it instead of raising KeyError
            # and leave the typed message in the input box.
            gr.Warning("Your session has been expired, please refresh the page and login again.")
            yield render_message(history), history, message
            return

        avatar = session["avatar"]

        def turn(reply):
            # History with the in-progress turn appended; never mutates the
            # caller's list.
            return history + [(message, reply, "You", "P-ALPLE", avatar, ASSISTANT_PIC_PATH)]

        loading_history = turn("Loading...")
        yield render_message(loading_history), loading_history, ""

        assistant_response = ""
        for partial_reply in respond(message, api_key, max_tokens, top_p, temperature):
            assistant_response = partial_reply  # respond() yields the cumulative text
            partial_history = turn(assistant_response)
            yield render_message(partial_history), partial_history, ""

        # Final render also replaces the placeholder when nothing was streamed.
        final_history = turn(assistant_response)
        yield render_message(final_history), final_history, ""

    def regenerate_response(history, last_message, max_tokens, top_p, temperature):
        """Placeholder: returns an empty chat. Not wired to any component."""
        return "", []

    def clear_history(api_key):
        """Drop the stored history of the session (if any) and empty the chat."""
        if api_key in sessions:
            sessions[api_key]["history"] = []
        return "", []

    def load_conversation(api_key):
        """Return ``(chat_html, history)`` for the session stored under ``api_key``."""
        session = sessions.get(api_key, {})
        history = session.get("history", [])
        return render_message(history), history

    def authorize_and_proceed(user, api_key):
        """Authorize the user and switch views accordingly.

        Returns one value per wired output, in this order: auth_view,
        chat_view, blacklist_view, auth_status, chatbot_output,
        history_state.
        """
        # Call the service once and keep the result: authorize() returns
        # True, False, or the integer 403.
        result = authorize(user, api_key, PRIMARY_SYSTEM_INSTRUCTIONS)

        # 403 is truthy, so it has to be checked before the success branch.
        if result == 403:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=True),
                gr.update(),
                "",
                [],
            )

        if result:
            gr.Info("Loading, please wait.")
            messages_html, history = load_conversation(api_key)
            return (
                gr.update(visible=False),
                gr.update(visible=True),
                gr.update(),
                gr.update(),
                messages_html,
                history,
            )

        gr.Warning("Incorrect userid/token")
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(),
            gr.update(value="Invalid userid/token"),
            "",
            [],
        )

    def save_custom_instructions(api_key, custom_instructions):
        """Store custom system instructions on the session.

        Returns ``(status_text, update)`` where the update clears the
        instructions input box.
        """
        if api_key in sessions:
            gr.Info("Instructions updated, we recommend to start the new conversation to make it more efficient.")
            sessions[api_key]["system_message"] = custom_instructions
            return "Instructions updated!", gr.update(value="")
        else:
            gr.Warning("Your session has been expired, please refresh the page and login again.")
            return "Session not found.", gr.update(value="")

    # --- Event wiring -----------------------------------------------------
    chat_inputs = [msg_input, history_state, api_key_input, max_tokens, top_p, temperature]
    chat_outputs = [chatbot_output, history_state, msg_input]
    msg_input.submit(user_interaction, inputs=chat_inputs, outputs=chat_outputs)
    send_btn.click(user_interaction, inputs=chat_inputs, outputs=chat_outputs)
    regen_btn.click(
        clear_history,
        inputs=[api_key_input],
        outputs=[chatbot_output, history_state],
    )
    # Every branch of authorize_and_proceed returns exactly these six values.
    auth_button.click(
        authorize_and_proceed,
        inputs=[api_user_input, api_key_input],
        outputs=[auth_view, chat_view, blacklist_view, auth_status, chatbot_output, history_state],
    )
    # The handler returns two values (status text, input-box update).
    save_instructions_btn.click(
        save_custom_instructions,
        inputs=[api_key_input, system_instructions_input],
        outputs=[auth_status, system_instructions_input],
    )

demo.launch(show_api=False)

if __name__ == "__main__":
    # NOTE(review): this rebinds the Blocks.queue method to False after the
    # app has already been launched; it looks like a leftover -- confirm it
    # can be removed.
    demo.queue = False