Spaces:
Build error
Build error
import os | |
import cv2 | |
import glob | |
import shutil | |
import subprocess | |
from datetime import datetime | |
image_extensions = ["jpg", "jpeg", "png", "bmp", "tiff", "ico", "webp"] | |
def get_images_from_directory(directory_path): | |
file_paths =[] | |
for file_path in glob.glob(os.path.join(directory_path, "*")): | |
if any(file_path.lower().endswith(ext) for ext in image_extensions): | |
file_paths.append(file_path) | |
file_paths.sort() | |
return file_paths | |
def open_directory(path=None): | |
if path is None: | |
return | |
try: | |
os.startfile(path) | |
except: | |
subprocess.Popen(["xdg-open", path]) | |
def copy_files_to_directory(files, destination): | |
file_paths = [] | |
for file_path in files: | |
new_file_path = shutil.copy(file_path, destination) | |
file_paths.append(new_file_path) | |
return file_paths | |
def create_directory(directory_path, remove_existing=True): | |
if os.path.exists(directory_path) and remove_existing: | |
shutil.rmtree(directory_path) | |
if not os.path.exists(directory_path): | |
os.mkdir(directory_path) | |
return directory_path | |
else: | |
counter = 1 | |
while True: | |
new_directory_path = f"{directory_path}_{counter}" | |
if not os.path.exists(new_directory_path): | |
os.mkdir(new_directory_path) | |
return new_directory_path | |
counter += 1 | |
def add_datetime_to_filename(filename): | |
current_datetime = datetime.now() | |
formatted_datetime = current_datetime.strftime("%Y%m%d_%H%M%S") | |
file_name, file_extension = os.path.splitext(filename) | |
new_filename = f"{file_name}_{formatted_datetime}{file_extension}" | |
return new_filename | |
def get_single_video_frame(video_path, frame_index): | |
cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
frame_index = min(int(frame_index), total_frames-1) | |
cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_index)) | |
valid_frame, frame = cap.read() | |
cap.release() | |
if valid_frame: | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
return frame | |
return None | |
def get_video_fps(video_path): | |
cap = cv2.VideoCapture(video_path) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
cap.release() | |
return fps | |
def ffmpeg_extract_frames(video_path, destination, remove_existing=True, fps=30, name='frame_%d.jpg', ffmpeg_path=None): | |
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path | |
destination = create_directory(destination, remove_existing=remove_existing) | |
cmd = [ | |
ffmpeg_path, | |
'-loglevel', 'info', | |
'-hwaccel', 'auto', | |
'-i', video_path, | |
'-q:v', '3', | |
'-pix_fmt', 'rgb24', | |
'-vf', 'fps=' + str(fps), | |
'-y', | |
os.path.join(destination, name) | |
] | |
process = subprocess.Popen(cmd) | |
process.communicate() | |
if process.returncode == 0: | |
return True, get_images_from_directory(destination) | |
else: | |
print(f"Error: Failed to extract video.") | |
return False, None | |
def ffmpeg_merge_frames(sequence_directory, pattern, destination, fps=30, crf=18, ffmpeg_path=None): | |
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path | |
cmd = [ | |
ffmpeg_path, | |
'-loglevel', 'info', | |
'-hwaccel', 'auto', | |
'-r', str(fps), | |
# '-pattern_type', 'glob', | |
'-i', os.path.join(sequence_directory, pattern), | |
'-c:v', 'libx264', | |
'-crf', str(crf), | |
'-pix_fmt', 'yuv420p', | |
'-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', | |
'-y', destination | |
] | |
process = subprocess.Popen(cmd) | |
process.communicate() | |
if process.returncode == 0: | |
return True, destination | |
else: | |
print(f"Error: Failed to merge image sequence.") | |
return False, None | |
def ffmpeg_replace_video_segments(main_video_path, sub_clips_info, output_path, ffmpeg_path="ffmpeg"): | |
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path | |
filter_complex = "" | |
filter_complex += f"[0:v]split=2[v0][main_end]; " | |
filter_complex += f"[1:v]split={len(sub_clips_info)}{', '.join([f'[v{index + 1}]' for index in range(len(sub_clips_info))])}; " | |
overlay_exprs = "".join([f"[v{index + 1}]" for index in range(len(sub_clips_info))]) | |
overlay_filters = f"[main_end][{overlay_exprs}]overlay=eof_action=pass[vout]; " | |
filter_complex += overlay_filters | |
cmd = [ | |
ffmpeg_path, '-i', main_video_path, | |
] | |
for sub_clip_path, _, _ in sub_clips_info: | |
cmd.extend(['-i', sub_clip_path]) | |
cmd.extend([ | |
'-filter_complex', filter_complex, | |
'-map', '[vout]', | |
output_path | |
]) | |
subprocess.run(cmd) | |
def ffmpeg_mux_audio(source, target, output, ffmpeg_path=None): | |
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path | |
extracted_audio_path = os.path.join(os.path.dirname(output), 'extracted_audio.aac') | |
cmd1 = [ | |
ffmpeg_path, | |
'-loglevel', 'info', | |
'-i', source, | |
'-vn', | |
'-c:a', 'aac', | |
'-y', | |
extracted_audio_path | |
] | |
process = subprocess.Popen(cmd1) | |
process.communicate() | |
if process.returncode != 0: | |
print(f"Error: Failed to extract audio.") | |
return False, target | |
cmd2 = [ | |
ffmpeg_path, | |
'-loglevel', 'info', | |
'-hwaccel', 'auto', | |
'-i', target, | |
'-i', extracted_audio_path, | |
'-c:v', 'copy', | |
'-map', '0:v:0', | |
'-map', '1:a:0', | |
'-y', output | |
] | |
process = subprocess.Popen(cmd2) | |
process.communicate() | |
if process.returncode == 0: | |
if os.path.exists(extracted_audio_path): | |
os.remove(extracted_audio_path) | |
return True, output | |
else: | |
print(f"Error: Failed to mux audio.") | |
return False, None | |