POLRAMBORA's picture
Update app.py
af6836b verified
raw
history blame
13.9 kB
import bleach
import gradio as gr
import requests
import json
import os
import markdown
from markdown.extensions.codehilite import CodeHiliteExtension
from markdown.extensions.fenced_code import FencedCodeExtension
import markdown.extensions.fenced_code
import html
import re
ALLOWED_TAGS = [
'b', 'i', 'strong', 'em', 'a', 'p', 'ul', 'ol', 'li', 'br', 'hr',
'blockquote', 'code', 'pre', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'img'
]
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title', 'target'],
'img': ['src', 'alt', 'title', 'width', 'height']
}
API_URL = "https://host.palple.polrambora.com/pmsq"
sessions = {}
PRIMARY_SYSTEM_INSTRUCTIONS = "You are P-MSQ (Messaging Service Query), a friendly AI Chatbot that can help in any situations"
ASSISTANT_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/API.png"
USER_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/usr.png"
def render_avatars(userid):
try:
response = requests.post(
'https://host.palple.polrambora.com/userexistence',
json={
'userid': userid
},
timeout=10
)
if response.status_code == 200:
response_json = response.json()
return response_json["avatar"]["link"]
except Exception as e:
return None
except requests.exceptions.Timeout:
return None
def authorize(user, api_key, system_message):
test_data = {
"user": user,
"key": api_key
}
try:
response = requests.post(
"https://host.palple.polrambora.com/check_key_impv",
json=test_data,
)
if response.status_code == 200:
response_json = response.json()
avatar = render_avatars(user) or USER_PIC_PATH
if api_key not in sessions:
sessions[api_key] = {
"history": [],
"headers": {
"authorization": api_key,
"Content-Type": 'application/json'
},
"avatar": avatar,
"system_message": system_message
}
return True
elif response.status_code == 403:
return 403
else:
return False
except Exception as e:
return False
def respond(message, api_key, max_tokens, top_p, temperature):
session = sessions.get(api_key, {})
headers = session.get("headers", {})
system_message = session.get("system_message", PRIMARY_SYSTEM_INSTRUCTIONS)
messages = [
{"role": "system", "content": system_message},
*[
{"role": "user", "content": user_msg} if user_msg else {"role": "assistant", "content": assistant_msg}
for user_msg, assistant_msg, _, _, _, _ in session.get("history", [])
],
{"role": "user", "content": message}
]
data = {
"preferences": {
"max_char": max_tokens,
"temperature": temperature,
"top_p": top_p,
"system_message": system_message,
},
"conversation_history": messages,
"input": message,
"stream": True,
}
assistant_reply = ""
try:
response = requests.post(API_URL, headers=headers, data=json.dumps(data), stream=True)
if response.status_code == 200:
for line in response.iter_lines():
line = line.decode('utf-8')
if line.strip():
try:
if line.startswith("data:"):
line = line[5:].strip()
if line:
chunk = json.loads(line)
chunk_message = chunk.get("delta", {}).get("content", "")
assistant_reply += chunk_message
yield assistant_reply # Stream the response incrementally
except json.JSONDecodeError:
pass
session["history"].append((message, assistant_reply, "You", "P-ALPLE", session["avatar"], ASSISTANT_PIC_PATH))
else:
yield f"Error: {response.status_code} - {response.text}"
except Exception as e:
yield f"Error: {str(e)}"
def format_code_blocks(text):
code_pattern = re.compile(r'```(.*?)```', re.DOTALL)
return code_pattern.sub(lambda match: f"<pre><code>{html.escape(match.group(1))}</code></pre>", text)
def render_markdown(text):
return markdown.markdown(text, extensions=[FencedCodeExtension()])
import re
def render_message(history):
messages_html = """
<div id="chatbox-container" class="chatbox" style="height: 400px; overflow: auto; border: 1px solid #262626; padding: 10px; background-color: #171717; display: flex; flex-direction: column-reverse;">
<div id="messages" style="display: block; margin-bottom: 10px;">"""
seen_messages = set()
for user_message, assistant_message, user_profile, assistant_profile, user_pic, assistant_pic in history:
if user_message and ("user", user_message) not in seen_messages:
seen_messages.add(("user", user_message))
user_message_html = process_message(user_message)
messages_html += f"""
<div style='display: flex; flex-direction: column; align-items: flex-start; margin-bottom: 10px;'>
<img src='{user_pic}' style='width: 40px; height: 40px; border-radius: 50%; margin-bottom: 5px;'>
<div style='color: white; white-space: pre-wrap;'>{user_message_html}</div>
</div>"""
if assistant_message and ("assistant", assistant_message) not in seen_messages:
seen_messages.add(("assistant", assistant_message))
assistant_message_html = process_message(assistant_message)
messages_html += f"""
<div style='display: flex; flex-direction: column; align-items: flex-start; margin-bottom: 10px;'>
<img src='{assistant_pic}' style='width: 40px; height: 40px; border-radius: 50%; margin-bottom: 5px;'>
<div style='color: white; white-space: pre-wrap;'>{assistant_message_html}</div>
</div>"""
messages_html += "</div></div>"
return messages_html
def process_message(message):
code_block_pattern = re.compile(r"```(\w+)?\n(.*?)```", re.DOTALL)
parts = code_block_pattern.split(message)
for i in range(len(parts)):
if i % 3 == 0:
parts[i] = escape_html(parts[i])
parts[i] = render_inline_markdown(parts[i])
elif i % 3 == 1:
continue
else:
lang_class = parts[i - 1] or ""
parts[i] = f"<pre><code class='language-{lang_class}'>{escape_html(parts[i])}</code></pre>"
return "".join(parts)
def escape_html(text):
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#x27;"))
def render_inline_markdown(text):
text = markdown.markdown(text, output_format='xhtml')
return text
css="""
.chatbox {height: 400px; overflow: auto; border: 1px solid #262626; padding: 10px; background-color: #171717; display: flex; flex-direction: column-reverse;}
"""
with gr.Blocks(css=css) as demo:
with gr.Column(visible=False) as auth_view:
gr.Markdown("## P-MSQ Authorization")
gr.Markdown("P-MSQ is in closed alpha test! The model, api and more are subject to change.")
api_user_input = gr.Textbox(placeholder="snowflake", label="UserID", type='email')
api_key_input = gr.Textbox(placeholder="Enter your API key", label="Token", type='password')
auth_button = gr.Button("Authorize")
auth_status = gr.Textbox(label="Authorization Status", interactive=False)
with gr.Column(visible=False) as chat_view:
gr.HTML("""
<link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.css" rel="stylesheet">
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-javascript.min.js"></script>
<script>
MathJax = {
tex: { inlineMath: [['\\(', '\\)'], ['\\[', '\\]']] },
svg: { scale: 1.2 }
};
</script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/3.2.2/es5/tex-svg.min.js"></script>
""")
gr.Markdown("## P-MSQ Chat Interface")
chatbot_output = gr.HTML(elem_id="chatbox-container")
gr.Markdown(elem_id="chatbox-container")
msg_input = gr.Text(
show_label=False,
placeholder="Type your message and press Shift+Enter...",
lines=2,
elem_id="input-text"
)
send_btn = gr.Button("Send")
regen_btn = gr.Button("Clear")
system_instructions_input = gr.Textbox(placeholder="Enter custom instructions (optional)",
label="Custom System Instructions",
lines=2)
save_instructions_btn = gr.Button("Save Instructions")
gr.Markdown("### Settings")
max_tokens = gr.Slider(minimum=1, maximum=2048, value=1024, step=1, label="Max new tokens")
top_p = gr.Slider(minimum=0, maximum=2, value=0.8, step=0.1, label="Top P")
temperature = gr.Slider(minimum=0.1, maximum=1, value=0.7, step=0.1, label="Temperature")
history_state = gr.State([])
last_message_state = gr.State("")
def user_interaction(message, history, api_key, max_tokens, top_p, temperature):
loading_message = history + [(message, "Loading...", "You", "P-ALPLE", sessions[api_key]["avatar"], ASSISTANT_PIC_PATH)]
yield render_message(loading_message), loading_message, ""
assistant_response = ""
for partial_reply in respond(message, api_key, max_tokens, top_p, temperature):
assistant_response = partial_reply # Accumulate streamed response
partial_history = history + [(message, assistant_response, "You", "P-ALPLE", sessions[api_key]["avatar"], ASSISTANT_PIC_PATH)]
yield render_message(partial_history), partial_history, ""
final_history = history + [(message, assistant_response, "You", "P-ALPLE", sessions[api_key]["avatar"], ASSISTANT_PIC_PATH)]
yield render_message(final_history), final_history, ""
def regenerate_response(history, last_message, max_tokens, top_p, temperature):
return "", []
def clear_history(api_key):
if api_key in sessions:
sessions[api_key]["history"] = []
return "", []
def load_conversation(api_key):
session = sessions.get(api_key, {})
history = session.get("history", [])
return render_message(history), history
msg_input.submit(
user_interaction,
inputs=[msg_input, history_state, api_key_input, max_tokens, top_p, temperature],
outputs=[chatbot_output, history_state, msg_input],
)
send_btn.click(
user_interaction,
inputs=[msg_input, history_state, api_key_input, max_tokens, top_p, temperature],
outputs=[chatbot_output, history_state, msg_input],
)
regen_btn.click(clear_history,
inputs=[api_key_input],
outputs=[chatbot_output, history_state])
with gr.Column(visible=True) as blacklist_view:
gr.Markdown("## Sunsetting gradio P-MSQ interface")
gr.Markdown("P-MSQ Gradio Interface is moving to another - [Next.js powered PMSQ](https://pmsq.polrambora.com)")
def authorize_and_proceed(user, api_key):
if authorize(user, api_key, PRIMARY_SYSTEM_INSTRUCTIONS):
gr.Info("Loading, please wait.")
messages_html, history = load_conversation(api_key)
return (
gr.update(visible=False),
gr.update(visible=True),
messages_html,
history
)
elif authorize(user, api_key, PRIMARY_SYSTEM_INSTRUCTIONS) == 403:
return (
gr.update(visible=False),
gr.update(visible=False),
gr.update(visible=True),
)
else:
gr.Warning("Incorrect userid/token")
return (
gr.update(visible=True),
gr.update(visible=False),
auth_status.update(value="Invalid userid/token")
)
def save_custom_instructions(api_key, custom_instructions):
if api_key in sessions:
gr.Info("Instructions updated, we recommend to start the new conversation to make it more efficient.")
sessions[api_key]["system_message"] = custom_instructions
return "Instructions updated!", gr.update(value="")
else:
gr.Warning("Your session has been expired, please refresh the page and login again.")
return "Session not found.", gr.update(value="")
auth_button.click(authorize_and_proceed, inputs=[api_user_input, api_key_input], outputs=[auth_view, chat_view, chatbot_output, history_state])
save_instructions_btn.click(save_custom_instructions, inputs=[api_key_input, system_instructions_input], outputs=auth_status)
demo.launch(show_api=False)
if __name__ == "__main__":
demo.queue = False