from ultralyticsplus import YOLO from typing import Optional, Union, Annotated from scipy.spatial import distance as dist import time from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import StreamingResponse from fastapi.middleware.gzip import GZipMiddleware from io import BytesIO from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read import zipfile import soundfile as sf import openai import os import random # Config for camera picture model = YOLO('ultralyticsplus/yolov8s') # model = YOLO('kadirnar/yolov8n-v8.0') CLASS = model.model.names ZIP = False # bot_voice_time = "おはようございます" bot_voice_time = "こんにちは" default_bot_voice_list = [f"{bot_voice_time}、アイティコンサルティングとシステム開発を支援します。よろしくお願いします。", f"{bot_voice_time}、デトモです。システム開発全般を支援します。", f"{bot_voice_time}、デトモです。オフショア開発全般を支援します。", f"{bot_voice_time}、私はアイサロボです。システム開発全般を支援します。", f"{bot_voice_time}、エッジコンピューティングソリューションを提供します。"] area_threshold = 0 diff_value_threshold = 0 # Config for human input prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。デトモは高度なデジタル化社会を支えます。"\ "ビジネスの課題解決策を提案するコンサ ルティング・サービスと、課題解決を実現す るシステムの開発サービス、また、企業内 の情報システム部門の業務の代行サー ビスにも対応しています。"\ "デトモはITコンサルティング・システム開発を得意とし、お客様の課題解決をお手伝いいたします。"\ "あなたの名前はアイサロボです。"\ "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\ "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。" system_prompt = [{"role": "system", "content": prompt_template}] openai.api_key = os.environ["OPENAI_API_KEY"] app = FastAPI() app.add_middleware(GZipMiddleware, minimum_size=1000) @app.get("/") def read_root(): return {"Message": "Application startup complete"} @app.get("/client_settings/") def client_settings_api(): return {"camera_picture_period": 5} @app.post("/camera_picture/") async def camera_picture_api( file: UploadFile = File(...), last_seen: Optional[Union[str, UploadFile]] = Form(None), return_voice: Annotated[bool, Form()] = True, ): # parameters total_time = time.time() most_close = 0 out_img = None diff_value = 0.5 default_bot_voice = random.choice(default_bot_voice_list) # read image and predict image = read_image_file(await file.read()) results = model.predict(image, show=False)[0] masks, boxes = results.masks, results.boxes area_image = image.width * image.height # select and crop face image if boxes is not None: for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls): if int(cls) != 0: continue box = xyxy.tolist() area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image if area_rate >= most_close: out_img = image.crop(tuple(box)).resize((64, 64)) most_close = area_rate # check detect people or not if out_img is None: return { "status": "No face detected", "text": None, "voice": None, "image": None } else: if ZIP: image_bot_path = pil_to_base64(out_img, encode=False) else: image_bot_path = pil_to_base64(out_img, encode=True) # check with previous image if have if last_seen is not None: if type(last_seen) == str: last_seen = base64_to_pil(last_seen) else: last_seen = read_image_file(await last_seen.read()) diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen)) print(f"Distance: {most_close}. Different value: {diff_value}") # return results if most_close >= area_threshold and diff_value >= diff_value_threshold: if ZIP: voice_bot_path = tts(default_bot_voice, language="ja", encode=False) io = BytesIO() zip_filename = "final_archive.zip" with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for file_path in [voice_bot_path, image_bot_path]: zf.write(file_path) zf.close() print("Total time", time.time() - total_time) return StreamingResponse( iter([io.getvalue()]), media_type="application/x-zip-compressed", headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename} ) else: if return_voice: print("Total time", time.time() - total_time) return { "status": "New people", "text": default_bot_voice, "voice": tts(default_bot_voice, language="ja", encode=True), "image": image_bot_path } else: print("Total time", time.time() - total_time) return { "status": "New people", "text": default_bot_voice, "voice": None, "image": image_bot_path } elif most_close < area_threshold: print("Total time", time.time() - total_time) return { "status": "People far from camera", "text": None, "voice": None, "image": image_bot_path, } else: print("Total time", time.time() - total_time) return { "status": "Old people", "text": None, "voice": None, "image": image_bot_path, } @app.post("/human_input/") async def human_input_api( voice_input: bytes = File(None), text_input: str = Form(None), temperature: Annotated[float, Form()] = 0.7, max_tokens: Annotated[int, Form()] = 1000, return_voice: Annotated[bool, Form()] = False, ): if text_input: text = text_input elif text_input is None and voice_input is not None: upload_audio = ffmpeg_read(voice_input, sampling_rate=24000) sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16') text = stt('temp.wav') print(text) else: if return_voice: return { "human_text": None, "robot_text": None, "robot_voice": None } else: return { "human_text": None, "robot_text": None, } prompt_msg = {"role": "user", "content": text} messages = system_prompt + [prompt_msg] completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages, temperature=temperature, max_tokens=max_tokens) print(completion['usage']['total_tokens']) if return_voice: return { "human_text": text, "robot_text": completion.choices[0].message.content, "robot_voice": tts(completion.choices[0].message.content, language="ja", encode=True) } else: return { "human_text": text, "robot_text": completion.choices[0].message.content, }