File size: 4,923 Bytes
748e637
 
 
 
 
 
 
c127215
748e637
 
 
 
c127215
 
748e637
 
 
c127215
 
748e637
c127215
 
 
 
 
 
748e637
c127215
 
 
 
 
748e637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
009ce34
 
748e637
009ce34
748e637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c127215
748e637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d7fa99f
748e637
 
 
 
 
 
 
 
f52fdd9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import shutil, os, logging, uvicorn, tempfile
from typing import Optional

from utils.process_video import process_video
from utils.zip_response import zip_response
from utils.read_html import read_html

from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends, File
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic
from pydantic import BaseModel, field_validator

# TODO: fix the error with upload validation (possibly in the validators below)

# ASGI application object; the route decorators in this module register handlers on it.
app = FastAPI()
# HTTP Basic auth scheme instance.
# NOTE(review): not referenced by any route visible in this file - confirm it is still needed.
security = HTTPBasic()

# class MP4Video(BaseModel):
#     video_file: UploadFile
    
#     @property
#     def filename(self):
#         return self.video_file.filename
#     @property
#     def file(self):
#         return self.video_file.file

#     # @field_validator('video_file')
#     # def validate_video_file(cls, v):
#     #     if "video/" not in v.content_type:
#     #         raise HTTPException(status_code=500, detail='Invalid video file type. Please upload an MP4 file.')
#     #     return v

class SRTFile(BaseModel):
    """Optional subtitle upload, intended for use as a FastAPI dependency.

    The accessors are safe to call when no subtitle file was supplied:
    ``filename`` and ``file`` then return ``None`` and ``size`` returns ``0``.
    """

    # The uploaded subtitle file; None when the client did not send one.
    srt_file: Optional[UploadFile] = None

    @property
    def filename(self):
        """Return the uploaded file's name, or None when nothing was uploaded."""
        return None if self.srt_file is None else self.srt_file.filename

    @property
    def file(self):
        """Return the underlying file object, or None when nothing was uploaded."""
        return None if self.srt_file is None else self.srt_file.file

    @property
    def size(self):
        """Return the upload size in bytes; 0 when absent or unknown."""
        if self.srt_file is None:
            return 0
        # The upload may report its size as None; treat that as empty.
        return self.srt_file.size or 0

    @field_validator('srt_file')
    @classmethod
    def validate_srt_file(cls, value):
        """Reject non-empty uploads whose name does not end in ``.srt``.

        A missing upload (``None``) and an empty upload (size 0) are accepted
        unchanged. The extension comparison is case-insensitive.

        Raises:
            HTTPException: 422 when a non-empty file is not an ``.srt`` file.
        """
        if value is None:
            return value
        name = value.filename or ""
        if (value.size or 0) > 0 and not name.lower().endswith('.srt'):
            raise HTTPException(status_code=422, detail='Invalid subtitle file type. Please upload an SRT file.')
        return value

@app.get("/")
async def root():
    """Serve the landing page read from ``static/landing_page.html``.

    The path is resolved against the current working directory.
    """
    page_path = os.path.join(os.getcwd(), "static/landing_page.html")
    page_markup = read_html(page_path)
    # The surrounding newline/indent padding is part of the served body.
    body = f"""
    {page_markup}
    """
    return HTMLResponse(content=body)


@app.get("/submit_video/")
async def get_form():
    """Serve the video submission form read from ``static/submit_video.html``.

    The path is resolved against the current working directory.
    """
    form_path = os.path.join(os.getcwd(), "static/submit_video.html")
    form_markup = read_html(form_path)
    # The surrounding newline/indent padding is part of the served body.
    body = f"""
    {form_markup}
    """
    return HTMLResponse(content=body)

async def get_temp_dir():
    """Yield the path of a fresh temporary directory and remove it afterwards.

    Written as an async generator so it can be used as a yield-style
    dependency (see ``Depends(get_temp_dir)`` on the processing route).

    Yields:
        str: path of the temporary directory.
    """
    # The context manager removes the directory deterministically when the
    # generator is resumed or closed, rather than relying on the object
    # being garbage-collected.
    with tempfile.TemporaryDirectory() as temp_path:
        yield temp_path

def _save_upload(upload, destination):
    """Copy an uploaded file's contents to ``destination`` and close the upload."""
    try:
        # The target is closed (and therefore flushed) before this returns,
        # so later readers of ``destination`` see the complete file.
        with open(destination, 'wb') as target:
            shutil.copyfileobj(upload.file, target)
    finally:
        upload.file.close()


@app.post("/process_video/")
async def process_video_api(video_file: UploadFile = File(media_type="video"),
                            srt_file: SRTFile = Depends(),
                            task: Optional[str] = Form("transcribe"),
                            max_words_per_line: Optional[int] = Form(6),
                            fontsize: Optional[int] = Form(42),
                            font: Optional[str] = Form("FuturaPTHeavy"),
                            bg_color: Optional[str] = Form("#070a13b3"),
                            text_color: Optional[str] = Form("white"),
                            caption_mode: Optional[str] = Form("desktop"),
                            temp_dir: str = Depends(get_temp_dir)
                            ):
    """Process an uploaded video and return a zip of the video and subtitles.

    The video (and the subtitle file, when a non-empty one is uploaded) is
    copied into ``temp_dir``, passed to ``process_video`` together with the
    caption options, and the resulting output path plus subtitle path are
    handed to ``zip_response``, whose return value becomes the response body.

    When a subtitle file is uploaded, the zip contains that uploaded file;
    otherwise it contains the subtitle path returned by ``process_video``.

    Raises:
        HTTPException: 500 carrying the error text when any step fails.
    """
    try:
        logging.info("Creating temporary directories")
        # The filename is client-controlled: keep only its final component so
        # it cannot point outside temp_dir.
        video_name = os.path.basename(video_file.filename or "") or "video"
        stem = video_name.split('.')[0] or "video"
        video_path = os.path.join(temp_dir, video_name)

        logging.info("Copying video UploadFile to the temporary directory")
        _save_upload(video_file, video_path)

        # Read the model field directly so a missing subtitle part is handled
        # here regardless of how the SRTFile accessors treat None.
        subtitle_upload = srt_file.srt_file
        if subtitle_upload is not None and (subtitle_upload.size or 0) > 0:
            logging.info("Copying SRT UploadFile to the temp_input_path")
            srt_path = os.path.join(temp_dir, f"{stem}.srt")
            _save_upload(subtitle_upload, srt_path)
            logging.info("Processing the video...")
            output_path, _ = process_video(video_path, srt_path, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)
        else:
            logging.info("Processing the video...")
            # No subtitle supplied: process_video returns the subtitle path itself.
            output_path, srt_path = process_video(video_path, None, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)

        logging.info("Zipping response...")
        # The zip path is created before the call, as before; zip_response's
        # own contract is not visible from this module.
        with open(os.path.join(temp_dir, f"{stem}.zip"), 'w+b') as temp_zip_file:
            zip_file = zip_response(temp_zip_file.name, [output_path, srt_path])
        return Response(content=zip_file)
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewrapping them as 500s.
        raise
    except Exception as e:
        logging.exception("Video processing failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
    
# Start a uvicorn server for this app when the module is executed as a script.
if __name__ == "__main__":
    # Binds to all network interfaces (0.0.0.0) on port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)