Spaces:
Sleeping
Sleeping
File size: 5,531 Bytes
1352988 b96d288 06bad2e 1352988 d7328ff 26946a6 d377dff 93b2c8a 1352988 93b2c8a 1352988 93b2c8a 1352988 cf598a0 1352988 3fae5f0 1352988 d377dff 08ce8d2 d377dff 08ce8d2 3fae5f0 6a076a8 b7f8699 5138fea 1352988 5138fea 1352988 5138fea 520369d 7c36af7 520369d 5138fea 3fae5f0 520369d 5138fea 1352988 520369d 5138fea d7328ff 1352988 3fae5f0 d7328ff 1352988 d7328ff d377dff d7328ff 1352988 d7328ff ca0120c 1352988 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
from ultralyticsplus import YOLO
from typing import Optional, Union
from scipy.spatial import distance as dist
import time
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware
from io import BytesIO
from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read
import zipfile
import soundfile as sf
import openai
# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
CLASS = model.model.names
ZIP = False
default_bot_voice = "おはいようございます"
area_threshold = 0.3
# Config for human input
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。"\
"あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\
"質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/")
def read_root():
return {"Message": "Application startup complete"}
@app.get("/client_settings/")
def client_settings_api():
return {"camera_picture_period": 5}
@app.post("/camera_picture/")
async def camera_picture_api(
file: UploadFile = File(...),
last_seen: Optional[Union[str, UploadFile]] = Form(None),
):
# parameters
total_time = time.time()
most_close = 0
out_img = None
diff_value = 0.5
# read image and predict
image = read_image_file(await file.read())
results = model.predict(image, show=False)[0]
masks, boxes = results.masks, results.boxes
area_image = image.width * image.height
# select and crop face image
if boxes is not None:
for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
if int(cls) != 0:
continue
box = xyxy.tolist()
area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
if area_rate >= most_close:
out_img = image.crop(tuple(box)).resize((64, 64))
most_close = area_rate
# check detect people or not
if out_img is None:
return {
"status": "No face detected",
"text": None,
"voice": None,
"image": None
}
else:
if ZIP:
image_bot_path = pil_to_base64(out_img, encode=False)
else:
image_bot_path = pil_to_base64(out_img, encode=True)
# check with previous image if have
if last_seen is not None:
if type(last_seen) == str:
last_seen = base64_to_pil(last_seen)
else:
last_seen = read_image_file(await last_seen.read())
diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
print(f"Distance: {most_close}. Different value: {diff_value}")
# return results
if most_close >= area_threshold and diff_value >= 0.5:
if ZIP:
voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
io = BytesIO()
zip_filename = "final_archive.zip"
with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
for file_path in [voice_bot_path, image_bot_path]:
zf.write(file_path)
zf.close()
print("Total time", time.time() - total_time)
return StreamingResponse(
iter([io.getvalue()]),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename}
)
else:
voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
print("Total time", time.time() - total_time)
return {
"status": "New people",
"text": default_bot_voice,
"voice": voice_bot_path,
"image": image_bot_path
}
else:
print("Total time", time.time() - total_time)
return {
"status": "Old people",
"text": None,
"voice": None,
"image": image_bot_path,
}
@app.post("/human_input/")
async def human_input_api(
input_data: Union[str, bytes],
temperature: float = 0.7,
max_tokens: int = 1000,
):
print("Input data type", type(input_data))
if type(input_data) != str:
upload_audio = ffmpeg_read(input_data, sampling_rate=24000)
sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
text = stt('temp.wav')
else:
text = input_data
prompt_msg = {"role": "user", "content": text}
messages = system_prompt + [prompt_msg]
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages, temperature=temperature,
max_tokens=max_tokens)
print(completion['usage']['total_tokens'])
return {
"human_text": str(text),
"robot_text": completion.choices[0].message.content,
"robot_voice": tts(completion.choices[0].message.content, language="ja", encode=True)
} |