Peleck's picture
Initial
fa8453f
raw
history blame
5.82 kB
import os
import cv2
import glob
import shutil
import subprocess
from datetime import datetime
image_extensions = ["jpg", "jpeg", "png", "bmp", "tiff", "ico", "webp"]
def get_images_from_directory(directory_path):
file_paths =[]
for file_path in glob.glob(os.path.join(directory_path, "*")):
if any(file_path.lower().endswith(ext) for ext in image_extensions):
file_paths.append(file_path)
file_paths.sort()
return file_paths
def open_directory(path=None):
if path is None:
return
try:
os.startfile(path)
except:
subprocess.Popen(["xdg-open", path])
def copy_files_to_directory(files, destination):
file_paths = []
for file_path in files:
new_file_path = shutil.copy(file_path, destination)
file_paths.append(new_file_path)
return file_paths
def create_directory(directory_path, remove_existing=True):
if os.path.exists(directory_path) and remove_existing:
shutil.rmtree(directory_path)
if not os.path.exists(directory_path):
os.mkdir(directory_path)
return directory_path
else:
counter = 1
while True:
new_directory_path = f"{directory_path}_{counter}"
if not os.path.exists(new_directory_path):
os.mkdir(new_directory_path)
return new_directory_path
counter += 1
def add_datetime_to_filename(filename):
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y%m%d_%H%M%S")
file_name, file_extension = os.path.splitext(filename)
new_filename = f"{file_name}_{formatted_datetime}{file_extension}"
return new_filename
def get_single_video_frame(video_path, frame_index):
cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_index = min(int(frame_index), total_frames-1)
cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_index))
valid_frame, frame = cap.read()
cap.release()
if valid_frame:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame
return None
def get_video_fps(video_path):
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
return fps
def ffmpeg_extract_frames(video_path, destination, remove_existing=True, fps=30, name='frame_%d.jpg', ffmpeg_path=None):
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path
destination = create_directory(destination, remove_existing=remove_existing)
cmd = [
ffmpeg_path,
'-loglevel', 'info',
'-hwaccel', 'auto',
'-i', video_path,
'-q:v', '3',
'-pix_fmt', 'rgb24',
'-vf', 'fps=' + str(fps),
'-y',
os.path.join(destination, name)
]
process = subprocess.Popen(cmd)
process.communicate()
if process.returncode == 0:
return True, get_images_from_directory(destination)
else:
print(f"Error: Failed to extract video.")
return False, None
def ffmpeg_merge_frames(sequence_directory, pattern, destination, fps=30, crf=18, ffmpeg_path=None):
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path
cmd = [
ffmpeg_path,
'-loglevel', 'info',
'-hwaccel', 'auto',
'-r', str(fps),
# '-pattern_type', 'glob',
'-i', os.path.join(sequence_directory, pattern),
'-c:v', 'libx264',
'-crf', str(crf),
'-pix_fmt', 'yuv420p',
'-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1',
'-y', destination
]
process = subprocess.Popen(cmd)
process.communicate()
if process.returncode == 0:
return True, destination
else:
print(f"Error: Failed to merge image sequence.")
return False, None
def ffmpeg_replace_video_segments(main_video_path, sub_clips_info, output_path, ffmpeg_path="ffmpeg"):
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path
filter_complex = ""
filter_complex += f"[0:v]split=2[v0][main_end]; "
filter_complex += f"[1:v]split={len(sub_clips_info)}{', '.join([f'[v{index + 1}]' for index in range(len(sub_clips_info))])}; "
overlay_exprs = "".join([f"[v{index + 1}]" for index in range(len(sub_clips_info))])
overlay_filters = f"[main_end][{overlay_exprs}]overlay=eof_action=pass[vout]; "
filter_complex += overlay_filters
cmd = [
ffmpeg_path, '-i', main_video_path,
]
for sub_clip_path, _, _ in sub_clips_info:
cmd.extend(['-i', sub_clip_path])
cmd.extend([
'-filter_complex', filter_complex,
'-map', '[vout]',
output_path
])
subprocess.run(cmd)
def ffmpeg_mux_audio(source, target, output, ffmpeg_path=None):
ffmpeg_path = 'ffmpeg' if ffmpeg_path is None else ffmpeg_path
extracted_audio_path = os.path.join(os.path.dirname(output), 'extracted_audio.aac')
cmd1 = [
ffmpeg_path,
'-loglevel', 'info',
'-i', source,
'-vn',
'-c:a', 'aac',
'-y',
extracted_audio_path
]
process = subprocess.Popen(cmd1)
process.communicate()
if process.returncode != 0:
print(f"Error: Failed to extract audio.")
return False, target
cmd2 = [
ffmpeg_path,
'-loglevel', 'info',
'-hwaccel', 'auto',
'-i', target,
'-i', extracted_audio_path,
'-c:v', 'copy',
'-map', '0:v:0',
'-map', '1:a:0',
'-y', output
]
process = subprocess.Popen(cmd2)
process.communicate()
if process.returncode == 0:
if os.path.exists(extracted_audio_path):
os.remove(extracted_audio_path)
return True, output
else:
print(f"Error: Failed to mux audio.")
return False, None