Spaces:
Sleeping
Sleeping
import os
import tempfile
import time
import zipfile
from io import BytesIO
from typing import Optional, Union

import openai
import soundfile as sf
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from scipy.spatial import distance as dist
from ultralyticsplus import YOLO

from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read
# Config for camera picture
# Detection model, loaded once when the module is imported.
model = YOLO('ultralyticsplus/yolov8s')
# Label table exposed by the model (presumably class index -> name; confirm).
CLASS = model.model.names
# When True, camera_picture_api answers with a zip archive instead of JSON.
ZIP = False
# Greeting passed to tts() for a newly seen person ("Good morning").
# The previous literal contained a stray "い" ("おはいようございます").
default_bot_voice = "おはようございます"
# Minimum fraction of the frame area a detection must cover to trigger a greeting.
area_threshold = 0.3
# Config for human input
# System persona sent with every chat request. In English: "I want you to
# behave like a robot made by Detomo. Your name is Aisatsu. Your mission is to
# help children gain the confidence to greet other children and be happy.
# Answer questions only in a simple way and do not give additional
# information unless explicitly asked."
prompt_template = (
    "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。"
    "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"
    "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
)
system_prompt = [dict(role="system", content=prompt_template)]

# The API key must be present in the environment; a missing key raises KeyError
# at import time.
openai.api_key = os.environ["OPENAI_API_KEY"]

# Application object; responses of at least 1000 bytes are gzip-compressed.
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
def read_root():
    """Return a fixed payload signalling that the application has started.

    NOTE(review): no route decorator is visible on this function here;
    confirm how it is registered with ``app``.
    """
    startup_message = "Application startup complete"
    return {"Message": startup_message}
def client_settings_api():
    """Return the client configuration: the camera picture period.

    The unit of ``camera_picture_period`` is not stated in this file
    (presumably seconds; confirm against the client).

    NOTE(review): no route decorator is visible on this function here;
    confirm how it is registered with ``app``.
    """
    settings = dict(camera_picture_period=5)
    return settings
async def camera_picture_api(
    file: UploadFile = File(...),
    last_seen: Optional[Union[str, UploadFile]] = Form(None),
):
    """Find the largest class-0 detection in a frame and decide whether to greet.

    The uploaded frame is run through ``model``. Among detections whose class
    index is 0 (presumably "person" in the model's label table; confirm), the
    one covering the largest share of the frame is cropped and resized to
    64x64. If ``last_seen`` is supplied, the crop is compared with it via the
    Euclidean distance between their ``get_hist`` outputs.

    Args:
        file: Uploaded camera frame.
        last_seen: Previously returned crop, either as a string decoded with
            ``base64_to_pil`` or as an uploaded file. ``None`` skips the
            comparison, so the detection is treated as different enough.

    Returns:
        A dict with keys ``status``, ``text``, ``voice`` and ``image``:

        * ``"No face detected"`` - no class-0 detection; other values ``None``.
        * ``"New people"`` - the crop covers at least ``area_threshold`` of
          the frame and differs enough from ``last_seen``; includes the
          greeting text and its synthesized voice.
        * ``"Old people"`` - otherwise; only ``image`` is filled in.

        When the module-level ``ZIP`` flag is true, the "New people" case
        returns a ``StreamingResponse`` carrying a zip archive instead.

    NOTE(review): no route decorator is visible on this function here;
    confirm how it is registered with ``app``.
    """
    started_at = time.time()
    # Histogram distance at or above which the crop counts as a different person.
    diff_threshold = 0.5

    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height

    # select and crop the class-0 detection with the largest relative area
    most_close = 0
    out_img = None
    if boxes is not None:
        for xyxy, _conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            # ">=" means a later detection wins a tie.
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate

    # nothing usable in the frame
    if out_img is None:
        return {
            "status": "No face detected",
            "text": None,
            "voice": None,
            "image": None
        }

    # With ZIP enabled the helper is asked for non-encoded output, which is
    # later handed to ZipFile.write as a file path.
    image_bot_path = pil_to_base64(out_img, encode=not ZIP)

    # check with previous image if have; without one, default to the threshold
    # so the comparison below passes.
    diff_value = diff_threshold
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print(f"Distance: {most_close}. Different value: {diff_value}")

    is_new_person = most_close >= area_threshold and diff_value >= diff_threshold
    if not is_new_person:
        print("Total time", time.time() - started_at)
        return {
            "status": "Old people",
            "text": None,
            "voice": None,
            "image": image_bot_path,
        }

    voice_bot_path = tts(default_bot_voice, language="ja", encode=not ZIP)

    if ZIP:
        archive = BytesIO()
        zip_filename = "final_archive.zip"
        # The context manager finalizes the archive; no explicit close() needed.
        with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
            for file_path in (voice_bot_path, image_bot_path):
                zf.write(file_path)
        print("Total time", time.time() - started_at)
        return StreamingResponse(
            iter([archive.getvalue()]),
            media_type="application/x-zip-compressed",
            headers={"Content-Disposition": f"attachment;filename={zip_filename}"}
        )

    print("Total time", time.time() - started_at)
    return {
        "status": "New people",
        "text": default_bot_voice,
        "voice": voice_bot_path,
        "image": image_bot_path
    }
async def human_input_api(
    input_data: Union[str, bytes],
    temperature: float = 0.7,
    max_tokens: int = 1000,
):
    """Answer a human utterance, given as text or audio, with text and voice.

    Non-string input is decoded with ``ffmpeg_read``, written out as a 16-bit
    PCM WAV file and transcribed with ``stt``. The resulting text is sent to
    the chat model together with ``system_prompt``.

    Args:
        input_data: The user's message as text, or audio bytes to transcribe.
        temperature: Sampling temperature forwarded to the chat completion.
        max_tokens: Token limit forwarded to the chat completion.

    Returns:
        A dict with ``human_text`` (the text that was sent), ``robot_text``
        (the model's reply) and ``robot_voice`` (the reply passed through
        ``tts`` with ``language="ja"``, ``encode=True``).

    NOTE(review): no route decorator is visible on this function here;
    confirm how it is registered with ``app``.
    """
    print("Input data type", type(input_data))

    if isinstance(input_data, str):
        text = input_data
    else:
        sample_rate = 24000
        upload_audio = ffmpeg_read(input_data, sampling_rate=sample_rate)
        # A per-request temporary directory keeps concurrent requests from
        # overwriting each other's audio and removes the file afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            wav_path = os.path.join(tmp_dir, "temp.wav")
            sf.write(wav_path, upload_audio, sample_rate, subtype='PCM_16')
            text = stt(wav_path)

    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    print(completion['usage']['total_tokens'])

    robot_text = completion.choices[0].message.content
    return {
        "human_text": str(text),
        "robot_text": robot_text,
        "robot_voice": tts(robot_text, language="ja", encode=True)
    }