File size: 4,005 Bytes
748e637
 
 
 
 
 
 
c127215
748e637
 
 
 
080d626
4a0b504
748e637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00936bf
4a0b504
748e637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1888874
748e637
 
 
1926546
748e637
 
d7fa99f
748e637
 
 
1926546
748e637
 
 
 
f52fdd9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import shutil, os, logging, uvicorn, tempfile
from typing import Optional

from utils.process_video import process_video
from utils.zip_response import zip_response
from utils.read_html import read_html

from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends, File
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic
from pydantic import BaseModel, field_validator

# TODO: fix resizing issues when video comes from mobile.

# ASGI application instance; the route decorators below register their handlers on it.
app = FastAPI()
# HTTP Basic authentication scheme object.
# NOTE(review): no route visible in this file depends on it yet — confirm whether it is still needed.
security = HTTPBasic()

@app.get("/")
async def root():
    """Serve the landing page.

    The markup is loaded with ``read_html`` from ``static/landing_page.html``
    under the current working directory and returned as an HTML response.
    """
    page_path = os.path.join(os.getcwd(), "static/landing_page.html")
    page_markup = read_html(page_path)
    # The markup is padded with a newline and four spaces on each side.
    padded_markup = f"\n    {page_markup}\n    "
    return HTMLResponse(content=padded_markup)


@app.get("/submit_video/")
async def get_form():
    """Serve the video submission form.

    The markup is loaded with ``read_html`` from ``static/submit_video.html``
    under the current working directory and returned as an HTML response.
    """
    form_path = os.path.join(os.getcwd(), "static/submit_video.html")
    form_markup = read_html(form_path)
    # The markup is padded with a newline and four spaces on each side.
    padded_markup = f"\n    {form_markup}\n    "
    return HTMLResponse(content=padded_markup)

async def get_temp_dir():
    """Yield the path of a fresh temporary directory and remove it afterwards.

    Written as an async generator so it can be used as a dependency
    (``Depends(get_temp_dir)``): the code before ``yield`` runs first, the
    consumer receives the directory path, and the directory is deleted once
    the generator is resumed or closed.

    Yields:
        str: Absolute path of the temporary directory.
    """
    # The context manager removes the directory deterministically on exit,
    # instead of relying on garbage collection of the TemporaryDirectory
    # object (which also emits a ResourceWarning).
    with tempfile.TemporaryDirectory() as temp_dir:
        yield temp_dir

def _save_upload(upload: UploadFile, destination: str) -> None:
    """Write an uploaded file to ``destination`` and close the upload.

    The destination file is closed (and therefore flushed) before this
    function returns, so the path can safely be handed to other code.
    """
    try:
        with open(destination, 'wb') as out_file:
            shutil.copyfileobj(upload.file, out_file)
    finally:
        upload.file.close()


@app.post("/process_video/")
async def process_video_api(video_file: UploadFile = File(media_type="video"),
                            srt_file: UploadFile = File(media_type="text"), # TODO: add validation
                            task: Optional[str] = Form("transcribe"),
                            max_words_per_line: Optional[int] = Form(6),
                            fontsize: Optional[int] = Form(42),
                            font: Optional[str] = Form("FuturaPTHeavy"),
                            bg_color: Optional[str] = Form("#070a13b3"),
                            text_color: Optional[str] = Form("white"),
                            caption_mode: Optional[str] = Form("desktop"),
                            temp_dir: str = Depends(get_temp_dir)
                            ):
    """Caption an uploaded video and return the result as a zip archive.

    The video (and the SRT file, when a non-empty one is uploaded) is saved
    into ``temp_dir`` and passed to ``process_video`` together with the
    captioning options. The two paths it returns are then bundled by
    ``zip_response``.

    Args:
        video_file: Uploaded video. Its client-supplied name is reduced to a
            bare file name before being used on disk.
        srt_file: Uploaded subtitles. An empty upload means "no subtitles",
            in which case ``None`` is passed to ``process_video``.
        task: Forwarded unchanged to ``process_video``.
        max_words_per_line: Forwarded unchanged to ``process_video``.
        fontsize: Forwarded unchanged to ``process_video``.
        font: Forwarded unchanged to ``process_video``.
        bg_color: Forwarded unchanged to ``process_video``.
        text_color: Forwarded unchanged to ``process_video``.
        caption_mode: Forwarded unchanged to ``process_video``.
        temp_dir: Per-request working directory provided by ``get_temp_dir``.

    Returns:
        A ``Response`` with media type ``application/zip`` whose content is
        the value returned by ``zip_response``.

    Raises:
        HTTPException: 400 if the video upload has no usable file name;
            500 (with the error text as detail) if saving, processing or
            zipping fails.
    """
    # The file name comes from the client: keep only its last path component
    # (handling both "/" and "\\" separators) so it cannot escape temp_dir.
    client_name = (video_file.filename or "").replace("\\", "/")
    video_name = os.path.basename(client_name)
    if video_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="The uploaded video has no valid filename.")
    # splitext keeps dotted names intact ("my.clip.mp4" -> "my.clip").
    stem = os.path.splitext(video_name)[0]

    try:
        logging.info("Creating temporary directories")
        video_path = os.path.join(temp_dir, video_name)
        logging.info("Copying video UploadFile to the temporary directory")
        # The video must be fully written and closed before process_video
        # reads it by path; otherwise buffered bytes may still be missing.
        _save_upload(video_file, video_path)

        logging.info("Copying SRT UploadFile to the temp_input_path")
        input_srt_path = None
        # A size of 0 or None is treated as "no SRT provided".
        if srt_file.size:
            input_srt_path = os.path.join(temp_dir, f"{stem}.srt")
            _save_upload(srt_file, input_srt_path)

        logging.info("Processing the video...")
        output_path, srt_path = process_video(video_path, input_srt_path, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)

        logging.info("Zipping response...")
        # NOTE(review): only the archive path is handed over; this assumes
        # zip_response creates the file itself — confirm against the helper.
        zip_path = os.path.join(temp_dir, f"{stem}.zip")
        zip_file = zip_response(zip_path, [output_path, srt_path])
        return Response(content=zip_file, media_type='application/zip')
    except Exception as e:
        # Record the traceback server-side; the client only gets the message.
        logging.exception("Video processing failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
    
if __name__ == "__main__":
    # When executed as a script, serve the app with uvicorn on every
    # network interface, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)