import os
import time
import zipfile
from io import BytesIO
from typing import Optional, Union

import openai
import soundfile as sf
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from scipy.spatial import distance as dist
from ultralyticsplus import YOLO

from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read

# Config for camera picture
model = YOLO('ultralyticsplus/yolov8s')
CLASS = model.model.names
ZIP = False
default_bot_voice = "おはようございます"  # "Good morning": greeting spoken to a newly detected person
area_threshold = 0.3  # minimum box-to-image area ratio to treat a person as close enough

# Config for human input
# System prompt (in Japanese): "I want you to behave like a robot made by Detomo.
# Your name is Aisatsu. Your mission is to help children become confident and happy
# about greeting other children. Answer questions only in a simple way, and do not
# provide additional information unless explicitly requested."
prompt_template = "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。"\
                  "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"\
                  "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
system_prompt = [{"role": "system", "content": prompt_template}]
openai.api_key = os.environ["OPENAI_API_KEY"]
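# The key must be set in the environment before startup, e.g. (shell line is an
# illustration, not part of this repo):
#   export OPENAI_API_KEY=...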

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
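# To run locally (a sketch; assumes this module is saved as main.py):
#   uvicorn main:app --host 0.0.0.0 --port 8000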


@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.get("/client_settings/")
def client_settings_api():
    return {"camera_picture_period": 5}


@app.post("/camera_picture/")
async def camera_picture_api(
        file: UploadFile = File(...),
        last_seen: Optional[Union[str, UploadFile]] = Form(None),
):
    # parameters
    total_time = time.time()
    most_close = 0    # largest box-to-image area ratio seen so far
    out_img = None    # crop of the closest detected person
    diff_value = 0.5  # histogram distance to last_seen; the default passes the "new person" check

    # read the image and run detection
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height

    # select and crop the closest (largest) person box; class 0 is "person"
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate

    # return early when nobody was detected
    if out_img is None:
        return {
            "status": "No face detected",
            "text": None,
            "voice": None,
            "image": None
        }
    # encode the face crop: a file path when zipping, a base64 string otherwise
    if ZIP:
        image_bot_path = pil_to_base64(out_img, encode=False)
    else:
        image_bot_path = pil_to_base64(out_img, encode=True)

    # compare with the previously seen face, if one was provided
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
        print(f"Area rate: {most_close}. Difference value: {diff_value}")

    # greet only when the person is close enough and differs from last_seen
    if most_close >= area_threshold and diff_value >= 0.5:
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
            zip_buffer = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
            )
        else:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
            print("Total time", time.time() - total_time)
            return {
                "status": "New people",
                "text": default_bot_voice,
                "voice": voice_bot_path,
                "image": image_bot_path
            }
    else:
        print("Total time", time.time() - total_time)
        return {
            "status": "Old people",
            "text": None,
            "voice": None,
            "image": image_bot_path,
        }


@app.post("/human_input/")
async def human_input_api(
        input_data: Union[str, bytes],
        temperature: float = 0.7,
        max_tokens: int = 1000,
):
    print("Input data type", type(input_data))
    if type(input_data) != str:
        upload_audio = ffmpeg_read(input_data, sampling_rate=24000)
        sf.write('temp.wav', upload_audio, 24000, subtype='PCM_16')
        text = stt('temp.wav')
    else:
        text = input_data
    prompt_msg = {"role": "user", "content": text}
    messages = system_prompt + [prompt_msg]
    completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages, temperature=temperature,
                                              max_tokens=max_tokens)
    print("Total tokens:", completion['usage']['total_tokens'])
    return {
        "human_text": str(text),
        "robot_text": completion.choices[0].message.content,
        "robot_voice": tts(completion.choices[0].message.content, language="ja", encode=True)
    }
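

# Example requests (a sketch; the host, port, and file name are assumptions):
#   curl http://localhost:8000/client_settings/
#   curl -X POST http://localhost:8000/camera_picture/ -F "file=@frame.jpg"
# /human_input/ takes the text (or raw audio bytes) as the request body.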