File size: 18,062 Bytes
1af48fa 4e57426 819dd2f a1d3f78 73319d1 0a30d42 1af48fa 1de140d 4e57426 1af48fa 4e57426 1af48fa 73319d1 1af48fa e09244d cb6cbda e09244d 52bcfe4 1de140d 09c584b 1de140d 1af48fa 0a30d42 1af48fa 73319d1 4e57426 73319d1 4e57426 73319d1 2ec0842 3d5dc7e 2ec0842 3972d74 2ec0842 1af48fa 237c936 819dd2f 2ec0842 819dd2f cb6cbda 819dd2f 93c3f12 819dd2f 1af48fa 819dd2f 1af48fa 1de140d 1af48fa a1d3f78 1af48fa cb6cbda 0a30d42 1af48fa 7d44776 237c936 7d44776 2ec0842 7d44776 95ca783 7d44776 1de140d 7d44776 0a30d42 7d44776 95ca783 6257666 8eca72e 133f622 8eca72e 6257666 133f622 95ca783 0a30d42 95ca783 eb02b52 0a30d42 eb02b52 c405f98 237c936 819dd2f 2ec0842 819dd2f 1de140d 2cdadb6 6916ae4 2cdadb6 819dd2f e81d06a 3e3ea9a 819dd2f 1de140d 819dd2f e09244d 1de140d 819dd2f 1de140d 819dd2f 1af48fa 819dd2f 0a30d42 819dd2f 73319d1 17409c4 2ec0842 73319d1 1af48fa 1130ba9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
import json
import httpx
import random
import string
from datetime import datetime
from log_config import logger
from utils import safe_get
# end_of_line = "\n\r\n"
# end_of_line = "\r\n"
# end_of_line = "\n\r"
end_of_line = "\n\n"
# end_of_line = "\r"
# end_of_line = "\n"
async def generate_sse_response(timestamp, model, content=None, tools_id=None, function_call_name=None, function_call_content=None, role=None, total_tokens=0, prompt_tokens=0, completion_tokens=0):
random.seed(timestamp)
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=29))
sample_data = {
"id": f"chatcmpl-{random_str}",
"object": "chat.completion.chunk",
"created": timestamp,
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": content},
"logprobs": None,
"finish_reason": None
}
],
"usage": None,
"system_fingerprint": "fp_d576307f90",
}
if function_call_content:
sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"function":{"arguments": function_call_content}}]}
if tools_id and function_call_name:
sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"id": tools_id,"type":"function","function":{"name": function_call_name, "arguments":""}}]}
# sample_data["choices"][0]["delta"] = {"tool_calls":[{"index":0,"function":{"id": tools_id, "name": function_call_name}}]}
if role:
sample_data["choices"][0]["delta"] = {"role": role, "content": ""}
if total_tokens:
total_tokens = prompt_tokens + completion_tokens
sample_data["usage"] = {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}
sample_data["choices"] = []
json_data = json.dumps(sample_data, ensure_ascii=False)
# 构建SSE响应
sse_response = f"data: {json_data}" + end_of_line
return sse_response
async def generate_no_stream_response(timestamp, model, content=None, tools_id=None, function_call_name=None, function_call_content=None, role=None, total_tokens=0, prompt_tokens=0, completion_tokens=0):
random.seed(timestamp)
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=29))
sample_data = {
"id": f"chatcmpl-{random_str}",
"object": "chat.completion",
"created": timestamp,
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": role,
"content": content,
"refusal": None
},
"logprobs": None,
"finish_reason": "stop"
}
],
"usage": None,
"system_fingerprint": "fp_a7d06e42a7"
}
if total_tokens:
total_tokens = prompt_tokens + completion_tokens
sample_data["usage"] = {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}
json_data = json.dumps(sample_data, ensure_ascii=False)
return json_data
async def check_response(response, error_log):
if response and not (200 <= response.status_code < 300):
error_message = await response.aread()
error_str = error_message.decode('utf-8', errors='replace')
try:
error_json = json.loads(error_str)
except json.JSONDecodeError:
error_json = error_str
return {"error": f"{error_log} HTTP Error", "status_code": response.status_code, "details": error_json}
return None
async def fetch_gemini_response_stream(client, url, headers, payload, model):
timestamp = int(datetime.timestamp(datetime.now()))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_gemini_response_stream")
if error_message:
yield error_message
return
buffer = ""
revicing_function_call = False
function_full_response = "{"
need_function_call = False
async for chunk in response.aiter_text():
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# print(line)
if line and '\"text\": \"' in line:
try:
json_data = json.loads( "{" + line + "}")
content = json_data.get('text', '')
content = "\n".join(content.split("\\n"))
sse_string = await generate_sse_response(timestamp, model, content=content)
yield sse_string
except json.JSONDecodeError:
logger.error(f"无法解析JSON: {line}")
if line and ('\"functionCall\": {' in line or revicing_function_call):
revicing_function_call = True
need_function_call = True
if ']' in line:
revicing_function_call = False
continue
function_full_response += line
if need_function_call:
function_call = json.loads(function_full_response)
function_call_name = function_call["functionCall"]["name"]
sse_string = await generate_sse_response(timestamp, model, content=None, tools_id="chatcmpl-9inWv0yEtgn873CxMBzHeCeiHctTV", function_call_name=function_call_name)
yield sse_string
function_full_response = json.dumps(function_call["functionCall"]["args"])
sse_string = await generate_sse_response(timestamp, model, content=None, tools_id="chatcmpl-9inWv0yEtgn873CxMBzHeCeiHctTV", function_call_name=None, function_call_content=function_full_response)
yield sse_string
yield "data: [DONE]" + end_of_line
async def fetch_vertex_claude_response_stream(client, url, headers, payload, model):
timestamp = int(datetime.timestamp(datetime.now()))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_vertex_claude_response_stream")
if error_message:
yield error_message
return
buffer = ""
revicing_function_call = False
function_full_response = "{"
need_function_call = False
async for chunk in response.aiter_text():
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# logger.info(f"{line}")
if line and '\"text\": \"' in line:
try:
json_data = json.loads( "{" + line + "}")
content = json_data.get('text', '')
content = "\n".join(content.split("\\n"))
sse_string = await generate_sse_response(timestamp, model, content=content)
yield sse_string
except json.JSONDecodeError:
logger.error(f"无法解析JSON: {line}")
if line and ('\"type\": \"tool_use\"' in line or revicing_function_call):
revicing_function_call = True
need_function_call = True
if ']' in line:
revicing_function_call = False
continue
function_full_response += line
if need_function_call:
function_call = json.loads(function_full_response)
function_call_name = function_call["name"]
function_call_id = function_call["id"]
sse_string = await generate_sse_response(timestamp, model, content=None, tools_id=function_call_id, function_call_name=function_call_name)
yield sse_string
function_full_response = json.dumps(function_call["input"])
sse_string = await generate_sse_response(timestamp, model, content=None, tools_id=function_call_id, function_call_name=None, function_call_content=function_full_response)
yield sse_string
yield "data: [DONE]" + end_of_line
async def fetch_gpt_response_stream(client, url, headers, payload):
timestamp = int(datetime.timestamp(datetime.now()))
random.seed(timestamp)
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=29))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_gpt_response_stream")
if error_message:
yield error_message
return
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# logger.info("line: %s", repr(line))
if line and line != "data: " and line != "data:" and not line.startswith(": "):
line = json.loads(line.lstrip("data: "))
line['id'] = f"chatcmpl-{random_str}"
yield "data: " + json.dumps(line).strip() + end_of_line
async def fetch_cloudflare_response_stream(client, url, headers, payload, model):
timestamp = int(datetime.timestamp(datetime.now()))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_gpt_response_stream")
if error_message:
yield error_message
return
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# logger.info("line: %s", repr(line))
if line.startswith("data:"):
line = line.lstrip("data: ")
if line == "[DONE]":
yield "data: [DONE]" + end_of_line
return
resp: dict = json.loads(line)
message = resp.get("response")
if message:
sse_string = await generate_sse_response(timestamp, model, content=message)
yield sse_string
async def fetch_cohere_response_stream(client, url, headers, payload, model):
timestamp = int(datetime.timestamp(datetime.now()))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_gpt_response_stream")
if error_message:
yield error_message
return
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# logger.info("line: %s", repr(line))
resp: dict = json.loads(line)
if resp.get("is_finished") == True:
yield "data: [DONE]" + end_of_line
return
if resp.get("event_type") == "text-generation":
message = resp.get("text")
sse_string = await generate_sse_response(timestamp, model, content=message)
yield sse_string
async def fetch_claude_response_stream(client, url, headers, payload, model):
timestamp = int(datetime.timestamp(datetime.now()))
async with client.stream('POST', url, headers=headers, json=payload) as response:
error_message = await check_response(response, "fetch_claude_response_stream")
if error_message:
yield error_message
return
buffer = ""
input_tokens = 0
async for chunk in response.aiter_text():
# logger.info(f"chunk: {repr(chunk)}")
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# logger.info(line)
if line.startswith("data:"):
line = line.lstrip("data: ")
resp: dict = json.loads(line)
message = resp.get("message")
if message:
role = message.get("role")
if role:
sse_string = await generate_sse_response(timestamp, model, None, None, None, None, role)
yield sse_string
tokens_use = message.get("usage")
if tokens_use:
input_tokens = tokens_use.get("input_tokens", 0)
usage = resp.get("usage")
if usage:
output_tokens = usage.get("output_tokens", 0)
total_tokens = input_tokens + output_tokens
sse_string = await generate_sse_response(timestamp, model, None, None, None, None, None, total_tokens, input_tokens, output_tokens)
yield sse_string
# print("\n\rtotal_tokens", total_tokens)
tool_use = resp.get("content_block")
tools_id = None
function_call_name = None
if tool_use and "tool_use" == tool_use['type']:
# print("tool_use", tool_use)
tools_id = tool_use["id"]
if "name" in tool_use:
function_call_name = tool_use["name"]
sse_string = await generate_sse_response(timestamp, model, None, tools_id, function_call_name, None)
yield sse_string
delta = resp.get("delta")
# print("delta", delta)
if not delta:
continue
if "text" in delta:
content = delta["text"]
sse_string = await generate_sse_response(timestamp, model, content, None, None)
yield sse_string
if "partial_json" in delta:
# {"type":"input_json_delta","partial_json":""}
function_call_content = delta["partial_json"]
sse_string = await generate_sse_response(timestamp, model, None, None, None, function_call_content)
yield sse_string
yield "data: [DONE]" + end_of_line
async def fetch_response(client, url, headers, payload, engine, model):
response = None
if payload.get("file"):
file = payload.pop("file")
response = await client.post(url, headers=headers, data=payload, files={"file": file})
else:
response = await client.post(url, headers=headers, json=payload)
error_message = await check_response(response, "fetch_response")
if error_message:
yield error_message
return
response_json = response.json()
if engine == "gemini" or engine == "vertex-gemini":
if isinstance(response_json, str):
import ast
parsed_data = ast.literal_eval(str(response_json))
elif isinstance(response_json, list):
parsed_data = response_json
else:
logger.error(f"error fetch_response: Unknown response_json type: {type(response_json)}")
parsed_data = response_json
content = ""
for item in parsed_data:
chunk = safe_get(item, "candidates", 0, "content", "parts", 0, "text")
# logger.info(f"chunk: {repr(chunk)}")
if chunk:
content += chunk
usage_metadata = safe_get(parsed_data, -1, "usageMetadata")
prompt_tokens = usage_metadata.get("promptTokenCount", 0)
candidates_tokens = usage_metadata.get("candidatesTokenCount", 0)
total_tokens = usage_metadata.get("totalTokenCount", 0)
role = safe_get(parsed_data, -1, "candidates", 0, "content", "role")
if role == "model":
role = "assistant"
else:
logger.error(f"Unknown role: {role}")
role = "assistant"
timestamp = int(datetime.timestamp(datetime.now()))
yield await generate_no_stream_response(timestamp, model, content=content, tools_id=None, function_call_name=None, function_call_content=None, role=role, total_tokens=total_tokens, prompt_tokens=prompt_tokens, completion_tokens=candidates_tokens)
else:
yield response_json
async def fetch_response_stream(client, url, headers, payload, engine, model):
# try:
if engine == "gemini" or engine == "vertex-gemini":
async for chunk in fetch_gemini_response_stream(client, url, headers, payload, model):
yield chunk
elif engine == "claude" or engine == "vertex-claude":
async for chunk in fetch_claude_response_stream(client, url, headers, payload, model):
yield chunk
elif engine == "gpt":
async for chunk in fetch_gpt_response_stream(client, url, headers, payload):
yield chunk
elif engine == "openrouter":
async for chunk in fetch_gpt_response_stream(client, url, headers, payload):
yield chunk
elif engine == "cloudflare":
async for chunk in fetch_cloudflare_response_stream(client, url, headers, payload, model):
yield chunk
elif engine == "cohere":
async for chunk in fetch_cohere_response_stream(client, url, headers, payload, model):
yield chunk
else:
raise ValueError("Unknown response")
# except httpx.ConnectError as e:
# yield {"error": f"500", "details": "fetch_response_stream Connect Error"}
# except httpx.ReadTimeout as e:
# yield {"error": f"500", "details": "fetch_response_stream Read Response Timeout"} |