# aisatsu-api / main.py
import os
import time
import zipfile
from io import BytesIO
from typing import Optional, Union

import openai
import soundfile as sf
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware
from scipy.spatial import distance as dist
from ultralyticsplus import YOLO

from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read

# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
CLASS = model.model.names
ZIP = False  # when True, return voice + image as a zip archive instead of base64 fields
default_bot_voice = "おはようございます"  # "Good morning" in Japanese
area_threshold = 0.3  # minimum (person box area / image area) ratio to treat the person as close

# Config for human input
# Prompt (Japanese): "I want you to behave like a robot made by Detomo.
# Your name is Aisatsu. Your mission is to help children be happy and gain
# the confidence to greet other children. Answer questions only in a simple
# way, and do not provide additional information unless explicitly requested."
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。" \
                  "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。" \
                  "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.get("/client_settings/")
def client_settings_api():
    # interval, in seconds, at which the client should post camera pictures
    return {"camera_picture_period": 5}

@app.post("/camera_picture/")
async def camera_picture_api(
file: UploadFile = File(...),
last_seen: Optional[Union[str, UploadFile]] = Form(None),
):
    # parameters
    total_time = time.time()
    most_close = 0        # largest (person box / image) area ratio seen so far
    out_img = None        # cropped image of the closest person, if any
    diff_value = 0.5      # histogram distance to last_seen; default passes the threshold
    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height
    # select and crop the image of the closest (largest-area) person
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:  # keep only the "person" class
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    # check whether any person was detected
if out_img is None:
return {
"status": "No face detected",
"text": None,
"voice": None,
"image": None
}
    else:
        # base64-encode the crop (or keep it as a file path when zipping)
        image_bot_path = pil_to_base64(out_img, encode=not ZIP)
        # compare with the previous image, if one was provided
        if last_seen is not None:
            if isinstance(last_seen, str):
                last_seen = base64_to_pil(last_seen)
            else:
                last_seen = read_image_file(await last_seen.read())
            # Euclidean distance between color histograms: small values mean
            # the current face looks like the previously seen one
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
        print(f"Area rate: {most_close}. Difference value: {diff_value}")
        # return results
        if most_close >= area_threshold and diff_value >= 0.5:
            if ZIP:
                voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
                io = BytesIO()
                zip_filename = "final_archive.zip"
                with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                    for file_path in [voice_bot_path, image_bot_path]:
                        zf.write(file_path)
                print("Total time", time.time() - total_time)
                return StreamingResponse(
                    iter([io.getvalue()]),
                    media_type="application/x-zip-compressed",
                    headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
                )
else:
voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
print("Total time", time.time() - total_time)
return {
"status": "New people",
"text": default_bot_voice,
"voice": voice_bot_path,
"image": image_bot_path
}
else:
print("Total time", time.time() - total_time)
return {
"status": "Old people",
"text": None,
"voice": None,
"image": image_bot_path,
}
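
# A round-trip sketch for /camera_picture/ (comments only; the base URL and the
# `requests` library are assumptions): the client can pass the `image` field of
# the previous response back as `last_seen`, so the server can distinguish a
# new person from the one it already greeted:
#   resp = requests.post("http://localhost:8000/camera_picture/",
#                        files={"file": open("frame.jpg", "rb")},
#                        data={"last_seen": previous_resp.json()["image"]}).json()
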
@app.post("/human_input/")
async def human_input_api(
input_data: Union[str, bytes],
temperature: float = 0.7,
max_tokens: int = 1000,
):
print("Input data type", type(input_data))
if type(input_data) != str:
upload_audio = ffmpeg_read(input_data, sampling_rate=24000)
sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
text = stt('temp.wav')
else:
text = input_data
    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    print("Total tokens:", completion['usage']['total_tokens'])
return {
"human_text": str(text),
"robot_text": completion.choices[0].message.content,
"robot_voice": tts(completion.choices[0].message.content, language="ja", encode=True)
}
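

# Local-run entry point: a minimal sketch, not part of the original deployment
# flow. `uvicorn` and the host/port below are assumptions (Hugging Face Spaces
# start the app through their own runner).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)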