import shutil, os, logging, uvicorn, tempfile from typing import Optional from utils.process_video import process_video from utils.zip_response import zip_response from utils.read_html import read_html from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends, File from fastapi.responses import HTMLResponse, Response from import HTTPBasic from pydantic import BaseModel, field_validator # TODO: fix error with validation(possibly) app = FastAPI() security = HTTPBasic() # class MP4Video(BaseModel): # video_file: UploadFile # @property # def filename(self): # return self.video_file.filename # @property # def file(self): # return self.video_file.file # # @field_validator('video_file') # # def validate_video_file(cls, v): # # if "video/" not in v.content_type: # # raise HTTPException(status_code=500, detail='Invalid video file type. Please upload an MP4 file.') # # return v class SRTFile(BaseModel): srt_file: Optional[UploadFile] = None @property def filename(self): return self.srt_file.filename @property def file(self): return self.srt_file.file @property def size(self): return self.srt_file.size @field_validator('srt_file') def validate_srt_file(cls, self): if self.size > 0 and not self.filename.endswith('.srt'): raise HTTPException(status_code=422, detail='Invalid subtitle file type. Please upload an SRT file.') return self @app.get("/") async def root(): html_content = f""" {read_html(os.path.join(os.getcwd(),"static/landing_page.html"))} """ return HTMLResponse(content=html_content) @app.get("/submit_video/") async def get_form(): html_content = f""" {read_html(os.path.join(os.getcwd(),"static/submit_video.html"))} """ return HTMLResponse(content=html_content) async def get_temp_dir(): dir = tempfile.TemporaryDirectory() try: yield finally: del dir"/process_video/") async def process_video_api(video_file: MP4Video = File(media_type="video"), srt_file: SRTFile = Depends(), task: Optional[str] = Form("transcribe"), max_words_per_line: Optional[int] = Form(6), fontsize: Optional[int] = Form(42), font: Optional[str] = Form("FuturaPTHeavy"), bg_color: Optional[str] = Form("#070a13b3"), text_color: Optional[str] = Form("white"), caption_mode: Optional[str] = Form("desktop"), temp_dir: str = Depends(get_temp_dir) ): try:"Creating temporary directories") with open(os.path.join(temp_dir, video_file.filename), 'w+b') as temp_file:"Copying video UploadFile to the temporary directory") try: shutil.copyfileobj(video_file.file, temp_file) finally: video_file.file.close()"Copying SRT UploadFile to the temp_input_path") if srt_file.size > 0: with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.srt"), 'w+b') as temp_srt_file: try: shutil.copyfileobj(srt_file.file, temp_srt_file) finally: srt_file.file.close()"Processing the video...") output_path, _ = process_video(,, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)"Zipping response...") with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.zip"), 'w+b') as temp_zip_file: zip_file = zip_response(, [output_path, srt_path]) return Response(content = zip_file) with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.srt"), 'w+b') as temp_srt_file:"Processing the video...") output_path, srt_path = process_video(, None, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)"Zipping response...") with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.zip"), 'w+b') as temp_zip_file: zip_file = zip_response(, [output_path, srt_path]) return Response(content = zip_file) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__":, host="", port=7860)