|
import copy |
|
|
|
import julius |
|
import numpy as np |
|
import scipy |
|
import torch |
|
import torch.nn.functional as F |
|
import torchaudio |
|
|
|
|
|
class Meter(torch.nn.Module):
    """Tensorized version of pyloudnorm.Meter. Works with batched audio tensors.

    Parameters
    ----------
    rate : int
        Sample rate of audio.
    filter_class : str, optional
        Class of weighting filter used.
        'K-weighting' (default), 'Fenton/Lee 1'
        'Fenton/Lee 2', 'Dash et al.'
        by default "K-weighting"
    block_size : float, optional
        Gating block size in seconds, by default 0.400
    zeros : int, optional
        Number of zeros to use in FIR approximation of
        IIR filters, by default 512
    use_fir : bool, optional
        Whether to use FIR approximation or exact IIR formulation.
        If computing on GPU, ``use_fir=True`` will be used, as it's
        much faster, by default False
    """

    def __init__(
        self,
        rate: int,
        filter_class: str = "K-weighting",
        block_size: float = 0.400,
        zeros: int = 512,
        use_fir: bool = False,
    ):
        super().__init__()

        self.rate = rate
        # Property setter: validates the name via pyloudnorm and populates
        # ``self._filters`` with the corresponding IIR weighting stages.
        self.filter_class = filter_class
        self.block_size = block_size
        self.use_fir = use_fir

        # Per-channel gains from ITU-R BS.1770-4 (L, R, C = 1.0; surround
        # channels = 1.41). Only the first ``nch`` entries are used, so
        # inputs are limited to at most 5 channels.
        G = torch.from_numpy(np.array([1.0, 1.0, 1.0, 1.41, 1.41]))
        self.register_buffer("G", G)

        # Approximate each IIR weighting stage with an FIR filter: the
        # impulse response of the IIR filter truncated to ``zeros`` taps.
        impulse = np.zeros((zeros,))
        impulse[..., 0] = 1.0

        firs = np.zeros((len(self._filters), 1, zeros))
        passband_gain = torch.zeros(len(self._filters))

        for i, (_, filter_stage) in enumerate(self._filters.items()):
            firs[i] = scipy.signal.lfilter(filter_stage.b, filter_stage.a, impulse)
            passband_gain[i] = filter_stage.passband_gain

        # Time-reverse the taps: fft_conv1d computes cross-correlation, so
        # a flipped kernel yields true convolution.
        firs = torch.from_numpy(firs[..., ::-1].copy()).float()

        self.register_buffer("firs", firs)
        self.register_buffer("passband_gain", passband_gain)

    def apply_filter_gpu(self, data: torch.Tensor):
        """Performs FIR approximation of loudness computation.

        Parameters
        ----------
        data : torch.Tensor
            Audio data of shape (nb, nt, nch).

        Returns
        -------
        torch.Tensor
            Filtered audio data of shape (nb, nt, nch).
        """
        nb, nt, nch = data.shape
        # Fold channels into the batch axis so each channel is filtered
        # independently: (nb, nt, nch) -> (nb * nch, 1, nt).
        data = data.permute(0, 2, 1)
        data = data.reshape(nb * nch, 1, nt)

        pad_length = self.firs.shape[-1]

        # Apply the weighting stages in sequence via FFT convolution,
        # trimming back to nt samples after each stage.
        for i in range(self.firs.shape[0]):
            data = F.pad(data, (pad_length, pad_length))
            data = julius.fftconv.fft_conv1d(data, self.firs[i, None, ...])
            data = self.passband_gain[i] * data
            data = data[..., 1 : nt + 1]

        # Restore the original (nb, nt, nch) layout. The previous code
        # only permuted (nb * nch, 1, nt) -> (nb * nch, nt, 1), which broke
        # the batch/channel layout (and channel weighting downstream) for
        # multichannel input; for mono input this is identical.
        data = data.reshape(nb, nch, nt)
        data = data.permute(0, 2, 1)
        data = data[:, :nt, :]
        return data

    def apply_filter_cpu(self, data: torch.Tensor):
        """Performs IIR formulation of loudness computation.

        Parameters
        ----------
        data : torch.Tensor
            Audio data of shape (nb, nt, nch).

        Returns
        -------
        torch.Tensor
            Filtered audio data of shape (nb, nt, nch).
        """
        for _, filter_stage in self._filters.items():
            passband_gain = filter_stage.passband_gain

            a_coeffs = torch.from_numpy(filter_stage.a).float().to(data.device)
            b_coeffs = torch.from_numpy(filter_stage.b).float().to(data.device)

            # torchaudio.functional.lfilter expects time-last input, so
            # filter on (nb, nch, nt) and permute back afterwards.
            _data = data.permute(0, 2, 1)
            filtered = torchaudio.functional.lfilter(
                _data, a_coeffs, b_coeffs, clamp=False
            )
            data = passband_gain * filtered.permute(0, 2, 1)
        return data

    def apply_filter(self, data: torch.Tensor):
        """Applies filter on either CPU or GPU, depending
        on if the audio is on GPU or is on CPU, or if
        ``self.use_fir`` is True.

        Parameters
        ----------
        data : torch.Tensor
            Audio data of shape (nb, nt, nch).

        Returns
        -------
        torch.Tensor
            Filtered audio data of shape (nb, nt, nch).
        """
        if data.is_cuda or self.use_fir:
            data = self.apply_filter_gpu(data)
        else:
            data = self.apply_filter_cpu(data)
        return data

    def forward(self, data: torch.Tensor):
        """Computes integrated loudness of data.

        Parameters
        ----------
        data : torch.Tensor
            Audio data of shape (nb, nt, nch).

        Returns
        -------
        torch.Tensor
            Integrated loudness in LUFS, one value per batch item.
        """
        return self.integrated_loudness(data)

    def _unfold(self, input_data):
        """Slice (nb, nt, nch) audio into overlapping gating blocks.

        Blocks are ``block_size`` seconds long with 75% overlap, per the
        gating algorithm of ITU-R BS.1770-4.
        """
        T_g = self.block_size
        overlap = 0.75  # overlap of 75% of the block duration
        step = 1.0 - overlap  # step of 25%

        kernel_size = int(T_g * self.rate)
        stride = int(T_g * self.rate * step)
        unfolded = julius.core.unfold(input_data.permute(0, 2, 1), kernel_size, stride)
        unfolded = unfolded.transpose(-1, -2)

        return unfolded

    def integrated_loudness(self, data: torch.Tensor):
        """Computes integrated loudness of data.

        Parameters
        ----------
        data : torch.Tensor
            Audio data of shape (nb, nt, nch).

        Returns
        -------
        torch.Tensor
            Integrated loudness in LUFS, one value per batch item.
        """
        if not torch.is_tensor(data):
            data = torch.from_numpy(data).float()
        else:
            data = data.float()

        # Work on a copy so the caller's tensor is never mutated.
        input_data = data.clone()

        # Promote to (nb, nt, nch): add a channel axis for 1D mono input,
        # then a batch axis for unbatched input.
        if input_data.ndim < 2:
            input_data = input_data.unsqueeze(-1)
        if input_data.ndim < 3:
            input_data = input_data.unsqueeze(0)

        nb, nt, nch = input_data.shape

        # Apply frequency weighting (K-weighting by default).
        input_data = self.apply_filter(input_data)

        G = self.G
        T_g = self.block_size
        Gamma_a = -70.0  # absolute gating threshold in LUFS

        unfolded = self._unfold(input_data)

        # Mean-square energy of each gating block: (nb, nch, n_blocks).
        z = (1.0 / (T_g * self.rate)) * unfolded.square().sum(2)
        # Per-block loudness with channel weights, broadcast across channels.
        loudness = -0.691 + 10.0 * torch.log10(
            (G[None, :nch, None] * z).sum(1, keepdim=True)
        )
        loudness = loudness.expand_as(z)

        # First gating pass: average the blocks above the absolute gate.
        # Clone so gating does not silently zero out ``z`` in-place (the
        # second pass below needs the original block energies).
        z_avg_gated = z.clone()
        z_avg_gated[loudness <= Gamma_a] = 0
        masked = loudness > Gamma_a
        z_avg_gated = z_avg_gated.sum(2) / masked.sum(2)

        # Relative gating threshold: 10 LU below the absolutely-gated mean.
        Gamma_r = (
            -0.691 + 10.0 * torch.log10((z_avg_gated * G[None, :nch]).sum(-1)) - 10.0
        )
        Gamma_r = Gamma_r[:, None, None]
        Gamma_r = Gamma_r.expand(nb, nch, loudness.shape[-1])

        # Second gating pass: keep only blocks above both thresholds.
        z_avg_gated = z.clone()
        z_avg_gated[loudness <= Gamma_a] = 0
        z_avg_gated[loudness <= Gamma_r] = 0
        masked = (loudness > Gamma_a) * (loudness > Gamma_r)
        z_avg_gated = z_avg_gated.sum(2) / masked.sum(2)

        # If no block survived the gates the division above yields nan/inf;
        # map those to finite values so the final log10 stays defined.
        z_avg_gated = torch.where(
            z_avg_gated.isnan(), torch.zeros_like(z_avg_gated), z_avg_gated
        )
        z_avg_gated[z_avg_gated == float("inf")] = float(np.finfo(np.float32).max)
        z_avg_gated[z_avg_gated == -float("inf")] = float(np.finfo(np.float32).min)

        LUFS = -0.691 + 10.0 * torch.log10((G[None, :nch] * z_avg_gated).sum(1))
        return LUFS.float()

    @property
    def filter_class(self):
        return self._filter_class

    @filter_class.setter
    def filter_class(self, value):
        # Delegate validation and coefficient design to pyloudnorm, then
        # borrow its filter stages for the tensorized implementation.
        from pyloudnorm import Meter

        meter = Meter(self.rate)
        meter.filter_class = value
        self._filter_class = value
        self._filters = meter._filters
|
|
|
|
|
class LoudnessMixin:
    # Cached loudness measurement; computed lazily by ``loudness()``.
    _loudness = None
    # Floor for reported loudness values, in LUFS.
    MIN_LOUDNESS = -70

    def loudness(
        self, filter_class: str = "K-weighting", block_size: float = 0.400, **kwargs
    ):
        """Calculates loudness using an implementation of ITU-R BS.1770-4.
        Allows control over gating block size and frequency weighting filters for
        additional control. Measure the integrated gated loudness of a signal.

        API is derived from PyLoudnorm, but this implementation is ported to PyTorch
        and is tensorized across batches. When on GPU, an FIR approximation of the IIR
        filters is used to compute loudness for speed.

        Uses the weighting filters and block size defined by the meter
        the integrated loudness is measured based upon the gating algorithm
        defined in the ITU-R BS.1770-4 specification.

        Parameters
        ----------
        filter_class : str, optional
            Class of weighting filter used.
            'K-weighting' (default), 'Fenton/Lee 1'
            'Fenton/Lee 2', 'Dash et al.'
            by default "K-weighting"
        block_size : float, optional
            Gating block size in seconds, by default 0.400
        kwargs : dict, optional
            Keyword arguments to :py:func:`audiotools.core.loudness.Meter`.

        Returns
        -------
        torch.Tensor
            Loudness of audio data.
        """
        # Reuse the cached measurement when available.
        if self._loudness is not None:
            return self._loudness.to(self.device)

        # The meter needs at least 0.5 seconds of audio: pad short signals
        # with zeros, then restore the original length once measured.
        length_before_pad = self.signal_length
        if self.signal_duration < 0.5:
            padding = int((0.5 - self.signal_duration) * self.sample_rate)
            self.zero_pad(0, padding)

        meter = Meter(
            self.sample_rate, filter_class=filter_class, block_size=block_size, **kwargs
        ).to(self.device)

        # Meter expects (nb, nt, nch); audio_data is stored as (nb, nch, nt).
        measured = meter.integrated_loudness(self.audio_data.permute(0, 2, 1))
        self.truncate_samples(length_before_pad)

        # Clamp silence (very low readings) to the loudness floor and cache.
        self._loudness = measured.clamp(min=self.MIN_LOUDNESS)

        return self._loudness.to(self.device)
|
|