import math
import typing as tp
from dataclasses import dataclass, field

import torch
from torch import nn
import torch.nn.functional as F
from einops import rearrange
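
# `uniform_init` and `kmeans` are referenced below but are not defined in this file.
# The definitions here are a minimal sketch (assumed helpers, not the upstream
# implementations): a Kaiming-uniform codebook init and a plain k-means over the
# flattened encoder features.
def uniform_init(*shape: int) -> torch.Tensor:
    t = torch.empty(shape)
    nn.init.kaiming_uniform_(t)
    return t


def kmeans(samples: torch.Tensor, num_clusters: int, num_iters: int = 10):
    """Run k-means on `samples` [N, dim]; return (means [num_clusters, dim], bins [num_clusters])."""
    dim = samples.shape[-1]
    # Initialize the means from randomly picked samples.
    indices = torch.randperm(samples.shape[0], device=samples.device)[:num_clusters]
    means = samples[indices]
    for _ in range(num_iters):
        # Assign each sample to its nearest mean (negative squared distance, argmax).
        dists = -(samples[:, None, :] - means[None, :, :]).pow(2).sum(-1)
        buckets = dists.max(dim=-1).indices
        bins = torch.bincount(buckets, minlength=num_clusters)
        # Recompute the means; keep the old mean for empty clusters.
        new_means = torch.zeros_like(means)
        new_means.scatter_add_(0, buckets[:, None].expand(-1, dim), samples)
        new_means = new_means / bins.clamp(min=1)[:, None]
        means = torch.where(bins[:, None] == 0, means, new_means)
    return means, bins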


@dataclass
class QuantizedResult:
    x: torch.Tensor
    codes: torch.Tensor
    bandwidth: torch.Tensor  # bandwidth in kb/s used, per batch item.
    penalty: tp.Optional[torch.Tensor] = None
    metrics: dict = field(default_factory=dict)


class EuclideanCodebook(nn.Module):
    """Codebook with Euclidean distance."""
    def __init__(
        self,
        dim,
        codebook_size,
        kmeans_init=False,
        kmeans_iters=10,
        decay=0.8,
        epsilon=1e-5,
    ):
        super().__init__()
        self.decay = decay
        init_fn = uniform_init if not kmeans_init else torch.zeros
        embed = init_fn(codebook_size, dim)
        self.codebook_size = codebook_size
        self.kmeans_iters = kmeans_iters
        self.epsilon = epsilon
        self.register_buffer("inited", torch.Tensor([not kmeans_init]))
        self.register_buffer("cluster_size", torch.zeros(codebook_size))
        self.register_buffer("embed", embed)
        self.register_buffer("embed_avg", embed.clone())

    @torch.jit.ignore
    def init_embed_(self, data):
        if self.inited:
            return
        embed, cluster_size = kmeans(data, self.codebook_size, self.kmeans_iters)
        self.embed.data.copy_(embed)
        self.embed_avg.data.copy_(embed.clone())
        self.cluster_size.data.copy_(cluster_size)
        self.inited.data.copy_(torch.Tensor([True]))
        # Make sure all buffers across workers are in sync after initialization.
        # flashy.distrib.broadcast_tensors(self.buffers())  # broadcast param values to all GPUs

    def postprocess_emb(self, embed_ind, shape):
        return embed_ind.view(*shape[:-1])

    def dequantize(self, embed_ind):
        # Look up the codebook entry for each index, e.g. [B, T] -> [B, T, dim];
        # the special_token index also goes through this path.
        quantize = F.embedding(embed_ind, self.embed)
        return quantize

    def decode(self, embed_ind):
        quantize = self.dequantize(embed_ind)
        return quantize


class VectorQuantization(nn.Module):
    """Vector quantization with a Euclidean codebook."""
    def __init__(
        self,
        dim,
        codebook_size,
        codebook_dim=None,
        decay=0.8,
        epsilon=1e-5,
        kmeans_init=False,
        kmeans_iters=10,
        channels_last=False,
    ):
        super().__init__()
        _codebook_dim = codebook_dim if codebook_dim is not None else dim
        requires_projection = _codebook_dim != dim
        self.project_in = nn.Linear(dim, _codebook_dim) if requires_projection else nn.Identity()
        self.project_out = nn.Linear(_codebook_dim, dim) if requires_projection else nn.Identity()
        self._codebook = EuclideanCodebook(
            dim=_codebook_dim,
            codebook_size=codebook_size,
            kmeans_init=kmeans_init,
            kmeans_iters=kmeans_iters,
            decay=decay,
            epsilon=epsilon,
        )
        self.codebook_size = codebook_size
        self.channels_last = channels_last

    @property
    def codebook(self):
        return self._codebook.embed

    @property
    def inited(self):
        return self._codebook.inited

    def _postprocess(self, quantize):
        if not self.channels_last:
            quantize = rearrange(quantize, "b n d -> b d n")
        return quantize

    def decode(self, embed_ind):
        quantize = self._codebook.decode(embed_ind)
        quantize = self.project_out(quantize)
        quantize = self._postprocess(quantize)
        return quantize


class ResidualVectorQuantization(nn.Module):
    """Residual vector quantization implementation.

    Follows Algorithm 1. in https://arxiv.org/pdf/2107.03312.pdf
    """
    def __init__(self, *, num_quantizers, **kwargs):
        super().__init__()
        self.layers = nn.ModuleList(
            [VectorQuantization(**kwargs) for _ in range(num_quantizers)]
        )

    def decode(self, q_indices: torch.Tensor) -> torch.Tensor:
        # q_indices is [K, B, T]; the decoded output is the sum over the K codebooks.
        quantized_out = torch.tensor(0.0, device=q_indices.device)
        for i, indices in enumerate(q_indices):
            layer = self.layers[i]
            quantized = layer.decode(indices)
            quantized_out = quantized_out + quantized
        return quantized_out
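
# Usage sketch (assumed shapes and hyper-parameters, not from the original file):
# with two quantizers of dim=128 and codebook_size=1024, decoding indices of shape
# [K=2, B=1, T=35] returns the sum of the two per-codebook embedding lookups,
# rearranged to [1, 128, 35] because channels_last defaults to False:
#   rvq = ResidualVectorQuantization(num_quantizers=2, dim=128, codebook_size=1024)
#   out = rvq.decode(torch.randint(0, 1024, (2, 1, 35)))  # -> torch.Size([1, 128, 35])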
# ------------------------------------- END core_vq.py


class ResidualVectorQuantizer(nn.Module):
    """Residual Vector Quantizer.

    Args:
        dimension (int): Dimension of the codebooks.
        n_q (int): Number of residual vector quantizers used.
        q_dropout (bool): Random quantizer dropout at train time.
        bins (int): Codebook size.
        decay (float): Decay for exponential moving average over the codebooks.
        kmeans_init (bool): Whether to use kmeans to initialize the codebooks.
        kmeans_iters (int): Number of iterations used for kmeans initialization.
        threshold_ema_dead_code (int): Threshold for dead code expiration. Replace any codes
            that have an exponential moving average cluster size less than the specified threshold
            with a randomly selected vector from the current batch.
        orthogonal_reg_weight (float): Orthogonal regularization weight.
        orthogonal_reg_active_codes_only (bool): Apply orthogonal regularization only on active codes.
        orthogonal_reg_max_codes (optional int): Maximum number of codes to consider
            for orthogonal regularization.
    """
    def __init__(
        self,
        dimension: int = 256,
        n_q: int = 8,
        q_dropout: bool = False,
        bins: int = 1024,
        decay: float = 0.99,
        kmeans_init: bool = True,
        kmeans_iters: int = 10,
        threshold_ema_dead_code: int = 2,
        orthogonal_reg_weight: float = 0.0,
        orthogonal_reg_active_codes_only: bool = False,
        orthogonal_reg_max_codes: tp.Optional[int] = None,
    ):
        super().__init__()
        self.max_n_q = n_q
        self.n_q = n_q
        self.q_dropout = q_dropout
        self.dimension = dimension
        self.bins = bins
        self.decay = decay
        self.kmeans_init = kmeans_init
        self.kmeans_iters = kmeans_iters
        self.threshold_ema_dead_code = threshold_ema_dead_code
        self.orthogonal_reg_weight = orthogonal_reg_weight
        self.orthogonal_reg_active_codes_only = orthogonal_reg_active_codes_only
        self.orthogonal_reg_max_codes = orthogonal_reg_max_codes
        self.vq = ResidualVectorQuantization(
            dim=self.dimension,
            codebook_size=self.bins,
            num_quantizers=self.n_q,
            decay=self.decay,
            kmeans_init=self.kmeans_init,
            kmeans_iters=self.kmeans_iters,
            channels_last=False,
        )

    def forward(self, x: torch.Tensor, frame_rate: int):
        n_q = self.n_q
        if self.training and self.q_dropout:
            n_q = int(torch.randint(1, self.n_q + 1, (1,)).item())
        bw_per_q = math.log2(self.bins) * frame_rate / 1000
        quantized, codes, commit_loss = self.vq(x, n_q=n_q)
        codes = codes.transpose(0, 1)
        # codes is [B, K, T], with T frames, K nb of codebooks.
        bw = torch.tensor(n_q * bw_per_q).to(x)
        return QuantizedResult(quantized, codes, bw, penalty=torch.mean(commit_loss))

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode a given input tensor.

        The RVQ encode method sets the appropriate number of quantizers to use
        and returns the indices for each quantizer.
        """
        n_q = self.n_q
        codes = self.vq.encode(x, n_q=n_q)
        codes = codes.transpose(0, 1)
        # codes is [B, K, T], with T frames, K nb of codebooks.
        return codes

    def decode(self, codes: torch.Tensor) -> torch.Tensor:
        """Decode the given codes to the quantized representation."""
        # codes is [B, K, T], with T frames, K nb of codebooks; vq.decode expects [K, B, T].
        codes = codes.transpose(0, 1)
        quantized = self.vq.decode(codes)
        return quantized

    @property
    def total_codebooks(self):
        return self.max_n_q

    @property
    def num_codebooks(self):
        return self.n_q

    def set_num_codebooks(self, n: int):
        assert n > 0 and n <= self.max_n_q
        self.n_q = n
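

if __name__ == "__main__":
    # Quick decode-only smoke test (a sketch with assumed hyper-parameters, not part
    # of the original file): 8 codebooks of 1024 bins over a 128-dim latent, decoding
    # random codes of shape [B, K, T] back to a [B, dimension, T] latent.
    rvq = ResidualVectorQuantizer(dimension=128, n_q=8, bins=1024, kmeans_init=False)
    codes = torch.randint(0, 1024, (2, 8, 50))  # [B=2, K=8, T=50]
    latent = rvq.decode(codes)
    print(latent.shape)  # expected: torch.Size([2, 128, 50])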