marquesafonso's picture
add media_type to response
1926546 verified
raw
history blame
4.01 kB
import shutil, os, logging, uvicorn, tempfile
from typing import Optional
from utils.process_video import process_video
from utils.zip_response import zip_response
from utils.read_html import read_html
from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends, File
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic
from pydantic import BaseModel, field_validator
# TODO: fix resizing issues when video comes from mobile.
# FastAPI application object; the route decorators below register their
# handlers on it, and it is the object handed to uvicorn at the bottom.
app = FastAPI()
# HTTP Basic authentication scheme instance.
# NOTE(review): no route visible in this section references `security` --
# confirm whether it is still needed.
security = HTTPBasic()
@app.get("/")
async def root():
    """Serve the landing page.

    Loads ``static/landing_page.html`` (resolved against the current working
    directory) through ``read_html`` and returns it as an HTML response. The
    markup is wrapped in one leading and one trailing newline.
    """
    landing_page = os.path.join(os.getcwd(), "static/landing_page.html")
    body = f"\n{read_html(landing_page)}\n"
    return HTMLResponse(content=body)
@app.get("/submit_video/")
async def get_form():
    """Serve the video submission form.

    Loads ``static/submit_video.html`` (resolved against the current working
    directory) through ``read_html`` and returns it as an HTML response. The
    markup is wrapped in one leading and one trailing newline.
    """
    form_page = os.path.join(os.getcwd(), "static/submit_video.html")
    body = f"\n{read_html(form_page)}\n"
    return HTMLResponse(content=body)
async def get_temp_dir():
    """Yield the path of a fresh temporary directory, then delete it.

    Written as an async generator so it can be used as a dependency
    (``Depends(get_temp_dir)``): the code before ``yield`` runs before the
    consumer, the cleanup runs once the consumer is done or has raised.

    Yields:
        str: Absolute path of the newly created temporary directory.
    """
    # The context manager removes the directory explicitly on exit. Relying
    # on ``del`` of the TemporaryDirectory object instead defers cleanup to
    # its finalizer, which emits a ResourceWarning and is only prompt on
    # reference-counting interpreters.
    with tempfile.TemporaryDirectory() as temp_dir:
        yield temp_dir
@app.post("/process_video/")
async def process_video_api(video_file: UploadFile = File(media_type="video"),
                            srt_file: UploadFile = File(media_type="text"),  # TODO: add validation
                            task: Optional[str] = Form("transcribe"),
                            max_words_per_line: Optional[int] = Form(6),
                            fontsize: Optional[int] = Form(42),
                            font: Optional[str] = Form("FuturaPTHeavy"),
                            bg_color: Optional[str] = Form("#070a13b3"),
                            text_color: Optional[str] = Form("white"),
                            caption_mode: Optional[str] = Form("desktop"),
                            temp_dir: str = Depends(get_temp_dir)
                            ):
    """Caption an uploaded video and return the results as a zip archive.

    The uploads are copied into ``temp_dir``, the video is handed to
    ``process_video`` and the two paths it returns are packed by
    ``zip_response``.

    Args:
        video_file: Uploaded video. Only the final path component of its
            client-supplied filename is used on disk.
        srt_file: Uploaded subtitle file. When its reported size is zero or
            unknown, ``None`` is passed to ``process_video`` instead of a
            subtitle path.
        task: Forwarded unchanged to ``process_video``.
        max_words_per_line: Forwarded unchanged to ``process_video``.
        fontsize: Forwarded unchanged to ``process_video``.
        font: Forwarded unchanged to ``process_video``.
        bg_color: Forwarded unchanged to ``process_video``.
        text_color: Forwarded unchanged to ``process_video``.
        caption_mode: Forwarded unchanged to ``process_video``.
        temp_dir: Per-request working directory provided by ``get_temp_dir``.

    Returns:
        Response: media type ``application/zip``; the content is whatever
        ``zip_response`` returns.

    Raises:
        HTTPException: 400 when the upload has no usable filename; 500 (with
            the error text as detail) for any other failure.
    """
    try:
        # The filename is client-controlled. Keep only its last component so
        # "../" segments or an absolute path cannot point outside temp_dir
        # (os.path.join discards temp_dir when given an absolute path).
        client_name = (video_file.filename or "").replace("\\", "/")
        safe_name = os.path.basename(client_name)
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="A valid video filename is required")

        # splitext strips only the final extension ("my.clip.mp4" -> "my.clip"),
        # whereas splitting on the first dot would yield just "my".
        stem = os.path.splitext(safe_name)[0]
        video_path = os.path.join(temp_dir, safe_name)
        input_srt_path = os.path.join(temp_dir, f"{stem}.srt")
        zip_path = os.path.join(temp_dir, f"{stem}.zip")

        logging.info("Creating temporary directories")
        with open(video_path, 'w+b') as temp_file:
            logging.info("Copying video UploadFile to the temporary directory")
            try:
                shutil.copyfileobj(video_file.file, temp_file)
            finally:
                video_file.file.close()

        logging.info("Copying SRT UploadFile to the temp_input_path")
        # ``size`` can be None when it is not known; treat that like an empty
        # upload instead of failing on a None > 0 comparison.
        has_srt = bool(srt_file.size)
        # The .srt file is created in both cases (left empty when there is no
        # SRT upload), matching the previous on-disk state.
        # NOTE(review): confirm whether process_video needs the empty file.
        with open(input_srt_path, 'w+b') as temp_srt_file:
            if has_srt:
                try:
                    shutil.copyfileobj(srt_file.file, temp_srt_file)
                finally:
                    srt_file.file.close()

        # Both copies are closed -- and therefore flushed to disk -- before
        # process_video reads them by path.
        logging.info("Processing the video...")
        output_path, srt_path = process_video(
            video_path,
            input_srt_path if has_srt else None,
            task,
            max_words_per_line,
            fontsize,
            font,
            bg_color,
            text_color,
            caption_mode,
        )

        logging.info("Zipping response...")
        with open(zip_path, 'w+b') as temp_zip_file:
            zip_file = zip_response(temp_zip_file.name, [output_path, srt_path])
        return Response(content=zip_file, media_type='application/zip')
    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        logging.exception("Video processing failed")
        raise HTTPException(status_code=500, detail=str(e)) from e
# When executed as a script (rather than imported), serve the app with
# uvicorn, listening on all interfaces at port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)