File size: 3,458 Bytes
eafbf97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Misc utils."""
import os
from shared.utils.log import tqdm_iterator
import numpy as np


class AttrDict(dict):
    """Dictionary whose entries can also be read and written as attributes.

    The instance ``__dict__`` is bound to the mapping itself, so ``d.key``
    and ``d["key"]`` refer to the same storage.
    """

    def __init__(self, *args, **kwargs):
        # Build the underlying dict from whatever form the caller supplied.
        super().__init__(*args, **kwargs)
        # Alias attribute storage to the dict so both access styles agree.
        self.__dict__ = self
        

def ignore_warnings(type="ignore"):
    """Install a global warnings filter and set ``TOKENIZERS_PARALLELISM``.

    Args:
        type: Action string handed to ``warnings.filterwarnings``
            (``"ignore"`` by default).
    """
    from warnings import filterwarnings

    filterwarnings(type)
    # Presumably consumed by the HuggingFace tokenizers library -- the
    # variable is only written here, never read in this module.
    os.environ.update(TOKENIZERS_PARALLELISM="false")


def download_youtube_video(youtube_id, ext='mp4', resolution="360p", **kwargs):
    """Download a YouTube video to ``/tmp/sample.mp4`` using pytube.

    Args:
        youtube_id: Video ID, inserted into the ``watch?v=`` URL.
        ext: File extension used to filter the available streams.
        resolution: Resolution string used to filter the available streams.
        **kwargs: Extra filters forwarded to ``yt.streams.filter``.

    Returns:
        The string ``"/tmp/sample.mp4"`` on success, or ``None`` when the
        stream lookup or the download raised an error.
    """
    import pytube

    video_url = f"https://www.youtube.com/watch?v={youtube_id}"
    yt = pytube.YouTube(video_url)
    try:
        streams = yt.streams.filter(
            file_extension=ext, res=resolution, progressive=True, **kwargs,
        )
        # NOTE: the output name is fixed, so the file is written as
        # sample.mp4 regardless of `ext`.
        streams[0].download(output_path='/tmp', filename='sample.mp4')
    except Exception:
        # Best-effort download: report and return None. Catching Exception
        # (not a bare `except:`) lets KeyboardInterrupt and SystemExit
        # propagate instead of being reported as a failed download.
        print("Failed to download video: ", video_url)
        return None
    return "/tmp/sample.mp4"


def check_audio(video_path):
    """Return whether the video at ``video_path`` has an audio track.

    Args:
        video_path: Path handed to moviepy's ``VideoFileClip``.

    Returns:
        ``True`` if the opened clip's ``audio`` attribute is not ``None``;
        ``False`` if it is ``None`` or if opening/inspecting the clip raised.
    """
    from moviepy.video.io.VideoFileClip import VideoFileClip

    clip = None
    try:
        clip = VideoFileClip(video_path)
        return clip.audio is not None
    except Exception:
        # Unreadable or corrupt files count as "no audio". Exception (rather
        # than a bare `except:`) keeps Ctrl-C / SystemExit working.
        return False
    finally:
        # Release the readers held by the clip; the original never closed it.
        if clip is not None:
            try:
                clip.close()
            except Exception:
                # A failure while closing must not change the answer.
                pass


def check_audio_multiple(video_paths, n_jobs=8):
    """Run ``check_audio`` over many videos in parallel with joblib.

    Args:
        video_paths: Iterable of video paths, wrapped in a progress iterator.
        n_jobs: Number of joblib workers.

    Returns:
        Whatever ``joblib.Parallel`` returns for the ``check_audio`` calls,
        one result per path.
    """
    progress = tqdm_iterator(video_paths, desc="Checking audio")
    from joblib import Parallel, delayed

    # Lazy generator: tasks are produced as joblib consumes the progress
    # iterator.
    tasks = (delayed(check_audio)(path) for path in progress)
    pool = Parallel(n_jobs=n_jobs)
    return pool(tasks)


def num_trainable_params(model, round=3, verbose=True, return_count=False):
    """Count the parameters of ``model`` that have ``requires_grad`` set.

    Args:
        model: Object exposing ``parameters()``; each parameter must provide
            ``numel()`` and ``requires_grad``.
        round: Decimals used when reporting the count in millions. ``None``
            reports the raw count instead.
        verbose: When true, print the (formatted) count.
        return_count: When true, return the raw count; otherwise the
            function returns ``None``.
    """
    n_params = 0
    for param in model.parameters():
        if param.requires_grad:
            n_params += param.numel()

    if round is None:
        value, unit = n_params, ""
    else:
        # Report in millions, rounded to the requested number of decimals.
        value, unit = np.round(n_params / 1e6, round), "M"

    if verbose:
        model_name = model.__class__.__name__
        print(f"::: Number of trainable parameters in {model_name}: {value} {unit}")

    return n_params if return_count else None


def num_params(model, round=3):
    """Print the total number of parameters in ``model``.

    Args:
        model: Object exposing ``parameters()``; each parameter must provide
            ``numel()``.
        round: Decimals used when reporting the count in millions. ``None``
            prints the raw count instead.
    """
    n_params = sum(param.numel() for param in model.parameters())
    model_name = model.__class__.__name__

    in_millions = round is not None
    value = np.round(n_params / 1e6, round) if in_millions else n_params
    unit = "M" if in_millions else ""

    print(f"::: Number of total parameters in {model_name}: {value}{unit}")


def fix_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch with the same value.

    Also switches ``torch.backends.cudnn.deterministic`` on.

    Args:
        seed: Integer seed applied to every generator.
    """
    import random

    import numpy
    import torch

    # Same order as before: stdlib, NumPy, torch CPU, then all CUDA devices.
    seeders = (
        random.seed,
        numpy.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True


def check_tensor(x):
    """Print the shape, minimum and maximum of ``x`` on one line.

    Args:
        x: Object exposing ``shape``, ``min()`` and ``max()``.
    """
    shape, lowest, highest = x.shape, x.min(), x.max()
    print(shape, lowest, highest)


def find_nearest_indices(a, b):
    """
    Finds the indices of the elements in `a` that are closest to each element in `b`.

    Args:
        a (np.ndarray): The array to search for the closest values.
        b (np.ndarray): The array of values to search for. A scalar is
            treated as a one-element array.
    
    Returns:
        np.ndarray: The indices of the closest values in `a` for each element in `b`.
            On ties, the first (lowest) index wins, as with `np.argmin`.
    """
    # Convert inputs to arrays; a scalar `b` becomes a length-1 array so the
    # column indexing below works.
    a = np.asarray(a)
    b = np.atleast_1d(np.asarray(b))

    # Column view of `b`, so it broadcasts against `a` into one row per query.
    targets = b[:, np.newaxis]

    if np.result_type(a, targets).kind == "u":
        # Unsigned subtraction wraps around (e.g. uint8: 0 - 1 == 255), which
        # would make far-away values look close. max - min is never negative.
        diff = np.maximum(a, targets) - np.minimum(a, targets)
    else:
        # Absolute difference between each element in `b` and all elements in `a`.
        diff = np.abs(a - targets)

    # Index of the minimum along the second axis (which corresponds to `a`).
    return np.argmin(diff, axis=1)