uni-api / test /test_httpx.py
yym68686's picture
✨ Feature: Add feature: Support vertex ai API and vertex tool use invocation.
edb14b7
raw
history blame
1.61 kB
import httpx
import asyncio
import ssl
import logging
# 设置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def make_request():
# SSL 上下文设置
# ssl_context = ssl.create_default_context()
# ssl_context.set_alpn_protocols(["h2", "http/1.1"])
# 创建自定义传输
transport = httpx.AsyncHTTPTransport(
http2=True,
# verify=ssl_context,
verify=False,
retries=1
)
# 设置头部
headers = {
"User-Agent": "curl/8.7.1",
"Accept": "*/*",
"Content-Type": "application/json",
"Authorization": "Bearer sk-xxxxxxx"
}
# 请求数据
data = {
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": "say test"
}
],
"stream": True
}
async with httpx.AsyncClient(transport=transport) as client:
try:
response = await client.post(
"https://api.xxxxxxxxxx.me/v1/chat/completions",
headers=headers,
json=data,
timeout=30.0
)
logger.info(f"Status Code: {response.status_code}")
logger.info(f"Headers: {response.headers}")
# 处理流式响应
async for line in response.aiter_lines():
if line:
print(line)
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting {e.request.url!r}.")
# 运行异步函数
asyncio.run(make_request())