import gradio as gr import requests import json import os import markdown from markdown.extensions.codehilite import CodeHiliteExtension import markdown.extensions.fenced_code import bleach from html import escape API_URL = "https://host.palple.polrambora.com/pmsq" sessions = {} PRIMARY_SYSTEM_INSTRUCTIONS = "You are P-MSQ (Messaging Service Query), a friendly AI Chatbot that can help in any situations" ASSISTANT_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/API.png" USER_PIC_PATH = "https://huggingface.co/spaces/PLRMB/P-MSQ-API-PREVIEW/resolve/main/usr.png" def render_avatars(userid): try: response = requests.post( 'https://host.palple.polrambora.com/userexistence', json={ 'userid': userid }, timeout=10 ) if response.status_code == 200: response_json = response.json() return response_json["avatar"]["link"] except Exception as e: return None except requests.exceptions.Timeout: return None def authorize(user, api_key, system_message): test_data = { "user": user, "key": api_key } try: response = requests.post( "https://host.palple.polrambora.com/check_key_impv", json=test_data, ) if response.status_code == 200: response_json = response.json() avatar = render_avatars(user) or USER_PIC_PATH if api_key not in sessions: sessions[api_key] = { "history": [], "headers": { "authorization": api_key, "Content-Type": 'application/json' }, "avatar": avatar, "system_message": system_message } return True elif response.status_code == 403: return 403 else: return False except Exception as e: return False def respond(message, api_key, max_tokens, top_p, temperature): session = sessions.get(api_key, {}) headers = session.get("headers", {}) system_message = session.get("system_message", PRIMARY_SYSTEM_INSTRUCTIONS) messages = [ {"role": "system", "content": system_message}, *[ {"role": "user", "content": user_msg} if user_msg else {"role": "assistant", "content": assistant_msg} for user_msg, assistant_msg, _, _, _, _ in session.get("history", []) ], {"role": "user", "content": message} ] data = { "preferences": { "max_char": max_tokens, "temperature": temperature, "top_p": top_p, "system_message": system_message, }, "conversation_history": messages, "input": message, "stream": True, } assistant_reply = "" try: response = requests.post(API_URL, headers=headers, data=json.dumps(data), stream=True) if response.status_code == 200: for line in response.iter_lines(decode_unicode=True): if line.strip(): try: if line.startswith("data:"): line = line[5:].strip() if line: chunk = json.loads(line) chunk_message = chunk.get("delta", {}).get("content", "") assistant_reply += chunk_message yield assistant_reply # Stream the response incrementally except json.JSONDecodeError: pass session["history"].append((message, assistant_reply, "You", "P-ALPLE", session["avatar"], ASSISTANT_PIC_PATH)) else: yield f"Error: {response.status_code} - {response.text}" except Exception as e: yield f"Error: {str(e)}" def render_message(history): messages_html = """