import typing

import julius
import numpy as np
import torch
import torchaudio

from . import util


class EffectMixin:
    GAIN_FACTOR = np.log(10) / 20
    """Gain factor for converting between amplitude and decibels."""
    CODEC_PRESETS = {
        "8-bit": {"format": "wav", "encoding": "ULAW", "bits_per_sample": 8},
        "GSM-FR": {"format": "gsm"},
        "MP3": {"format": "mp3", "compression": -9},
        "Vorbis": {"format": "vorbis", "compression": -1},
        "Ogg": {
            "format": "ogg",
            "compression": -1,
        },
        "Amr-nb": {"format": "amr-nb"},
    }
    """Presets for applying codecs via torchaudio."""

    def mix(
        self,
        other,
        snr: typing.Union[torch.Tensor, np.ndarray, float] = 10,
        other_eq: typing.Union[torch.Tensor, np.ndarray] = None,
    ):
        """Mixes another signal (e.g. noise) into this signal at a
        specified signal-to-noise ratio. Optionally, the other signal
        can be equalized in-place before mixing.

        Parameters
        ----------
        other : AudioSignal
            AudioSignal object to mix with.
        snr : typing.Union[torch.Tensor, np.ndarray, float], optional
            Signal-to-noise ratio in dB, by default 10
        other_eq : typing.Union[torch.Tensor, np.ndarray], optional
            EQ curve to apply to the other signal, if any, by default None

        Returns
        -------
        AudioSignal
            In-place modification of AudioSignal.
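
        Examples
        --------
        A minimal sketch of mixing in noise at 20 dB SNR. The random
        tensors and the top-level ``AudioSignal`` import are illustrative
        assumptions, not values taken from this module:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> spk = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> nz = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> spk = spk.mix(nz, snr=20)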
        """
        snr = util.ensure_tensor(snr).to(self.device)

        # Pad or truncate the other signal so it matches this signal's length.
        pad_len = max(0, self.signal_length - other.signal_length)
        other.zero_pad(0, pad_len)
        other.truncate_samples(self.signal_length)
        if other_eq is not None:
            other = other.equalizer(other_eq)

        # Set the other signal's loudness so that the difference between
        # the two signals is the requested SNR.
        tgt_loudness = self.loudness() - snr
        other = other.normalize(tgt_loudness)

        self.audio_data = self.audio_data + other.audio_data
        return self

    def convolve(self, other, start_at_max: bool = True):
        """Convolves self with other.
        This function uses FFTs to do the convolution.

        Parameters
        ----------
        other : AudioSignal
            Signal to convolve with.
        start_at_max : bool, optional
            Whether to start at the max value of the other signal, to
            avoid inducing delays, by default True

        Returns
        -------
        AudioSignal
            Convolved signal, in-place.
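
        Examples
        --------
        A minimal sketch of convolving a signal with a (random) impulse
        response; the tensors and the import path are illustrative
        assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> ir = AudioSignal(torch.randn(1, 1, 2048), 44100)
        >>> signal = signal.convolve(ir)  # equivalent to ``signal @ ir``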
        """
        from . import AudioSignal

        pad_len = self.signal_length - other.signal_length

        if pad_len > 0:
            other.zero_pad(0, pad_len)
        else:
            other.truncate_samples(self.signal_length)

        if start_at_max:
            # Roll the impulse response so that its peak lands on the first
            # sample, so the convolution does not induce a delay.
            idx = other.audio_data.abs().argmax(axis=-1)
            irs = torch.zeros_like(other.audio_data)
            for i in range(other.batch_size):
                irs[i] = torch.roll(other.audio_data[i], -idx[i].item(), -1)
            other = AudioSignal(irs, other.sample_rate)

        delta = torch.zeros_like(other.audio_data)
        delta[..., 0] = 1

        length = self.signal_length
        delta_fft = torch.fft.rfft(delta, length)
        other_fft = torch.fft.rfft(other.audio_data, length)
        self_fft = torch.fft.rfft(self.audio_data, length)

        convolved_fft = other_fft * self_fft
        convolved_audio = torch.fft.irfft(convolved_fft, length)

        delta_convolved_fft = other_fft * delta_fft
        delta_audio = torch.fft.irfft(delta_convolved_fft, length)

        # Convolve a unit impulse with the other signal and use its peak
        # to rescale the output back to the input's level.
        delta_max = delta_audio.abs().max(dim=-1, keepdims=True)[0]
        scale = 1 / delta_max.clamp(1e-5)
        convolved_audio = convolved_audio * scale

        self.audio_data = convolved_audio

        return self

    def apply_ir(
        self,
        ir,
        drr: typing.Union[torch.Tensor, np.ndarray, float] = None,
        ir_eq: typing.Union[torch.Tensor, np.ndarray] = None,
        use_original_phase: bool = False,
    ):
        """Applies an impulse response to the signal. If ``ir_eq``
        is specified, the impulse response is equalized before
        it is applied, using the given curve.

        Parameters
        ----------
        ir : AudioSignal
            Impulse response to convolve with.
        drr : typing.Union[torch.Tensor, np.ndarray, float], optional
            Direct-to-reverberant ratio that impulse response will be
            altered to, if specified, by default None
        ir_eq : typing.Union[torch.Tensor, np.ndarray], optional
            Equalization that will be applied to impulse response
            if specified, by default None
        use_original_phase : bool, optional
            Whether to use the original phase, instead of the convolved
            phase, by default False

        Returns
        -------
        AudioSignal
            Signal with impulse response applied to it
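
        Examples
        --------
        A minimal sketch of applying a (random) impulse response; pass
        ``drr`` or ``ir_eq`` to additionally reshape the response. The
        tensors and import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> ir = AudioSignal(torch.randn(1, 1, 2048), 44100)
        >>> signal = signal.apply_ir(ir)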
        """
        if ir_eq is not None:
            ir = ir.equalizer(ir_eq)
        if drr is not None:
            ir = ir.alter_drr(drr)

        # Save the peak of the dry signal before convolving.
        max_spk = self.audio_data.abs().max(dim=-1, keepdims=True).values

        phase = self.phase
        self.convolve(ir)

        # Optionally keep the original phase, using only the convolved magnitude.
        if use_original_phase:
            self.stft()
            self.stft_data = self.magnitude * torch.exp(1j * phase)
            self.istft()

        # Rescale the output back to the input's peak amplitude.
        max_transformed = self.audio_data.abs().max(dim=-1, keepdims=True).values
        scale_factor = max_spk.clamp(1e-8) / max_transformed.clamp(1e-8)
        self = self * scale_factor

        return self

    def ensure_max_of_audio(self, max: float = 1.0):
        """Ensures that ``abs(audio_data) <= max``.

        Parameters
        ----------
        max : float, optional
            Max absolute value of signal, by default 1.0

        Returns
        -------
        AudioSignal
            Signal with values scaled between -max and max.
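
        Examples
        --------
        A minimal sketch; the loud random data and the import path are
        illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(3 * torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.ensure_max_of_audio()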
        """
        peak = self.audio_data.abs().max(dim=-1, keepdims=True)[0]
        peak_gain = torch.ones_like(peak)
        peak_gain[peak > max] = max / peak[peak > max]
        self.audio_data = self.audio_data * peak_gain
        return self

    def normalize(self, db: typing.Union[torch.Tensor, np.ndarray, float] = -24.0):
        """Normalizes the signal's volume to the specified db, in LUFS.
        This is GPU-compatible, making for very fast loudness normalization.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray, float], optional
            Loudness to normalize to, by default -24.0

        Returns
        -------
        AudioSignal
            Normalized audio signal.
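
        Examples
        --------
        A minimal sketch of normalizing to -16 LUFS; the random data and
        the import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.normalize(-16)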
        """
        db = util.ensure_tensor(db).to(self.device)
        ref_db = self.loudness()
        gain = db - ref_db
        gain = torch.exp(gain * self.GAIN_FACTOR)

        self.audio_data = self.audio_data * gain[:, None, None]
        return self

    def volume_change(self, db: typing.Union[torch.Tensor, np.ndarray, float]):
        """Change volume of signal by some amount, in dB.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray, float]
            Amount to change volume by.

        Returns
        -------
        AudioSignal
            Signal at new volume.
        """
        db = util.ensure_tensor(db, ndim=1).to(self.device)
        gain = torch.exp(db * self.GAIN_FACTOR)
        self.audio_data = self.audio_data * gain[:, None, None]
        return self

    def _to_2d(self):
        waveform = self.audio_data.reshape(-1, self.signal_length)
        return waveform

    def _to_3d(self, waveform):
        return waveform.reshape(self.batch_size, self.num_channels, -1)

    def pitch_shift(self, n_semitones: int, quick: bool = True):
        """Pitch shift the signal. All items in the batch
        get the same pitch shift.

        Parameters
        ----------
        n_semitones : int
            How many semitones to shift the signal by.
        quick : bool, optional
            Whether to use quick pitch shifting, by default True

        Returns
        -------
        AudioSignal
            Pitch shifted audio signal.
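
        Examples
        --------
        A minimal sketch of shifting up by two semitones. This relies on
        torchaudio's sox effects being available; the random data and the
        import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.pitch_shift(2)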
        """
        device = self.device
        effects = [
            ["pitch", str(n_semitones * 100)],
            ["rate", str(self.sample_rate)],
        ]
        if quick:
            effects[0].insert(1, "-q")

        # Sox effects run on CPU; move back to the original device after.
        waveform = self._to_2d().cpu()
        waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor(
            waveform, self.sample_rate, effects, channels_first=True
        )
        self.sample_rate = sample_rate
        self.audio_data = self._to_3d(waveform)
        return self.to(device)

    def time_stretch(self, factor: float, quick: bool = True):
        """Time stretch the audio signal.

        Parameters
        ----------
        factor : float
            Factor by which to stretch the AudioSignal. Typically
            between 0.8 and 1.2.
        quick : bool, optional
            Whether to use quick time stretching, by default True

        Returns
        -------
        AudioSignal
            Time-stretched AudioSignal.
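
        Examples
        --------
        A minimal sketch of speeding the signal up by 10%; the random data
        and the import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.time_stretch(1.1)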
        """
        device = self.device
        effects = [
            ["tempo", str(factor)],
            ["rate", str(self.sample_rate)],
        ]
        if quick:
            effects[0].insert(1, "-q")

        waveform = self._to_2d().cpu()
        waveform, sample_rate = torchaudio.sox_effects.apply_effects_tensor(
            waveform, self.sample_rate, effects, channels_first=True
        )
        self.sample_rate = sample_rate
        self.audio_data = self._to_3d(waveform)
        return self.to(device)

    def apply_codec(
        self,
        preset: str = None,
        format: str = "wav",
        encoding: str = None,
        bits_per_sample: int = None,
        compression: int = None,
    ):
        """Applies an audio codec to the signal.

        Parameters
        ----------
        preset : str, optional
            One of the keys in ``self.CODEC_PRESETS``, by default None
        format : str, optional
            Format for audio codec, by default "wav"
        encoding : str, optional
            Encoding to use, by default None
        bits_per_sample : int, optional
            How many bits per sample, by default None
        compression : int, optional
            Compression amount of codec, by default None

        Returns
        -------
        AudioSignal
            AudioSignal with codec applied.

        Raises
        ------
        ValueError
            If preset is not in ``self.CODEC_PRESETS``, an error
            is thrown.
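
        Examples
        --------
        A minimal sketch using the "8-bit" preset. Codec support depends on
        the installed torchaudio backend; the random data and the import
        path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.apply_codec(preset="8-bit")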
        """
        torchaudio_version_070 = "0.7" in torchaudio.__version__
        if torchaudio_version_070:
            # torchaudio 0.7 does not support this; return the signal unchanged.
            return self

        kwargs = {
            "format": format,
            "encoding": encoding,
            "bits_per_sample": bits_per_sample,
            "compression": compression,
        }

        if preset is not None:
            if preset in self.CODEC_PRESETS:
                kwargs = self.CODEC_PRESETS[preset]
            else:
                raise ValueError(
                    f"Unknown preset: {preset}. "
                    f"Known presets: {list(self.CODEC_PRESETS.keys())}"
                )

        waveform = self._to_2d()
        if kwargs["format"] in ["vorbis", "mp3", "ogg", "amr-nb"]:
            # These formats must be applied one item at a time.
            augmented = torch.cat(
                [
                    torchaudio.functional.apply_codec(
                        waveform[i][None, :], self.sample_rate, **kwargs
                    )
                    for i in range(waveform.shape[0])
                ],
                dim=0,
            )
        else:
            augmented = torchaudio.functional.apply_codec(
                waveform, self.sample_rate, **kwargs
            )
        augmented = self._to_3d(augmented)

        self.audio_data = augmented
        return self

    def mel_filterbank(self, n_bands: int):
        """Breaks signal into mel bands.

        Parameters
        ----------
        n_bands : int
            Number of mel bands to use.

        Returns
        -------
        torch.Tensor
            Mel-filtered bands, with last axis being the band index.
        """
        filterbank = (
            julius.SplitBands(self.sample_rate, n_bands).float().to(self.device)
        )
        filtered = filterbank(self.audio_data)
        return filtered.permute(1, 2, 3, 0)

    def equalizer(self, db: typing.Union[torch.Tensor, np.ndarray]):
        """Applies a mel-spaced equalizer to the audio signal.

        Parameters
        ----------
        db : typing.Union[torch.Tensor, np.ndarray]
            EQ curve to apply.

        Returns
        -------
        AudioSignal
            AudioSignal with equalization applied.
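
        Examples
        --------
        A minimal sketch applying a random 6-band curve; the curve values,
        random data, and import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> curve = -3 * torch.rand(6)
        >>> signal = signal.equalizer(curve)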
        """
        db = util.ensure_tensor(db)
        n_bands = db.shape[-1]
        fbank = self.mel_filterbank(n_bands)

        # If the EQ curve has a batch dimension, it must either be 1 or
        # match the batch size of the signal.
        if db.ndim == 2:
            if db.shape[0] != 1:
                assert db.shape[0] == fbank.shape[0]
        else:
            db = db.unsqueeze(0)

        weights = (10**db).to(self.device).float()
        fbank = fbank * weights[:, None, None, :]
        eq_audio_data = fbank.sum(-1)
        self.audio_data = eq_audio_data
        return self

    def clip_distortion(
        self, clip_percentile: typing.Union[torch.Tensor, np.ndarray, float]
    ):
        """Clips the signal at a given percentile. The higher it is,
        the lower the threshold for clipping.

        Parameters
        ----------
        clip_percentile : typing.Union[torch.Tensor, np.ndarray, float]
            Values are between 0.0 and 1.0. Typical values are 0.1 or below.

        Returns
        -------
        AudioSignal
            Audio signal with clipped audio data.
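
        Examples
        --------
        A minimal sketch that clips away the top and bottom 5% of sample
        values; the random data and the import path are illustrative
        assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.randn(1, 1, 44100), 44100)
        >>> signal = signal.clip_distortion(0.1)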
        """
        clip_percentile = util.ensure_tensor(clip_percentile, ndim=1)
        min_thresh = torch.quantile(self.audio_data, clip_percentile / 2, dim=-1)
        max_thresh = torch.quantile(self.audio_data, 1 - (clip_percentile / 2), dim=-1)

        nc = self.audio_data.shape[1]
        min_thresh = min_thresh[:, :nc, :]
        max_thresh = max_thresh[:, :nc, :]

        self.audio_data = self.audio_data.clamp(min_thresh, max_thresh)

        return self

    def quantization(
        self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int]
    ):
        """Applies quantization to the input waveform.

        Parameters
        ----------
        quantization_channels : typing.Union[torch.Tensor, np.ndarray, int]
            Number of evenly spaced quantization channels to quantize
            to.

        Returns
        -------
        AudioSignal
            Quantized AudioSignal.
        """
        quantization_channels = util.ensure_tensor(quantization_channels, ndim=3)

        # Quantize to evenly spaced levels in [-1, 1].
        x = self.audio_data
        x = (x + 1) / 2
        x = x * quantization_channels
        x = x.floor()
        x = x / quantization_channels
        x = 2 * x - 1

        # Straight-through estimator: quantize in the forward pass, but let
        # gradients flow through unchanged.
        residual = (self.audio_data - x).detach()
        self.audio_data = self.audio_data - residual
        return self

    def mulaw_quantization(
        self, quantization_channels: typing.Union[torch.Tensor, np.ndarray, int]
    ):
        """Applies mu-law quantization to the input waveform.

        Parameters
        ----------
        quantization_channels : typing.Union[torch.Tensor, np.ndarray, int]
            Number of mu-law spaced quantization channels to quantize
            to.

        Returns
        -------
        AudioSignal
            Quantized AudioSignal.
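
        Examples
        --------
        A minimal sketch of 8-bit mu-law quantization (256 channels); the
        random data and the import path are illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> signal = AudioSignal(torch.rand(1, 1, 44100) * 2 - 1, 44100)
        >>> signal = signal.mulaw_quantization(256)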
        """
        mu = quantization_channels - 1.0
        mu = util.ensure_tensor(mu, ndim=3)

        x = self.audio_data

        # Quantize: compand with the mu-law, then map to integer channels.
        x = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu)
        x = ((x + 1) / 2 * mu + 0.5).to(torch.int64)

        # Unquantize: map back to [-1, 1] and invert the companding.
        x = (x / mu) * 2 - 1.0
        x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu

        # Straight-through estimator: quantize in the forward pass, but let
        # gradients flow through unchanged.
        residual = (self.audio_data - x).detach()
        self.audio_data = self.audio_data - residual
        return self

    def __matmul__(self, other):
        return self.convolve(other)


class ImpulseResponseMixin:
    """These functions are generally only used with AudioSignals that are derived
    from impulse responses, not other sources like music or speech. These methods
    are used to replicate the data augmentation described in [1].

    1. Bryan, Nicholas J. "Impulse response data augmentation and deep
       neural networks for blind room acoustic parameter estimation."
       ICASSP 2020-2020 IEEE International Conference on Acoustics,
       Speech and Signal Processing (ICASSP). IEEE, 2020.
    """

    def decompose_ir(self):
        """Decomposes an impulse response into early and late
        field responses, along with the window used to separate them.

        Returns
        -------
        tuple
            Early response, late field, and window tensors.
        """
        # Split the response into an early response (within 2.5 ms of the
        # direct-path peak) and a late field containing everything else.
        td = torch.argmax(self.audio_data, dim=-1, keepdim=True)
        t0 = int(self.sample_rate * 0.0025)

        idx = torch.arange(self.audio_data.shape[-1], device=self.device)[None, None, :]
        idx = idx.expand(self.batch_size, -1, -1)
        early_idx = (idx >= td - t0) * (idx <= td + t0)

        early_response = torch.zeros_like(self.audio_data, device=self.device)
        early_response[early_idx] = self.audio_data[early_idx]

        late_idx = ~early_idx
        late_field = torch.zeros_like(self.audio_data, device=self.device)
        late_field[late_idx] = self.audio_data[late_idx]

        # Build a Hann window over the early-response region, per the
        # augmentation procedure in [1].
        window = torch.zeros_like(self.audio_data, device=self.device)
        for idx in range(self.batch_size):
            window_idx = early_idx[idx, 0].nonzero()
            window[idx, ..., window_idx] = self.get_window(
                "hann", window_idx.shape[-1], self.device
            )
        return early_response, late_field, window

    def measure_drr(self):
        """Measures the direct-to-reverberant ratio of the impulse
        response.

        Returns
        -------
        float
            Direct-to-reverberant ratio
        """
        early_response, late_field, _ = self.decompose_ir()
        num = (early_response**2).sum(dim=-1)
        den = (late_field**2).sum(dim=-1)
        drr = 10 * torch.log10(num / den)
        return drr

    @staticmethod
    def solve_alpha(early_response, late_field, wd, target_drr):
        """Used to solve for the alpha value, which is used
        to alter the drr.
        """
        # Solve a quadratic in alpha so that scaling the windowed direct
        # path by alpha yields the target DRR.
        wd_sq = wd**2
        wd_sq_1 = (1 - wd) ** 2
        e_sq = early_response**2
        l_sq = late_field**2
        a = (wd_sq * e_sq).sum(dim=-1)
        b = (2 * (1 - wd) * wd * e_sq).sum(dim=-1)
        c = (wd_sq_1 * e_sq).sum(dim=-1) - torch.pow(10, target_drr / 10) * l_sq.sum(
            dim=-1
        )

        expr = ((b**2) - 4 * a * c).sqrt()
        alpha = torch.maximum(
            (-b - expr) / (2 * a),
            (-b + expr) / (2 * a),
        )
        return alpha

    def alter_drr(self, drr: typing.Union[torch.Tensor, np.ndarray, float]):
        """Alters the direct-to-reverberant ratio of the impulse response.

        Parameters
        ----------
        drr : typing.Union[torch.Tensor, np.ndarray, float]
            Direct-to-reverberant ratio that the impulse response will be
            altered to, in dB.

        Returns
        -------
        AudioSignal
            Altered impulse response.
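
        Examples
        --------
        A minimal sketch of setting the DRR of a synthetic impulse response
        to 20 dB; the decaying-noise response and the import path are
        illustrative assumptions:

        >>> import torch
        >>> from audiotools import AudioSignal
        >>> ir_data = 0.1 * torch.randn(1, 1, 4096) * torch.linspace(1, 0, 4096)
        >>> ir_data[..., 0] = 1.0
        >>> ir = AudioSignal(ir_data, 44100)
        >>> ir = ir.alter_drr(20)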
        """
        drr = util.ensure_tensor(drr, 2, self.batch_size).to(self.device)

        early_response, late_field, window = self.decompose_ir()
        alpha = self.solve_alpha(early_response, late_field, window, drr)
        min_alpha = (
            late_field.abs().max(dim=-1)[0] / early_response.abs().max(dim=-1)[0]
        )
        alpha = torch.maximum(alpha, min_alpha)[..., None]

        # Scale the windowed direct path by alpha and recombine it with the
        # unwindowed residual and the late field.
        aug_ir_data = (
            alpha * window * early_response
            + ((1 - window) * early_response)
            + late_field
        )
        self.audio_data = aug_ir_data
        self.ensure_max_of_audio()
        return self