"""Audio loading utils."""
import os
import numpy as np
import torch
import torchaudio
import decord
import librosa
import einops
import PIL
import matplotlib.pyplot as plt
# Add serif font
plt.rcParams['font.family'] = 'serif'
from PIL import Image, ImageOps
import librosa.display
import shared.utils as su
def read_info(path):
"""
Reads the info of the given audio file.
Args:
path (str): path to the audio file
"""
import ffmpeg
probe = ffmpeg.probe(path)
audio_info = next(
(s for s in probe['streams'] if s['codec_type'] == 'audio'),
None,
)
video_info = next(
(s for s in probe['streams'] if s['codec_type'] == 'video'),
None,
)
return dict(video=video_info, audio=audio_info)
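# Illustrative sketch (not from the original codebase): read_info can be used to
# recover a file's native sample rate and duration before loading. The path below
# is a hypothetical placeholder.
def _example_read_info():
    info = read_info("media_assets/example.mp4")["audio"]  # hypothetical file
    native_sr = int(info["sample_rate"])
    duration_s = info["duration_ts"] / native_sr
    print(f"native sr: {native_sr} Hz, duration: {duration_s:.2f} s")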
def load_audio_clips(
audio_path,
clips,
sr,
clip_len,
backend='decord',
load_entire=False,
cut_to_clip_len=True,
):
"""
Loads audio clips from the given audio file.
Args:
audio_path (str): path to the audio file
clips (np.ndarray): sized [T, 2], where T is the number of clips
and each row is a pair of start and end times of the clip
sr (int): sample rate
clip_len (float): length of the audio clip in seconds
backend (str): backend to use for loading audio clips
load_entire (bool): whether to load the entire audio file
cut_to_clip_len (bool): whether to cut the audio clip to clip_len
"""
if backend == 'torchaudio':
audio_info = read_info(audio_path)["audio"]
true_sr = int(audio_info["sample_rate"])
true_nf = audio_info["duration_ts"]
audio_duration = true_nf / true_sr
# metadata = torchaudio.info(audio_path)
# true_sr = metadata.sample_rate
# true_nf = metadata.num_frames
elif backend == "decord":
# duration = librosa.get_duration(filename=audio_path)
ar = decord.AudioReader(audio_path, sample_rate=sr, mono=True)
        # mono=False gives NaNs in the loaded samples.
        # This (https://gist.github.com/nateraw/fcc2bdb9c8738224957c8617c3360445) might
        # be a related issue; if stereo audio is needed, use the torchaudio backend instead.
true_nf = ar.shape[1]
audio_duration = ar.shape[1] / sr
else:
raise ValueError(f"Unknown backend: {backend}")
if load_entire:
# Load the entire audio as a single clip and return
if backend == 'torchaudio':
y, _ = torchaudio.load(audio_path)
if y.shape[0] > 1:
# Convert to a single channel
y = y.mean(dim=0, keepdim=True)
resampler = torchaudio.transforms.Resample(true_sr, sr)
y = resampler(y)
audio = y
elif backend == "decord":
audio = ar.get_batch(np.arange(true_nf)).asnumpy()
audio = torch.from_numpy(audio)
return [audio]
else:
# Clip the clips to avoid going out of bounds
clips = np.clip(clips, 0, audio_duration)
audio_clips = []
for st, et in clips:
if backend == 'torchaudio':
# Load audio within the given time range
sf = max(int(true_sr * st), 0)
ef = min(int(true_sr * et), true_nf)
nf = ef - sf
y, _ = torchaudio.load(audio_path, frame_offset=sf, num_frames=nf)
# Stereo to mono
if y.shape[0] > 1:
# Convert to a single channel
y = y.mean(dim=0, keepdim=True)
# Resample to the given sample rate
resampler = torchaudio.transforms.Resample(true_sr, sr)
y = resampler(y)
audio = y
elif backend == "decord":
# Load audio within the given time range
sf = max(int(st * sr), 0)
ef = min(int(et * sr), true_nf)
audio = ar.get_batch(np.arange(sf, ef)).asnumpy()
audio = torch.from_numpy(audio)
# No need to convert to mono since we are using mono=True
# No need to resample since we are using sample_rate=sr
else:
raise ValueError(f"Unknown backend: {backend}")
            # Pad (or, if cut_to_clip_len, cut) the clip to exactly clip_len seconds
nf_reqd = int(clip_len * sr)
nf_curr = audio.shape[1]
npad_side = max(0, nf_reqd - nf_curr)
if nf_curr < nf_reqd:
audio = torch.nn.functional.pad(audio, (0, npad_side))
elif (nf_curr > nf_reqd) and cut_to_clip_len:
audio = audio[:, :nf_reqd]
audio_clips.append(audio)
return audio_clips
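# Illustrative sketch (not from the original codebase): loading two 2-second
# clips at 16 kHz with the decord backend. The path is a hypothetical
# placeholder; each returned tensor is expected to be shaped [1, clip_len * sr].
def _example_load_audio_clips():
    clips = np.array([[0.0, 2.0], [4.0, 6.0]])
    audio_clips = load_audio_clips(
        audio_path="media_assets/example.mp4",  # hypothetical file
        clips=clips,
        sr=16000,
        clip_len=2.0,
        backend="decord",
    )
    print([tuple(a.shape) for a in audio_clips])  # expected: [(1, 32000), (1, 32000)]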
def show_audio_clips_waveform(
audio_clips, clips, title=None, show=True, figsize=(10, 2),
):
"""
Visualizes the given audio clips.
Args:
audio_clips (list): list of audio clips
        clips (np.ndarray): sized [T, 2], where each row is a pair of start
            and end times (in seconds) of the clip
title (str): title of the plot
show (bool): whether to show the clips
figsize (tuple): figure size
"""
clip_centers = (clips[:, 0] + clips[:, 1]) / 2
clip_durations = clips[:, 1] - clips[:, 0]
fig, ax = plt.subplots(1, len(audio_clips), figsize=figsize)
if len(audio_clips) == 1:
ax = [ax]
for i, audio in enumerate(audio_clips):
        # Map samples to absolute time: the clip spans [start, end],
        # i.e. centre ± half the clip duration
        timestamps = np.linspace(
            clip_centers[i] - clip_durations[i] / 2,
            clip_centers[i] + clip_durations[i] / 2,
            audio.shape[-1],
        )
ax[i].plot(timestamps, audio.squeeze().numpy(), alpha=0.5)
ax[i].set_title(f'$t=$ {clip_centers[i]:.2f}')
ax[i].grid(alpha=0.4)
    if title is not None:
        fig.suptitle(title)
    plt.tight_layout()
if show:
plt.show()
else:
plt.savefig('audio_clips_waveform.png')
# TODO: preprocess audio clips (e.g., wav-to-spectrogram, etc.)
# Note that this is different from transforms applied as augmentation
# during training. This is more like a preprocessing step that is applied
# to the entire audio before sampling the clips.
import torchaudio.functional as TAF
import torchaudio.transforms as TAT
def load_audio(path, sr=16000, **kwargs):
    """Loads audio with torchaudio, converts it to mono and resamples it to sr."""
    y, true_sr = torchaudio.load(path, **kwargs)
    # Average the channels to get a mono waveform of shape [1, T]
    y = y.mean(dim=0, keepdim=True)
resampler = torchaudio.transforms.Resample(true_sr, sr)
y = resampler(y)
return y, sr
def load_audio_librosa(path, sr=16000, **kwargs):
    """Loads audio with librosa (already mono and resampled to sr) as a [1, T] tensor."""
    y, true_sr = librosa.load(path, sr=sr, **kwargs)
y = torch.from_numpy(y).unsqueeze(0)
return y, sr
def librosa_harmonic_spectrogram_db(
y, sr=16000, n_fft=512, hop_length=256, margin=16., n_mels=64,
):
    """
    Computes a dB-scaled spectrogram of the harmonic component of the waveform
    (obtained via harmonic-percussive source separation). If n_mels is given,
    the mel filterbank is applied to the dB-scaled harmonic spectrogram.
    """
    if isinstance(y, torch.Tensor):
        y = y.numpy()
    if len(y.shape) == 2:
        # Convert multi-channel input to mono
        y = y.mean(axis=0)
    # center=True outputs one more frame than center=False; use center=False
    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length, center=False)
DH, DP = librosa.decompose.hpss(D, margin=margin)
amplitude_h = np.sqrt(2) * np.abs(DH)
if n_mels is None:
# Usual dB spectrogram
SH = librosa.amplitude_to_db(amplitude_h, ref=np.max)
else:
# Mel-scaled dB spectrogram
S = librosa.amplitude_to_db(amplitude_h)
SH = librosa.feature.melspectrogram(S=S, n_mels=n_mels, sr=sr)
return SH
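# Illustrative sketch (not from the original codebase): with center=False, a
# 2 s waveform at 16 kHz gives 1 + (32000 - n_fft) // hop_length = 124 frames,
# so with n_mels=64 the output is shaped (64, 124). The tone is synthetic.
def _example_harmonic_spectrogram():
    sr = 16000
    t = np.arange(2 * sr) / sr
    y = 0.5 * np.sin(2 * np.pi * 440.0 * t)  # 2 s of a 440 Hz tone
    S = librosa_harmonic_spectrogram_db(y, sr=sr, n_fft=512, hop_length=256, n_mels=64)
    print(S.shape)  # (64, 124)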
def show_logmelspectrogram(
S,
sr,
n_fft=512,
hop_length=256,
figsize=(10, 3),
ax=None,
show=True,
title="LogMelSpectrogram",
xlabel="Time (s)",
ylabel="Mel bins (Hz)",
return_as_image=False,
):
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
    else:
        # Needed below when return_as_image=True
        fig = ax.figure
librosa.display.specshow(
S,
sr=sr,
hop_length=hop_length,
n_fft=n_fft,
y_axis='mel',
x_axis='time',
ax=ax,
auto_aspect=True,
)
ax.set_title(title)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
if return_as_image:
fig.canvas.draw()
image = PIL.Image.frombytes(
'RGB', fig.canvas.get_width_height(), fig.canvas.tostring_rgb(),
)
plt.close(fig)
return image
if show:
plt.show()
def show_logspectrogram(
S, sr, n_fft=512, hop_length=256, figsize=(10, 3), ax=None, show=True,
):
    """Visualizes a dB-scaled linear-frequency spectrogram."""
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
librosa.display.specshow(
S,
sr=sr,
hop_length=hop_length,
n_fft=n_fft,
y_axis='linear',
x_axis='time',
ax=ax,
)
ax.set_title("LogSpectrogram")
if show:
plt.show()
def audio_clips_wav_to_spec(
audio_clips, n_fft=512, hop_length=256, margin=16., n_mels=None,
):
"""
Converts the given audio clips to spectrograms.
Args:
audio_clips (list): list of audio clips
n_fft (int): number of FFT points
hop_length (int): hop length
margin (float): margin for harmonic-percussive source separation
n_mels (int): number of mel bands (optional, if None, then dB spectrogram is returned)
"""
audio_specs = []
for audio in audio_clips:
spec = librosa_harmonic_spectrogram_db(
audio,
n_fft=n_fft,
hop_length=hop_length,
margin=margin,
n_mels=n_mels,
)
spec = torch.from_numpy(spec).unsqueeze(0)
audio_specs.append(spec)
return audio_specs
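# Illustrative sketch (not from the original codebase): converting a list of
# waveform clips (as returned by load_audio_clips) into spectrogram tensors
# shaped [1, n_mels, time_frames]. The input clip here is synthetic noise.
def _example_audio_clips_wav_to_spec():
    sr = 16000
    clip = 0.1 * torch.randn(1, sr)  # one second of noise at 16 kHz
    specs = audio_clips_wav_to_spec([clip], n_fft=512, hop_length=256, n_mels=64)
    print(tuple(specs[0].shape))  # (1, 64, 61)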
def show_audio_clips_spec(
audio_specs,
clips,
sr,
n_fft=512,
hop_length=256,
margin=16.,
cmap='magma',
n_mels=None,
show=True,
):
"""
Visualizes the given audio clips.
Args:
audio_specs (list): list of audio spectrograms
clips (np.ndarray): sized [T, 2], where T is the number of clips
and each row is a pair of start and end times of the clip
show (bool): whether to show the clips
"""
clip_centers = (clips[:, 0] + clips[:, 1]) / 2
clip_durations = clips[:, 1] - clips[:, 0]
fig, ax = plt.subplots(1, len(audio_specs), figsize=(10, 4))
if len(audio_specs) == 1:
ax = [ax]
for i, spec in enumerate(audio_specs):
clip_start = clips[i][0]
# ax[i].imshow(spec, aspect='auto', origin='lower')
if isinstance(spec, torch.Tensor):
spec = spec.numpy()
if len(spec.shape) == 3:
spec = spec[0]
args = dict(
data=spec,
sr=sr,
n_fft=n_fft,
hop_length=hop_length,
ax=ax[i],
x_axis="time",
cmap=cmap,
)
if n_mels is None:
args.update(dict(y_axis="linear"))
else:
args.update(dict(y_axis="mel"))
librosa.display.specshow(**args)
        # Shift the tick labels by clip_start so the x-axis shows absolute time
        xticks = ax[i].get_xticks()
        ax[i].set_xticks(xticks)
        ax[i].set_xticklabels([f'{x + clip_start:.1f}' for x in xticks])
ax[i].set_title(f'$t=$ {clip_centers[i]:.2f}')
plt.tight_layout()
if show:
plt.show()
else:
plt.savefig('audio_clips_spec.png')
def basic_pipeline_audio_clips(
audio_clips,
spec_args=None,
audio_transform=None,
stack=True,
):
    """Applies waveform transforms, optional wav-to-spec conversion, and spectrogram transforms."""
    # audio_transform may be None or a dict with optional 'wave'/'spec' entries
    audio_transform = audio_transform or {}
    wave_transform = audio_transform.get('wave', None)
    spec_transform = audio_transform.get('spec', None)
# Apply transforms to raw waveforms
if wave_transform is not None:
audio_clips = wave_transform(audio_clips)
if spec_args is not None:
# Convert waveforms to spectrograms
audio_clips = audio_clips_wav_to_spec(audio_clips, **spec_args)
# Apply transforms to spectrograms
if spec_transform is not None:
audio_clips = spec_transform(audio_clips)
if stack:
audio_clips = torch.stack(audio_clips)
return audio_clips
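# Illustrative sketch (not from the original codebase): running the pipeline
# with no waveform / spectrogram transforms, only wav-to-spec conversion, and
# stacking into a single [num_clips, 1, n_mels, time] tensor. Inputs are synthetic.
def _example_basic_pipeline_audio_clips():
    clips = [0.1 * torch.randn(1, 16000) for _ in range(3)]
    batch = basic_pipeline_audio_clips(
        clips,
        spec_args=dict(n_fft=512, hop_length=256, n_mels=64),
        audio_transform=dict(wave=None, spec=None),
        stack=True,
    )
    print(tuple(batch.shape))  # (3, 1, 64, 61)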
def load_and_process_audio(
audio_path,
clips,
cut_to_clip_len=True,
load_entire=False,
audio_transform=None,
aload_args=dict(),
apipe_args=dict(),
):
"""Loads and preprocess audio."""
    # [C1] Load audio clips: List[torch.Tensor]
audio_clips = load_audio_clips(
audio_path=audio_path,
clips=clips,
load_entire=load_entire,
cut_to_clip_len=cut_to_clip_len,
**aload_args,
)
# [C2] Pipeline: [Preprocessing -> Transform]
audio_clips = basic_pipeline_audio_clips(
audio_clips=audio_clips,
audio_transform=audio_transform,
**apipe_args,
)
return audio_clips
def crop_height(image, height):
"""Crops image from the top and bottom to the desired height."""
width, curr_height = image.size
if curr_height < height:
raise ValueError(f"Height of the image is less than {height}")
top = (curr_height - height) // 2
bottom = top + height
return image.crop((0, top, width, bottom))
def pad_to_height(image, height):
"""Pads image with black strips at the top and bottom."""
width, curr_height = image.size
if curr_height > height:
raise ValueError(f"Height of the image is already greater than {height}")
top = (height - curr_height) // 2
bottom = height - curr_height - top
return ImageOps.expand(image, (0, top, 0, bottom), fill="black")
def crop_width(image, width):
"""Crops image from the left and right to the desired width."""
curr_width, height = image.size
if curr_width < width:
raise ValueError(f"Width of the image is less than {width}")
left = (curr_width - width) // 2
right = left + width
return image.crop((left, 0, right, height))
def crop_or_pad_height(image, height):
"""Crops or pads image to the desired height."""
width, curr_height = image.size
if curr_height < height:
return pad_to_height(image, height)
elif curr_height > height:
return crop_height(image, height)
return image
def crop_or_pad_width(image, width):
"""Crops or pads image to the desired width."""
curr_width, height = image.size
if curr_width < width:
return pad_to_width(image, width)
elif curr_width > width:
return crop_width(image, width)
return image
def pad_to_width(image, width):
"""Pads image with black strips at the left and right."""
curr_width, height = image.size
if curr_width > width:
raise ValueError(f"Width of the image is already greater than {width}")
left = (width - curr_width) // 2
right = width - curr_width - left
return ImageOps.expand(image, (left, 0, right, 0), fill="black")
def crop_or_pad_to_size(image, size=(270, 480)):
"""Crops or pads image to the desired size."""
image = crop_or_pad_height(image, size[1])
image = crop_or_pad_width(image, size[0])
return image
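# Illustrative sketch (not from the original codebase): centre-cropping /
# padding a PIL image to a fixed (width, height). The input image is synthetic.
def _example_crop_or_pad_to_size():
    image = Image.new("RGB", (640, 360), color="gray")
    resized = crop_or_pad_to_size(image, size=(270, 480))
    print(resized.size)  # (270, 480): width cropped, height padded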
if __name__ == "__main__":
import decord
import sound_of_water.data.audio_transforms as at
# Testing on a sample file
file_path = "media_assets/ayNzH0uygFw_9.0_21.0.mp4"
assert os.path.exists(file_path), f"File not found: {file_path}"
# Define audio transforms
cfg_transform = {
"audio": {
"wave": [
{
"name": "AddNoise",
"args": {
"noise_level": 0.001
},
"augmentation": True,
},
{
"name": "ChangeVolume",
"args": {
"volume_factor": [0.8, 1.2]
},
"augmentation": True,
},
{
"name": "Wav2Vec2WaveformProcessor",
"args": {
"model_name": "facebook/wav2vec2-base-960h",
"sr": 16000
}
}
],
"spec": None,
}
}
audio_transform = at.define_audio_transforms(
cfg_transform, augment=False,
)
# Define audio load arguments
aload_args = {
"sr": 16000,
"clip_len": None,
"backend": "decord",
}
# Define audio pipeline arguments
apipe_args = {
"spec_args": None,
"stack": True,
}
    # Run the pipeline (this output is what is passed to the model)
audio = load_and_process_audio(
audio_path=file_path,
clips=None,
load_entire=True,
cut_to_clip_len=False,
audio_transform=audio_transform,
aload_args=aload_args,
apipe_args=apipe_args,
)[0]
    # Arguments used for visualisation
visualise_args = {
"sr": 16000,
"n_fft": 400,
"hop_length": 320,
"n_mels": 64,
"margin": 16.,
"C": 340 * 100.,
"audio_output_fps": 49.,
}
y = load_audio_clips(
audio_path=file_path,
clips=None,
load_entire=True,
cut_to_clip_len=False,
**aload_args,
)[0]
S = librosa_harmonic_spectrogram_db(
y,
sr=visualise_args["sr"],
n_fft=visualise_args["n_fft"],
hop_length=visualise_args["hop_length"],
n_mels=visualise_args['n_mels'],
)
# Load video frame
vr = decord.VideoReader(file_path, num_threads=1)
frame = PIL.Image.fromarray(vr[0].asnumpy())
"""
# Cut to desired width
new_width, new_height = 270, 480
width, height = frame.size
if width > new_width:
# Crop the width
left = (width - new_width) // 2
right = left + new_width
frame = frame.crop((left, 0, right, height))
else:
# Resize along width to have the desired width
frame = su.visualize.resize_width(frame, new_width)
assert frame.size[0] == new_width, \
f"Width mismatch: {frame.size[0]} != {new_width}"
# Now pad/crop to desired height
if height > new_height:
# Crop the height
top = (height - new_height) // 2
bottom = top + new_height
frame = frame.crop((0, top, new_width, bottom))
else:
# Pad the height
frame = pad_to_height(frame, new_height)
assert frame.size[1] == new_height, \
f"Height mismatch: {frame.size[1]} != {new_height}"
"""
frame = crop_or_pad_to_size(frame)
# frame.save("1.png")
# Visualise
fig, axes = plt.subplots(
1, 2, figsize=(13, 4), width_ratios=[0.25, 0.75],
)
ax = axes[0]
ax.imshow(frame, aspect="auto")
ax.set_title("Example frame")
ax.set_xticks([])
ax.set_yticks([])
ax = axes[1]
show_logmelspectrogram(
S=S,
ax=ax,
show=False,
sr=visualise_args["sr"],
n_fft=visualise_args["n_fft"],
hop_length=visualise_args["hop_length"],
)
plt.savefig("./media_assets/audio_visualisation.png", bbox_inches="tight")
plt.close()