import json import httpx from datetime import datetime from log_config import logger async def generate_sse_response(timestamp, model, content=None, tools_id=None, function_call_name=None, function_call_content=None, role=None, tokens_use=None, total_tokens=None): sample_data = { "id": "chatcmpl-9ijPeRHa0wtyA2G8wq5z8FC3wGMzc", "object": "chat.completion.chunk", "created": timestamp, "model": model, "system_fingerprint": "fp_d576307f90", "choices": [ { "index": 0, "delta": {"content": content}, "logprobs": None, "finish_reason": None } ], "usage": None } if function_call_content: sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"function":{"arguments": function_call_content}}]} if tools_id and function_call_name: sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"id": tools_id,"type":"function","function":{"name": function_call_name, "arguments":""}}]} # sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"function":{"id": tools_id, "name": function_call_name}}]} if role: sample_data["choices"][0]["delta"] = {"role": role, "content": ""} json_data = json.dumps(sample_data, ensure_ascii=False) # 构建SSE响应 sse_response = f"data: {json_data}\n\r" return sse_response async def check_response(response, error_log): if response.status_code != 200: error_message = await response.aread() error_str = error_message.decode('utf-8', errors='replace') try: error_json = json.loads(error_str) except json.JSONDecodeError: error_json = error_str return {"error": f"{error_log} HTTP Error {response.status_code}", "details": error_json} return None async def fetch_gemini_response_stream(client, url, headers, payload, model): timestamp = datetime.timestamp(datetime.now()) async with client.stream('POST', url, headers=headers, json=payload) as response: error_message = await check_response(response, "fetch_gemini_response_stream") if error_message: yield error_message return buffer = "" revicing_function_call = False function_full_response = "{" need_function_call = False async for chunk in response.aiter_text(): buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) # print(line) if line and '\"text\": \"' in line: try: json_data = json.loads( "{" + line + "}") content = json_data.get('text', '') content = "\n".join(content.split("\\n")) sse_string = await generate_sse_response(timestamp, model, content) yield sse_string except json.JSONDecodeError: logger.error(f"无法解析JSON: {line}") if line and ('\"functionCall\": {' in line or revicing_function_call): revicing_function_call = True need_function_call = True if ']' in line: revicing_function_call = False continue function_full_response += line if need_function_call: function_call = json.loads(function_full_response) function_call_name = function_call["functionCall"]["name"] sse_string = await generate_sse_response(timestamp, model, content=None, tools_id="chatcmpl-9inWv0yEtgn873CxMBzHeCeiHctTV", function_call_name=function_call_name) yield sse_string function_full_response = json.dumps(function_call["functionCall"]["args"]) sse_string = await generate_sse_response(timestamp, model, content=None, tools_id="chatcmpl-9inWv0yEtgn873CxMBzHeCeiHctTV", function_call_name=None, function_call_content=function_full_response) yield sse_string yield "data: [DONE]\n\r" async def fetch_vertex_claude_response_stream(client, url, headers, payload, model): timestamp = datetime.timestamp(datetime.now()) async with client.stream('POST', url, headers=headers, json=payload) as response: error_message = await check_response(response, "fetch_vertex_claude_response_stream") if error_message: yield error_message return buffer = "" revicing_function_call = False function_full_response = "{" need_function_call = False async for chunk in response.aiter_text(): buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) logger.info(f"{line}") if line and '\"text\": \"' in line: try: json_data = json.loads( "{" + line + "}") content = json_data.get('text', '') content = "\n".join(content.split("\\n")) sse_string = await generate_sse_response(timestamp, model, content) yield sse_string except json.JSONDecodeError: logger.error(f"无法解析JSON: {line}") if line and ('\"type\": \"tool_use\"' in line or revicing_function_call): revicing_function_call = True need_function_call = True if ']' in line: revicing_function_call = False continue function_full_response += line if need_function_call: function_call = json.loads(function_full_response) function_call_name = function_call["name"] function_call_id = function_call["id"] sse_string = await generate_sse_response(timestamp, model, content=None, tools_id=function_call_id, function_call_name=function_call_name) yield sse_string function_full_response = json.dumps(function_call["input"]) sse_string = await generate_sse_response(timestamp, model, content=None, tools_id=function_call_id, function_call_name=None, function_call_content=function_full_response) yield sse_string yield "data: [DONE]\n\r" async def fetch_gpt_response_stream(client, url, headers, payload, max_redirects=5): redirect_count = 0 while redirect_count < max_redirects: # logger.info(f"fetch_gpt_response_stream: {url}") async with client.stream('POST', url, headers=headers, json=payload) as response: error_message = await check_response(response, "fetch_gpt_response_stream") if error_message: yield error_message return buffer = "" try: async for chunk in response.aiter_text(): # logger.info(f"chunk: {repr(chunk)}") buffer += chunk if chunk.startswith("