File size: 3,043 Bytes
2a56a92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import base64
import hashlib
import hmac
import json
import os
from datetime import datetime, timezone
from urllib.parse import quote, urlsplit

import httpx
# AWS credentials and service configuration.
# Both credentials come from the environment and are None when unset.
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
# Region and service name used in the SigV4 credential scope.
REGION = 'us-east-1'
SERVICE = 'bedrock'
# Base URL of the Bedrock runtime API for the configured region.
ENDPOINT = 'https://bedrock-runtime.' + REGION + '.amazonaws.com'
# Helper functions for producing an AWS SigV4 signature
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` under ``key``.

    Args:
        key: Secret key as ``bytes``.
        msg: Text to authenticate; encoded as UTF-8 before hashing.

    Returns:
        The 32-byte digest as ``bytes``.
    """
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode('utf-8'))
    return mac.digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the SigV4 signing key from a secret access key.

    The secret, prefixed with ``'AWS4'``, seeds a chain of HMAC-SHA256
    operations over the date stamp, region, service name and the literal
    ``'aws4_request'``, in that order.

    Args:
        key: Secret access key as ``str``.
        dateStamp: Date component of the credential scope.
        regionName: Region component of the credential scope.
        serviceName: Service component of the credential scope.

    Returns:
        The derived signing key as raw ``bytes``.
    """
    derived = ('AWS4' + key).encode('utf-8')
    # Each step keys the next HMAC with the previous digest.
    for scope_part in (dateStamp, regionName, serviceName, 'aws4_request'):
        derived = sign(derived, scope_part)
    return derived
def create_authorization_header(method, url, payload, headers):
    """Build the AWS Signature Version 4 ``Authorization`` header value.

    Args:
        method: HTTP verb of the request, e.g. ``'POST'``.
        url: Full request URL; its path and query string are signed. The
            query string, if any, is assumed to be percent-encoded already.
        payload: Request body, either ``str`` (hashed as UTF-8) or ``bytes``.
        headers: Mapping of header name to value. Every entry is included
            in the signature. If it contains ``X-Amz-Date`` that value is
            used as the signing timestamp.

    Returns:
        The string to send as the ``Authorization`` header.

    Raises:
        RuntimeError: If ``AWS_ACCESS_KEY`` or ``AWS_SECRET_KEY`` is unset.
    """
    if not AWS_ACCESS_KEY or not AWS_SECRET_KEY:
        # Fail loudly instead of emitting "Credential=None/..." or an
        # opaque TypeError from the key derivation.
        raise RuntimeError(
            "AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables must be set"
        )

    # SigV4 canonical headers: lowercase names, values trimmed with runs of
    # whitespace collapsed, sorted by the lowercase name.
    normalized = {
        name.lower(): ' '.join(str(value).split())
        for name, value in headers.items()
    }
    signed_names = sorted(normalized)
    canonical_headers = ''.join(f"{name}:{normalized[name]}\n" for name in signed_names)
    signed_headers = ';'.join(signed_names)

    # Sign with the timestamp the request actually carries. Generating a
    # fresh one here can differ from the X-Amz-Date header by a second and
    # invalidate the signature.
    amzdate = normalized.get('x-amz-date')
    if not amzdate:
        amzdate = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    datestamp = amzdate[:8]

    # The canonical path must be URI-encoded; only unreserved characters and
    # '/' stay literal, so e.g. the ':' in a Bedrock model id becomes '%3A'.
    url_parts = urlsplit(url)
    canonical_uri = quote(url_parts.path or '/', safe='/-_.~')

    # Query parameters are sorted by name, then value; empty for no query.
    query_pairs = sorted(
        pair.partition('=')[::2] for pair in url_parts.query.split('&') if pair
    )
    canonical_querystring = '&'.join(f"{name}={value}" for name, value in query_pairs)

    body = payload.encode('utf-8') if isinstance(payload, str) else payload
    payload_hash = hashlib.sha256(body).hexdigest()

    # canonical_headers already ends with '\n'; the extra '\n' in the
    # template yields the blank line SigV4 expects before the signed names.
    canonical_request = (
        f"{method.upper()}\n{canonical_uri}\n{canonical_querystring}\n"
        f"{canonical_headers}\n{signed_headers}\n{payload_hash}"
    )

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = f"{datestamp}/{REGION}/{SERVICE}/aws4_request"
    request_digest = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = f"{algorithm}\n{amzdate}\n{credential_scope}\n{request_digest}"

    signing_key = getSignatureKey(AWS_SECRET_KEY, datestamp, REGION, SERVICE)
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    return (
        f"{algorithm} "
        f"Credential={AWS_ACCESS_KEY}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )
# Main entry point
def invoke_bedrock_model(
    prompt="Hello, world",
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    max_tokens=256,
    timeout=30.0,
):
    """Send one signed ``invoke`` request to Bedrock and print the result.

    On HTTP 200 the ``"content"`` field of the JSON response is printed;
    otherwise the status code and the response text are printed.

    Args:
        prompt: Text of the single user message.
        model_id: Model identifier placed in the request path.
        max_tokens: Value of the ``max_tokens`` request field.
        timeout: httpx timeout in seconds for the request.
    """
    url = f"{ENDPOINT}/model/{model_id}/invoke"
    payload = json.dumps({
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
        "anthropic_version": "bedrock-2023-05-31"
    })

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    amzdate = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'X-Amz-Date': amzdate,
        'Host': f'bedrock-runtime.{REGION}.amazonaws.com'
    }
    # The Authorization header is added only after signing so that it is
    # not itself part of the signed header set.
    headers['Authorization'] = create_authorization_header('POST', url, payload, headers)

    with httpx.Client(timeout=timeout) as client:
        # Raw text bodies go through `content=`; httpx deprecates `data=`
        # for anything other than form fields.
        response = client.post(url, headers=headers, content=payload)

    if response.status_code == 200:
        response_body = response.json()
        print(response_body.get("content"))
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
# Run the request only when executed as a script, not when imported
if __name__ == "__main__":
    invoke_bedrock_model()