# uni-api / request.py
# Snapshot of commit 894ff24 by yym68686:
# "Fix the bug where Claude cannot use tool use." (27.5 kB)
import json
from models import RequestModel
from utils import c35s, c3s, c3o, c3h, gem, BaseAPI
async def get_image_message(base64_image, engine = None):
    """Build the engine-specific message part for one inline image.

    Args:
        base64_image: The image as a data URL
            (``data:<mime>;base64,<payload>``).  A string without a comma
            is treated as a bare base64 payload.
        engine: Target wire format.  ``"gpt"`` and ``"openrouter"`` get an
            OpenAI-style ``image_url`` part; ``"claude"`` and ``"gemini"``
            get their own base64 structures.

    Returns:
        A dict shaped for the selected engine.

    Raises:
        ValueError: If ``engine`` is not one of the supported values.
    """
    if engine in ("gpt", "openrouter"):
        # OpenAI-compatible formats carry the data URL unchanged.
        return {
            "type": "image_url",
            "image_url": {
                "url": base64_image,
            }
        }

    if engine not in ("claude", "gemini"):
        raise ValueError("Unknown engine")

    # Split "data:<mime>;base64" from the payload.  partition() never raises,
    # unlike the previous split(",")[1] which failed on bare base64 input.
    header, separator, data = base64_image.partition(",")
    if not separator:
        header, data = "", base64_image

    # Use the media type declared in the data URL instead of always claiming
    # JPEG; fall back to JPEG only when nothing is declared.
    media_type = "image/jpeg"
    if header.startswith("data:"):
        declared = header[len("data:"):].split(";", 1)[0].strip()
        if declared:
            media_type = declared

    if engine == "claude":
        return {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": media_type,
                "data": data,
            }
        }

    return {
        "inlineData": {
            "mimeType": media_type,
            "data": data,
        }
    }
async def get_text_message(role, message, engine = None):
    """Wrap ``message`` in the text-part structure used by ``engine``.

    Args:
        role: Role of the message author; accepted but not used here.
        message: The text to wrap.
        engine: ``"gpt"``, ``"claude"`` and ``"openrouter"`` share the
            ``{"type": "text", ...}`` shape; ``"gemini"`` uses ``{"text": ...}``.

    Returns:
        The text part as a dict.

    Raises:
        ValueError: If ``engine`` is none of the values above.
    """
    typed_text_engines = ("gpt", "claude", "openrouter")
    if engine in typed_text_engines:
        return {"type": "text", "text": message}
    if engine == "gemini":
        return {"text": message}
    raise ValueError("Unknown engine")
async def get_gemini_payload(request, engine, provider):
    """Build the URL, headers and JSON body for a Google Gemini API call.

    Args:
        request: OpenAI-style chat request.  This function reads ``model``,
            ``stream``, ``messages`` and ``model_dump(exclude_unset=True)``.
        engine: Engine name forwarded to the message-part helpers.
        provider: Provider config with ``base_url``, ``api`` and a ``model``
            mapping from the requested name to the upstream model id.

    Returns:
        A ``(url, headers, payload)`` tuple.

    Raises:
        ValueError: If a ``tool`` message appears before any assistant tool
            call, so there is no function name to attach the result to.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    model = provider['model'][request.model]

    # Non-streaming requests used to hit an unbound local here; they map to
    # the plain generateContent method.
    gemini_stream = "streamGenerateContent" if request.stream else "generateContent"

    # Decide on the API version from the configured base URL only.  The old
    # code re-tested the already-built URL, so an API key ending in "v1"
    # silently switched a v1beta endpoint to v1.
    base_url = provider['base_url']
    url = base_url
    endpoint = "https://generativelanguage.googleapis.com/{version}/models/{model}:{stream}?key={api_key}"
    if base_url.endswith("v1beta"):
        url = endpoint.format(version="v1beta", model=model, stream=gemini_stream, api_key=provider['api'])
    elif base_url.endswith("v1"):
        url = endpoint.format(version="v1", model=model, stream=gemini_stream, api_key=provider['api'])

    messages = []
    systemInstruction = None
    function_arguments = None
    for msg in request.messages:
        # Keep the translated role in a local: mutating msg.role would leak
        # "model" into any later use of the same request object.
        role = "model" if msg.role == "assistant" else msg.role
        tool_calls = None
        if isinstance(msg.content, list):
            content = []
            for item in msg.content:
                if item.type == "text":
                    content.append(await get_text_message(role, item.text, engine))
                elif item.type == "image_url":
                    content.append(await get_image_message(item.image_url.url, engine))
        else:
            content = [{"text": msg.content}]
            tool_calls = msg.tool_calls

        if tool_calls:
            # Only the first tool call of a message is translated.
            tool_call = tool_calls[0]
            function_arguments = {
                "functionCall": {
                    "name": tool_call.function.name,
                    "args": json.loads(tool_call.function.arguments)
                }
            }
            messages.append(
                {
                    "role": "model",
                    "parts": [function_arguments]
                }
            )
        elif role == "tool":
            if function_arguments is None:
                raise ValueError("tool message received before any assistant tool call")
            # The result is attached to the most recent function call.
            function_call_name = function_arguments["functionCall"]["name"]
            messages.append(
                {
                    "role": "function",
                    "parts": [{
                        "functionResponse": {
                            "name": function_call_name,
                            "response": {
                                "name": function_call_name,
                                "content": {
                                    "result": msg.content,
                                }
                            }
                        }
                    }]
                }
            )
        elif role == "system":
            # Only the last system message is kept.
            systemInstruction = {"parts": content}
        else:
            messages.append({"role": role, "parts": content})

    payload = {
        "contents": messages,
        "safetySettings": [
            {
                "category": "HARM_CATEGORY_HARASSMENT",
                "threshold": "BLOCK_NONE"
            },
            {
                "category": "HARM_CATEGORY_HATE_SPEECH",
                "threshold": "BLOCK_NONE"
            },
            {
                "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "threshold": "BLOCK_NONE"
            },
            {
                "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                "threshold": "BLOCK_NONE"
            }
        ]
    }

    if systemInstruction:
        payload["systemInstruction"] = systemInstruction

    # Request fields that are not copied into the Gemini body.
    miss_fields = [
        'model',
        'messages',
        'stream',
        'tool_choice',
        'temperature',
        'top_p',
        'max_tokens',
        'presence_penalty',
        'frequency_penalty',
        'n',
        'user',
        'include_usage',
        'logprobs',
        'top_logprobs'
    ]

    for field, value in request.model_dump(exclude_unset=True).items():
        if field in miss_fields or value is None:
            continue
        if field == "tools":
            # OpenAI tool list -> Gemini function declarations.
            payload.update({
                "tools": [{
                    "function_declarations": [tool["function"] for tool in value]
                }],
                "tool_config": {
                    "function_calling_config": {
                        "mode": "AUTO"
                    }
                }
            })
        else:
            payload[field] = value

    return url, headers, payload
import time
import httpx
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
def create_jwt(client_email, private_key):
    """Create a signed JWT assertion for Google's OAuth token endpoint.

    Args:
        client_email: Value placed in the ``iss`` claim.
        private_key: PEM-encoded private key as a string (no passphrase).

    Returns:
        The compact ``header.payload.signature`` token as a ``str``.  The
        header declares ``RS256``; the claims expire 3600 seconds after
        issue and request the ``cloud-platform`` scope.
    """
    def _b64url(raw):
        # URL-safe base64 with the trailing '=' padding stripped.
        return base64.urlsafe_b64encode(raw).rstrip(b'=')

    header_json = json.dumps({
        "alg": "RS256",
        "typ": "JWT"
    }).encode()

    issued_at = int(time.time())
    claims_json = json.dumps({
        "iss": client_email,
        "scope": "https://www.googleapis.com/auth/cloud-platform",
        "aud": "https://oauth2.googleapis.com/token",
        "exp": issued_at + 3600,
        "iat": issued_at
    }).encode()

    # The signature covers "<header>.<claims>" in their encoded form.
    signing_input = _b64url(header_json) + b'.' + _b64url(claims_json)

    signer = load_pem_private_key(private_key.encode(), password=None)
    signature = signer.sign(
        signing_input,
        padding.PKCS1v15(),
        hashes.SHA256()
    )

    token = signing_input + b'.' + _b64url(signature)
    return token.decode()
def get_access_token(client_email, private_key):
    """Exchange a freshly signed JWT for an OAuth access token.

    Args:
        client_email: Service-account e-mail passed on to ``create_jwt``.
        private_key: PEM private key passed on to ``create_jwt``.

    Returns:
        The ``access_token`` field of the token endpoint's JSON response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
    """
    # NOTE(review): this is a synchronous HTTP call that the async payload
    # builders invoke directly, and a new token is requested on every call.
    token_endpoint = "https://oauth2.googleapis.com/token"
    form_fields = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": create_jwt(client_email, private_key)
    }
    form_headers = {'Content-Type': "application/x-www-form-urlencoded"}

    with httpx.Client() as client:
        response = client.post(token_endpoint, data=form_fields, headers=form_headers)
        response.raise_for_status()
        token_document = response.json()
    return token_document["access_token"]
async def get_vertex_gemini_payload(request, engine, provider):
    """Build the URL, headers and JSON body for Gemini on Vertex AI.

    Args:
        request: OpenAI-style chat request.  This function reads ``model``,
            ``stream``, ``messages`` and ``model_dump(exclude_unset=True)``.
        engine: Engine name chosen by the dispatcher.  It is not forwarded
            to the message-part helpers, which get ``"gemini"`` instead.
        provider: Provider config with ``project_id``, a ``model`` mapping
            and, optionally, ``client_email`` + ``private_key`` used to
            obtain a bearer token.

    Returns:
        A ``(url, headers, payload)`` tuple.

    Raises:
        ValueError: If ``project_id`` is missing, or a ``tool`` message
            appears before any assistant tool call.
    """
    headers = {
        'Content-Type': 'application/json'
    }

    # Fail early and clearly instead of hitting an unbound local later.
    project_id = provider.get("project_id")
    if not project_id:
        raise ValueError("Vertex provider configuration is missing project_id")

    if provider.get("client_email") and provider.get("private_key"):
        access_token = get_access_token(provider['client_email'], provider['private_key'])
        headers['Authorization'] = f"Bearer {access_token}"

    # Non-streaming requests previously crashed on an unbound local; they
    # map to the plain generateContent method.
    gemini_stream = "streamGenerateContent" if request.stream else "generateContent"
    model = provider['model'][request.model]
    # `gem` comes from utils; .next() supplies the region for this call
    # (presumably rotating over configured regions - confirm in utils).
    location = gem
    url = "https://{LOCATION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{MODEL_ID}:{stream}".format(LOCATION=location.next(), PROJECT_ID=project_id, MODEL_ID=model, stream=gemini_stream)

    # The helpers only know "gemini"; passing the dispatcher's "vertex"
    # engine made every list-content message raise "Unknown engine".
    part_engine = "gemini"

    messages = []
    systemInstruction = None
    function_arguments = None
    for msg in request.messages:
        # Keep the translated role in a local: mutating msg.role would leak
        # "model" into any later use of the same request object.
        role = "model" if msg.role == "assistant" else msg.role
        tool_calls = None
        if isinstance(msg.content, list):
            content = []
            for item in msg.content:
                if item.type == "text":
                    content.append(await get_text_message(role, item.text, part_engine))
                elif item.type == "image_url":
                    content.append(await get_image_message(item.image_url.url, part_engine))
        else:
            content = [{"text": msg.content}]
            tool_calls = msg.tool_calls

        if tool_calls:
            # Only the first tool call of a message is translated.
            tool_call = tool_calls[0]
            function_arguments = {
                "functionCall": {
                    "name": tool_call.function.name,
                    "args": json.loads(tool_call.function.arguments)
                }
            }
            messages.append(
                {
                    "role": "model",
                    "parts": [function_arguments]
                }
            )
        elif role == "tool":
            if function_arguments is None:
                raise ValueError("tool message received before any assistant tool call")
            # The result is attached to the most recent function call.
            function_call_name = function_arguments["functionCall"]["name"]
            messages.append(
                {
                    "role": "function",
                    "parts": [{
                        "functionResponse": {
                            "name": function_call_name,
                            "response": {
                                "name": function_call_name,
                                "content": {
                                    "result": msg.content,
                                }
                            }
                        }
                    }]
                }
            )
        elif role == "system":
            # Only the last system message is kept.
            systemInstruction = {"parts": content}
        else:
            messages.append({"role": role, "parts": content})

    # NOTE: unlike get_gemini_payload, no safetySettings are sent here (the
    # original had that list commented out).  The generation settings below
    # are fixed; the request's own sampling fields are filtered out further
    # down.
    payload = {
        "contents": messages,
        "generationConfig": {
            "temperature": 0.5,
            "max_output_tokens": 8192,
            "top_k": 40,
            "top_p": 0.95
        },
    }
    if systemInstruction:
        payload["system_instruction"] = systemInstruction

    # Request fields that are not copied into the Vertex body.
    miss_fields = [
        'model',
        'messages',
        'stream',
        'tool_choice',
        'temperature',
        'top_p',
        'max_tokens',
        'presence_penalty',
        'frequency_penalty',
        'n',
        'user',
        'include_usage',
        'logprobs',
        'top_logprobs'
    ]

    for field, value in request.model_dump(exclude_unset=True).items():
        if field in miss_fields or value is None:
            continue
        if field == "tools":
            # OpenAI tool list -> Gemini function declarations.
            payload.update({
                "tools": [{
                    "function_declarations": [tool["function"] for tool in value]
                }],
                "tool_config": {
                    "function_calling_config": {
                        "mode": "AUTO"
                    }
                }
            })
        else:
            payload[field] = value

    return url, headers, payload
def _merge_adjacent_vertex_claude_messages(messages):
    """Collapse runs of same-role messages into a single message each."""
    def as_blocks(content):
        # Promote a plain string to a one-element list of text blocks.
        if isinstance(content, list):
            return content
        return [{"type": "text", "text": content}]

    merged = []
    for message in messages:
        if not merged or merged[-1]["role"] != message["role"]:
            merged.append(message)
            continue
        previous = merged[-1]
        kept = previous.get("content")
        extra = message.get("content")
        if not extra:
            continue
        if not kept:
            # Previously the follow-up message was dropped in this case.
            previous["content"] = extra
        elif isinstance(kept, str) and isinstance(extra, str):
            previous["content"] = kept + extra
        else:
            # Previously list.extend(str) spread the string into characters.
            previous["content"] = as_blocks(kept) + as_blocks(extra)
    return merged


async def get_vertex_claude_payload(request, engine, provider):
    """Build the URL, headers and JSON body for Claude on Vertex AI.

    Args:
        request: OpenAI-style chat request.  This function reads ``model``,
            ``stream``, ``messages``, ``tools`` and
            ``model_dump(exclude_unset=True)``.
        engine: Engine name chosen by the dispatcher.  It is not forwarded
            to the message-part helpers, which get ``"claude"`` instead.
        provider: Provider config with ``project_id``, a ``model`` mapping,
            an optional ``tools`` switch and, optionally, ``client_email``
            + ``private_key`` used to obtain a bearer token.

    Returns:
        A ``(url, headers, payload)`` tuple.

    Raises:
        ValueError: If ``project_id`` is missing or the model id matches
            none of the known Claude 3 families.
    """
    headers = {
        'Content-Type': 'application/json',
    }

    # Fail early and clearly instead of hitting an unbound local later.
    project_id = provider.get("project_id")
    if not project_id:
        raise ValueError("Vertex provider configuration is missing project_id")

    model = provider['model'][request.model]
    # Each model family has its own region source imported from utils
    # (presumably rotating over the regions serving it - confirm in utils).
    location = None
    for marker, regions in (
        ("claude-3-5-sonnet", c35s),
        ("claude-3-opus", c3o),
        ("claude-3-sonnet", c3s),
        ("claude-3-haiku", c3h),
    ):
        if marker in model:
            location = regions
            break
    if location is None:
        raise ValueError(f"No Vertex region list is configured for model {model!r}")

    if provider.get("client_email") and provider.get("private_key"):
        access_token = get_access_token(provider['client_email'], provider['private_key'])
        headers['Authorization'] = f"Bearer {access_token}"

    # Non-streaming requests previously crashed on an unbound local; they
    # map to the rawPredict method.
    claude_stream = "streamRawPredict" if request.stream else "rawPredict"
    url = "https://{LOCATION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/publishers/anthropic/models/{MODEL}:{stream}".format(LOCATION=location.next(), PROJECT_ID=project_id, MODEL=model, stream=claude_stream)

    # The helpers only know "claude"; passing the dispatcher's "vertex"
    # engine made every list-content message raise "Unknown engine".
    part_engine = "claude"

    messages = []
    system_prompt = None
    tool_id = None
    for msg in request.messages:
        tool_call_id = None
        tool_calls = None
        if isinstance(msg.content, list):
            content = []
            for item in msg.content:
                if item.type == "text":
                    content.append(await get_text_message(msg.role, item.text, part_engine))
                elif item.type == "image_url":
                    content.append(await get_image_message(item.image_url.url, part_engine))
        else:
            content = msg.content
            tool_calls = msg.tool_calls
            if tool_calls:
                # Remember the id so the following tool result can cite it.
                tool_id = tool_calls[0].id
            tool_call_id = msg.tool_call_id

        if tool_calls:
            # Only the first tool call of a message is translated.
            tool_call = tool_calls[0]
            messages.append({"role": msg.role, "content": [{
                "type": "tool_use",
                "id": tool_call.id,
                "name": tool_call.function.name,
                "input": json.loads(tool_call.function.arguments),
            }]})
        elif tool_call_id:
            # Tool results are sent as a user turn citing the remembered id.
            messages.append({"role": "user", "content": [{
                "type": "tool_result",
                "tool_use_id": tool_id,
                "content": content
            }]})
        elif msg.role == "system":
            system_prompt = content
        else:
            messages.append({"role": msg.role, "content": content})

    messages = _merge_adjacent_vertex_claude_messages(messages)

    payload = {
        "anthropic_version": "vertex-2023-10-16",
        "messages": messages,
        "system": system_prompt or "You are Claude, a large language model trained by Anthropic.",
    }

    # Default output budget; an explicit max_tokens in the request replaces
    # it in the copy loop below.
    if "claude-3-5-sonnet" in model:
        payload['max_tokens'] = 8192
    elif "claude-3" in model:
        payload['max_tokens'] = 4096

    # Request fields that are not copied into the body.
    miss_fields = [
        'model',
        'messages',
        'presence_penalty',
        'frequency_penalty',
        'n',
        'user',
        'include_usage',
    ]

    for field, value in request.model_dump(exclude_unset=True).items():
        if field not in miss_fields and value is not None:
            payload[field] = value

    if request.tools and provider.get("tools"):
        # Replace the copied OpenAI tool list with Claude tool definitions.
        tools = []
        for tool in request.tools:
            json_tool = await gpt2claude_tools_json(tool.dict()["function"])
            tools.append(json_tool)
        payload["tools"] = tools
        if "tool_choice" in payload:
            payload["tool_choice"] = {
                "type": "auto"
            }

    if provider.get("tools") == False:
        payload.pop("tools", None)
        payload.pop("tool_choice", None)

    return url, headers, payload
async def get_gpt_payload(request, engine, provider):
    """Build the URL, headers and JSON body for an OpenAI-compatible API.

    Args:
        request: Chat request.  This function reads ``model``, ``messages``
            and ``model_dump(exclude_unset=True)``.
        engine: Engine name forwarded to the message-part helpers.
        provider: Provider config with ``base_url``, a ``model`` mapping,
            an optional ``api`` key and an optional ``tools`` switch.

    Returns:
        A ``(url, headers, payload)`` tuple.
    """
    headers = {'Content-Type': 'application/json'}
    api_key = provider.get("api")
    if api_key:
        headers['Authorization'] = f"Bearer {api_key}"
    url = provider['base_url']

    chat_messages = []
    for msg in request.messages:
        if isinstance(msg.content, list):
            # Multi-part content never carries tool information here.
            parts = []
            for part in msg.content:
                if part.type == "text":
                    parts.append(await get_text_message(msg.role, part.text, engine))
                elif part.type == "image_url":
                    parts.append(await get_image_message(part.image_url.url, engine))
            chat_messages.append({"role": msg.role, "content": parts})
            continue

        plain_content = msg.content
        tool_calls, tool_call_id = msg.tool_calls, msg.tool_call_id
        if tool_calls:
            # Assistant turn requesting tools: forwarded without content.
            chat_messages.append({
                "role": msg.role,
                "tool_calls": [
                    {
                        "id": call.id,
                        "type": call.type,
                        "function": {
                            "name": call.function.name,
                            "arguments": call.function.arguments
                        }
                    }
                    for call in tool_calls
                ],
            })
        elif tool_call_id:
            chat_messages.append({"role": msg.role, "tool_call_id": tool_call_id, "content": plain_content})
        else:
            chat_messages.append({"role": msg.role, "content": plain_content})

    payload = {
        "model": provider['model'][request.model],
        "messages": chat_messages,
    }

    # Copy every explicitly set, non-null request field except the two
    # that were built above.
    already_built = ('model', 'messages')
    for field, value in request.model_dump(exclude_unset=True).items():
        if value is None or field in already_built:
            continue
        payload[field] = value

    if provider.get("tools") == False:
        # Tool use is switched off for this provider.
        for key in ("tools", "tool_choice"):
            payload.pop(key, None)

    return url, headers, payload
async def get_openrouter_payload(request, engine, provider):
    """Build the URL, headers and JSON body for an OpenRouter call.

    Args:
        request: Chat request.  This function reads ``model``, ``messages``
            and ``model_dump(exclude_unset=True)``.
        engine: Engine name forwarded to the text-part helper.
        provider: Provider config with ``base_url``, a ``model`` mapping
            and an optional ``api`` key.

    Returns:
        A ``(url, headers, payload)`` tuple.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    if provider.get("api"):
        headers['Authorization'] = f"Bearer {provider['api']}"

    url = provider['base_url']

    messages = []
    for msg in request.messages:
        name = None
        if isinstance(msg.content, list):
            content = []
            for item in msg.content:
                if item.type == "text":
                    content.append(await get_text_message(msg.role, item.text, engine))
                elif item.type == "image_url":
                    # Request the OpenAI-style image part explicitly: the
                    # image helper rejected the "openrouter" engine name, so
                    # any image used to abort the whole request.
                    content.append(await get_image_message(item.image_url.url, "gpt"))
        else:
            content = msg.content
            name = msg.name

        if name:
            messages.append({"role": msg.role, "name": name, "content": content})
        elif isinstance(content, list):
            # Multi-part content is flattened into one message per part.
            for item in content:
                if item["type"] == "text":
                    messages.append({"role": msg.role, "content": item["text"]})
                elif item["type"] == "image_url":
                    # The part has no top-level "url" key (the old lookup
                    # raised KeyError); forward the image part itself.
                    messages.append({"role": msg.role, "content": [item]})
        else:
            messages.append({"role": msg.role, "content": content})

    model = provider['model'][request.model]
    payload = {
        "model": model,
        "messages": messages,
    }

    # Request fields that are not copied into the OpenRouter body.
    miss_fields = [
        'model',
        'messages',
        'tools',
        'tool_choice',
        'temperature',
        'top_p',
        'max_tokens',
        'presence_penalty',
        'frequency_penalty',
        'n',
        'user',
        'include_usage',
        'logprobs',
        'top_logprobs'
    ]

    for field, value in request.model_dump(exclude_unset=True).items():
        if field not in miss_fields and value is not None:
            payload[field] = value

    return url, headers, payload
async def gpt2claude_tools_json(json_dict):
    """Convert an OpenAI function definition into a Claude tool definition.

    The ``parameters`` key is renamed to ``input_schema``.  When the schema
    is ``None`` or absent, an empty object schema is supplied so the result
    always carries an ``input_schema``.

    Args:
        json_dict: The ``function`` part of an OpenAI tool definition.  It
            is deep-copied, so the caller's dict is left untouched.

    Returns:
        A new dict with ``input_schema`` in place of ``parameters``.
    """
    import copy

    tool = copy.deepcopy(json_dict)
    empty_schema = {
        "type": "object",
        "properties": {}
    }
    if "parameters" in tool:
        schema = tool.pop("parameters")
        tool["input_schema"] = empty_schema if schema is None else schema
    elif tool.get("input_schema") is None:
        # No schema was given at all: fall back to the empty object schema.
        tool["input_schema"] = empty_schema
    return tool
def _merge_adjacent_claude_messages(messages):
    """Collapse runs of same-role messages into a single message each."""
    def as_blocks(content):
        # Promote a plain string to a one-element list of text blocks.
        if isinstance(content, list):
            return content
        return [{"type": "text", "text": content}]

    merged = []
    for message in messages:
        if not merged or merged[-1]["role"] != message["role"]:
            merged.append(message)
            continue
        previous = merged[-1]
        kept = previous.get("content")
        extra = message.get("content")
        if not extra:
            continue
        if not kept:
            # Previously the follow-up message was dropped in this case.
            previous["content"] = extra
        elif isinstance(kept, str) and isinstance(extra, str):
            previous["content"] = kept + extra
        else:
            # Previously list.extend(str) spread the string into characters.
            previous["content"] = as_blocks(kept) + as_blocks(extra)
    return merged


async def get_claude_payload(request, engine, provider):
    """Build the URL, headers and JSON body for the Anthropic Messages API.

    Args:
        request: OpenAI-style chat request.  This function reads ``model``,
            ``messages``, ``tools`` and ``model_dump(exclude_unset=True)``.
        engine: Engine name forwarded to the message-part helpers.
        provider: Provider config with ``base_url``, ``api``, a ``model``
            mapping and an optional ``tools`` switch.

    Returns:
        A ``(url, headers, payload)`` tuple.
    """
    model = provider['model'][request.model]
    headers = {
        "content-type": "application/json",
        "x-api-key": f"{provider['api']}",
        "anthropic-version": "2023-06-01",
        "anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15" if "claude-3-5-sonnet" in model else "tools-2024-05-16",
    }
    url = provider['base_url']

    messages = []
    system_prompt = None
    tool_id = None
    for msg in request.messages:
        tool_call_id = None
        tool_calls = None
        if isinstance(msg.content, list):
            content = []
            for item in msg.content:
                if item.type == "text":
                    content.append(await get_text_message(msg.role, item.text, engine))
                elif item.type == "image_url":
                    content.append(await get_image_message(item.image_url.url, engine))
        else:
            content = msg.content
            tool_calls = msg.tool_calls
            if tool_calls:
                # Remember the id so the following tool result can cite it.
                tool_id = tool_calls[0].id
            tool_call_id = msg.tool_call_id

        if tool_calls:
            # Only the first tool call of a message is translated.
            tool_call = tool_calls[0]
            messages.append({"role": msg.role, "content": [{
                "type": "tool_use",
                "id": tool_call.id,
                "name": tool_call.function.name,
                "input": json.loads(tool_call.function.arguments),
            }]})
        elif tool_call_id:
            # Tool results are sent as a user turn citing the remembered id.
            messages.append({"role": "user", "content": [{
                "type": "tool_result",
                "tool_use_id": tool_id,
                "content": content
            }]})
        elif msg.role == "system":
            system_prompt = content
        else:
            messages.append({"role": msg.role, "content": content})

    messages = _merge_adjacent_claude_messages(messages)

    payload = {
        "model": model,
        "messages": messages,
        "system": system_prompt or "You are Claude, a large language model trained by Anthropic.",
        # Default output budget, in line with the Vertex Claude builder; an
        # explicit max_tokens in the request replaces it in the loop below.
        "max_tokens": 8192 if "claude-3-5-sonnet" in model else 4096,
    }

    # Request fields that are not copied into the body.
    miss_fields = [
        'model',
        'messages',
        'presence_penalty',
        'frequency_penalty',
        'n',
        'user',
        'include_usage',
    ]

    for field, value in request.model_dump(exclude_unset=True).items():
        if field not in miss_fields and value is not None:
            payload[field] = value

    if request.tools and provider.get("tools"):
        # Replace the copied OpenAI tool list with Claude tool definitions.
        tools = []
        for tool in request.tools:
            json_tool = await gpt2claude_tools_json(tool.dict()["function"])
            tools.append(json_tool)
        payload["tools"] = tools
        if "tool_choice" in payload:
            payload["tool_choice"] = {
                "type": "auto"
            }

    if provider.get("tools") == False:
        payload.pop("tools", None)
        payload.pop("tool_choice", None)

    return url, headers, payload
async def get_dalle_payload(request, engine, provider):
    """Build the URL, headers and JSON body for an image-generation call.

    Args:
        request: Image request; ``model``, ``prompt``, ``n`` and ``size``
            are read.
        engine: Engine name; accepted but not used here.
        provider: Provider config with ``base_url``, a ``model`` mapping
            and an optional ``api`` key.

    Returns:
        A ``(url, headers, payload)`` tuple, where ``url`` is the
        ``image_url`` attribute of ``BaseAPI(provider['base_url'])``.
    """
    upstream_model = provider['model'][request.model]

    headers = {"Content-Type": "application/json"}
    if provider.get("api"):
        headers['Authorization'] = f"Bearer {provider['api']}"

    image_endpoint = BaseAPI(provider['base_url']).image_url

    payload = dict(
        model=upstream_model,
        prompt=request.prompt,
        n=request.n,
        size=request.size,
    )
    return image_endpoint, headers, payload
async def get_payload(request: RequestModel, engine, provider):
    """Dispatch to the payload builder that matches ``engine``.

    Args:
        request: The incoming request model.
        engine: One of ``"gemini"``, ``"vertex"``, ``"claude"``, ``"gpt"``,
            ``"openrouter"`` or ``"dalle"``.  For ``"vertex"`` the mapped
            model id decides between the Gemini and Claude builders.
        provider: Provider config passed through to the builder.

    Returns:
        The ``(url, headers, payload)`` tuple produced by the builder.

    Raises:
        ValueError: If no builder matches.
    """
    if engine == "gemini":
        return await get_gemini_payload(request, engine, provider)

    if engine == "vertex":
        # Vertex hosts both model families; the upstream id tells which.
        if "gemini" in provider['model'][request.model]:
            return await get_vertex_gemini_payload(request, engine, provider)
        if "claude" in provider['model'][request.model]:
            return await get_vertex_claude_payload(request, engine, provider)

    for engine_name, build in (
        ("claude", get_claude_payload),
        ("gpt", get_gpt_payload),
        ("openrouter", get_openrouter_payload),
        ("dalle", get_dalle_payload),
    ):
        if engine == engine_name:
            return await build(request, engine, provider)

    raise ValueError("Unknown payload")