# import smplx | |
# import torch | |
# import pickle | |
# import numpy as np | |
# # Global: Load the SMPL-X model once | |
# smplx_model = smplx.create( | |
# "/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/emage/smplx_models/", | |
# model_type='smplx', | |
# gender='NEUTRAL_2020', | |
# use_face_contour=False, | |
# num_betas=300, | |
# num_expression_coeffs=100, | |
# ext='npz', | |
# use_pca=True, | |
# num_pca_comps=12, | |
# ).to("cuda").eval() | |
# device = "cuda" | |
# def pkl_to_npz(pkl_path, npz_path): | |
# # Load the pickle file | |
# with open(pkl_path, "rb") as f: | |
# pkl_example = pickle.load(f) | |
# bs = 1 | |
# n = pkl_example["expression"].shape[0] # Assuming this is the batch size | |
# # Convert numpy arrays to torch tensors | |
# def to_tensor(numpy_array): | |
# return torch.tensor(numpy_array, dtype=torch.float32).to(device) | |
# # Ensure that betas are loaded from the pickle data, converting them to torch tensors | |
# betas = to_tensor(pkl_example["betas"]) | |
# transl = to_tensor(pkl_example["transl"]) | |
# expression = to_tensor(pkl_example["expression"]) | |
# jaw_pose = to_tensor(pkl_example["jaw_pose"]) | |
# global_orient = to_tensor(pkl_example["global_orient"]) | |
# body_pose_axis = to_tensor(pkl_example["body_pose_axis"]) | |
# left_hand_pose = to_tensor(pkl_example['left_hand_pose']) | |
# right_hand_pose = to_tensor(pkl_example['right_hand_pose']) | |
# leye_pose = to_tensor(pkl_example['leye_pose']) | |
# reye_pose = to_tensor(pkl_example['reye_pose']) | |
# # Pass the loaded data into the SMPL-X model | |
# gt_vertex = smplx_model( | |
# betas=betas, | |
# transl=transl, # Translation | |
# expression=expression, # Expression | |
# jaw_pose=jaw_pose, # Jaw pose | |
# global_orient=global_orient, # Global orientation | |
# body_pose=body_pose_axis, # Body pose | |
# left_hand_pose=left_hand_pose, # Left hand pose | |
# right_hand_pose=right_hand_pose, # Right hand pose | |
# return_full_pose=True, | |
# leye_pose=leye_pose, # Left eye pose | |
# reye_pose=reye_pose, # Right eye pose | |
# ) | |
# # Save the relevant data to an npz file | |
# np.savez(npz_path, | |
# betas=pkl_example["betas"], | |
# poses=gt_vertex["full_pose"].cpu().numpy(), | |
# expressions=pkl_example["expression"], | |
# trans=pkl_example["transl"], | |
# model='smplx2020', | |
# gender='neutral', | |
# mocap_frame_rate=30, | |
# ) | |
# from tqdm import tqdm | |
# import os | |
# def convert_all_pkl_in_folder(folder_path): | |
# # Collect all .pkl files | |
# pkl_files = [] | |
# for root, dirs, files in os.walk(folder_path): | |
# for file in files: | |
# if file.endswith(".pkl"): | |
# pkl_files.append(os.path.join(root, file)) | |
# # Process each file with a progress bar | |
# for pkl_path in tqdm(pkl_files, desc="Converting .pkl to .npz"): | |
# npz_path = pkl_path.replace(".pkl", ".npz") # Replace .pkl with .npz | |
# pkl_to_npz(pkl_path, npz_path) | |
# convert_all_pkl_in_folder("/content/oliver/oliver/") | |
# import os | |
# import json | |
# def collect_dataset_info(root_dir): | |
# dataset_info = [] | |
# for root, dirs, files in os.walk(root_dir): | |
# for file in files: | |
# if file.endswith(".npz"): | |
# video_id = file[:-4] # Removing the .npz extension to get the video ID | |
# # Construct the paths based on the current root directory | |
# motion_path = os.path.join(root) | |
# video_path = os.path.join(root) | |
# audio_path = os.path.join(root) | |
# # Determine the mode (train, val, test) by checking parent directory | |
# mode = root.split(os.sep)[-2] # Assuming mode is one folder up in hierarchy | |
# dataset_info.append({ | |
# "video_id": video_id, | |
# "video_path": video_path, | |
# "audio_path": audio_path, | |
# "motion_path": motion_path, | |
# "mode": mode | |
# }) | |
# return dataset_info | |
# # Set the root directory path of your dataset | |
# root_dir = '/content/oliver/oliver/' # Adjust this to your actual root directory | |
# dataset_info = collect_dataset_info(root_dir) | |
# output_file = '/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-original.json' | |
# # Save the dataset information to a JSON file | |
# with open(output_file, 'w') as json_file: | |
# json.dump(dataset_info, json_file, indent=4) | |
# print(f"Dataset information saved to {output_file}") | |
# import os | |
# import json | |
# import numpy as np | |
# def load_npz(npz_path): | |
# try: | |
# data = np.load(npz_path) | |
# return data | |
# except Exception as e: | |
# print(f"Error loading {npz_path}: {e}") | |
# return None | |
# def generate_clips(data, stride, window_length): | |
# clips = [] | |
# for entry in data: | |
# npz_data = load_npz(os.path.join(entry['motion_path'],entry['video_id']+".npz")) | |
# # Only continue if the npz file is successfully loaded | |
# if npz_data is None: | |
# continue | |
# # Determine the total length of the sequence from npz data | |
# total_frames = npz_data["poses"].shape[0] | |
# # Generate clips based on stride and window_length | |
# for start_idx in range(0, total_frames - window_length + 1, stride): | |
# end_idx = start_idx + window_length | |
# clip = { | |
# "video_id": entry["video_id"], | |
# "video_path": entry["video_path"], | |
# "audio_path": entry["audio_path"], | |
# "motion_path": entry["motion_path"], | |
# "mode": entry["mode"], | |
# "start_idx": start_idx, | |
# "end_idx": end_idx | |
# } | |
# clips.append(clip) | |
# return clips | |
# # Load the existing dataset JSON file | |
# input_json = '/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-original.json' | |
# with open(input_json, 'r') as f: | |
# dataset_info = json.load(f) | |
# # Set stride and window length | |
# stride = 40 # Adjust stride as needed | |
# window_length = 64 # Adjust window length as needed | |
# # Generate clips for all data | |
# clips_data = generate_clips(dataset_info, stride, window_length) | |
# # Save the filtered clips data to a new JSON file | |
# output_json = f'/content/drive/MyDrive/003_Codes/TANGO-JointEmbedding/datasets/show-oliver-s{stride}_w{window_length}.json' | |
# with open(output_json, 'w') as f: | |
# json.dump(clips_data, f, indent=4) | |
# print(f"Filtered clips data saved to {output_json}") | |
from ast import Expression | |
import os | |
import numpy as np | |
import wave | |
from moviepy.editor import VideoFileClip | |
def split_npz(npz_path, output_prefix): | |
try: | |
# Load the npz file | |
data = np.load(npz_path) | |
# Get the arrays and split them along the time dimension (T) | |
poses = data["poses"] | |
betas = data["betas"] | |
expressions = data["expressions"] | |
trans = data["trans"] | |
# Determine the halfway point (T/2) | |
half = poses.shape[0] // 2 | |
# Save the first half (0-5 seconds) | |
np.savez(output_prefix + "_0_5.npz", | |
betas=betas[:half], | |
poses=poses[:half], | |
expressions=expressions[:half], | |
trans=trans[:half], | |
model=data['model'], | |
gender=data['gender'], | |
mocap_frame_rate=data['mocap_frame_rate']) | |
# Save the second half (5-10 seconds) | |
np.savez(output_prefix + "_5_10.npz", | |
betas=betas[half:], | |
poses=poses[half:], | |
expressions=expressions[half:], | |
trans=trans[half:], | |
model=data['model'], | |
gender=data['gender'], | |
mocap_frame_rate=data['mocap_frame_rate']) | |
print(f"NPZ split saved for {output_prefix}") | |
except Exception as e: | |
print(f"Error processing NPZ file {npz_path}: {e}") | |
def split_wav(wav_path, output_prefix): | |
try: | |
with wave.open(wav_path, 'rb') as wav_file: | |
params = wav_file.getparams() | |
frames = wav_file.readframes(wav_file.getnframes()) | |
half_frame = len(frames) // 2 | |
# Create two half files | |
for i, start_frame in enumerate([0, half_frame]): | |
with wave.open(f"{output_prefix}_{i*5}_{(i+1)*5}.wav", 'wb') as out_wav: | |
out_wav.setparams(params) | |
if i == 0: | |
out_wav.writeframes(frames[:half_frame]) | |
else: | |
out_wav.writeframes(frames[half_frame:]) | |
print(f"WAV split saved for {output_prefix}") | |
except Exception as e: | |
print(f"Error processing WAV file {wav_path}: {e}") | |
def split_mp4(mp4_path, output_prefix): | |
try: | |
clip = VideoFileClip(mp4_path) | |
for i in range(2): | |
subclip = clip.subclip(i*5, (i+1)*5) | |
subclip.write_videofile(f"{output_prefix}_{i*5}_{(i+1)*5}.mp4", codec="libx264", audio_codec="aac") | |
print(f"MP4 split saved for {output_prefix}") | |
except Exception as e: | |
print(f"Error processing MP4 file {mp4_path}: {e}") | |
def process_files(root_dir, output_dir): | |
import json | |
clips = [] | |
dirs = os.listdir(root_dir) | |
for dir in dirs: | |
video_id = dir | |
output_prefix = os.path.join(output_dir, video_id) | |
root = os.path.join(root_dir, dir) | |
npz_path = os.path.join(root, video_id + ".npz") | |
wav_path = os.path.join(root, video_id + ".wav") | |
mp4_path = os.path.join(root, video_id + ".mp4") | |
# split_npz(npz_path, output_prefix) | |
# split_wav(wav_path, output_prefix) | |
# split_mp4(mp4_path, output_prefix) | |
clip = { | |
"video_id": video_id, | |
"video_path": root, | |
"audio_path": root, | |
"motion_path": root, | |
"mode": "test", | |
"start_idx": 0, | |
"end_idx": 150 | |
} | |
clips.append(clip) | |
output_json = output_dir + "/test.json" | |
with open(output_json, 'w') as f: | |
json.dump(clips, f, indent=4) | |
# Set the root directory path of your dataset and output directory | |
root_dir = '/content/oliver/oliver/Abortion_Laws_-_Last_Week_Tonight_with_John_Oliver_HBO-DRauXXz6t0Y.webm/test/' | |
output_dir = '/content/test' | |
# Make sure the output directory exists | |
os.makedirs(output_dir, exist_ok=True) | |
# Process all the files | |
process_files(root_dir, output_dir) | |