import os
import time
import zipfile
from io import BytesIO
from typing import Optional, Union

import openai
import soundfile as sf
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from scipy.spatial import distance as dist
from ultralyticsplus import YOLO

from utils import (base64_to_pil, ffmpeg_read, get_hist, pil_to_base64,
                   read_image_file, stt, tts)

# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
CLASS = model.model.names
ZIP = False
default_bot_voice = "おはようございます"  # "Good morning" — greeting spoken to a new person
area_threshold = 0.3  # minimum face-box/image area ratio to count as close enough to greet

# Config for human input
# System prompt (Japanese): "I want you to behave like a robot made by Detomo.
# Your name is Aisatsu. Your mission is to help children gain the confidence to
# greet other children and become happy. Answer questions simply, and do not
# provide extra information unless explicitly asked."
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。"\
                  "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\
                  "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.get("/client_settings/")
def client_settings_api():
    return {"camera_picture_period": 5}


@app.post("/camera_picture/")
async def camera_picture_api(
    file: UploadFile = File(...),
    last_seen: Optional[Union[str, UploadFile]] = Form(None),
):
    # parameters
    total_time = time.time()
    most_close = 0
    out_img = None
    diff_value = 0.5  # default passes the "new person" check when no previous image is given

    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height

    # select the largest (closest) person box and crop it
    if boxes is not None:
        for xyxy, cls in zip(boxes.xyxy, boxes.cls):
            if int(cls) != 0:  # class 0 is "person"
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate

    # no person detected
    if out_img is None:
        return {
            "status": "No face detected",
            "text": None,
            "voice": None,
            "image": None,
        }

    # when zipping, pil_to_base64/tts return file paths; otherwise base64 strings
    image_bot_path = pil_to_base64(out_img, encode=not ZIP)

    # compare with the previous crop, if the client sent one
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print(f"Area rate: {most_close}. Difference value: {diff_value}")

    # return results
    if most_close >= area_threshold and diff_value >= 0.5:
        # person is close enough and looks different from the last one: greet
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
            zip_buffer = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(zip_buffer, mode='w',
                                 compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
            )
        voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
        print("Total time", time.time() - total_time)
        return {
            "status": "New people",
            "text": default_bot_voice,
            "voice": voice_bot_path,
            "image": image_bot_path,
        }
    print("Total time", time.time() - total_time)
    return {
        "status": "Old people",
        "text": None,
        "voice": None,
        "image": image_bot_path,
    }


@app.post("/human_input/")
async def human_input_api(
    input_data: Union[str, bytes],
    temperature: float = 0.7,
    max_tokens: int = 1000,
):
    print("Input data type", type(input_data))
    if isinstance(input_data, str):
        text = input_data
    else:
        # decode the uploaded audio, write a temporary WAV, and transcribe it
        upload_audio = ffmpeg_read(input_data, sampling_rate=24000)
        sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
        text = stt('temp.wav')
    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    print(completion['usage']['total_tokens'])
    return {
        "human_text": str(text),
        "robot_text": completion.choices[0].message.content,
        "robot_voice": tts(completion.choices[0].message.content,
                           language="ja", encode=True),
    }
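

# ---------------------------------------------------------------------------
# Illustrative client sketch (an assumption for demonstration, not part of the
# service itself): one plausible way to exercise the two POST endpoints above
# with the `requests` library. The base URL, the sample file name "face.jpg",
# and the query-parameter binding of `input_data` are all assumptions; run
# this block in a separate process while the server is up.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import requests

    BASE_URL = "http://localhost:8000"  # assumed local deployment

    # First frame: no `last_seen`, so diff_value keeps its default of 0.5 and
    # only the area check decides whether the greeting fires.
    with open("face.jpg", "rb") as f:
        first = requests.post(
            f"{BASE_URL}/camera_picture/",
            files={"file": ("face.jpg", f, "image/jpeg")},
        ).json()
    print(first["status"])

    # Next frame: pass the previous base64 crop back as `last_seen` so the
    # server can compare color histograms and label the person old or new.
    if first["image"] is not None:
        with open("face.jpg", "rb") as f:
            second = requests.post(
                f"{BASE_URL}/camera_picture/",
                files={"file": ("face.jpg", f, "image/jpeg")},
                data={"last_seen": first["image"]},
            ).json()
        print(second["status"])

    # Text question to the chat endpoint ("Hello" in Japanese); how FastAPI
    # binds `input_data` is an assumption here (query parameter shown).
    reply = requests.post(
        f"{BASE_URL}/human_input/",
        params={"input_data": "こんにちは"},
    ).json()
    print(reply["robot_text"])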