Spaces:
Running
on
Zero
Running
on
Zero
# Adapted from Open-Sora-Plan | |
# This source code is licensed under the license found in the | |
# LICENSE file in the root directory of this source tree. | |
# -------------------------------------------------------- | |
# References: | |
# Open-Sora-Plan: https://github.com/PKU-YuanGroup/Open-Sora-Plan | |
# -------------------------------------------------------- | |
import functools | |
import hashlib | |
import os | |
from collections import namedtuple | |
import requests | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from einops import rearrange | |
from torch import nn | |
from torchvision import models | |
from tqdm import tqdm | |
from videosys.models.open_sora_plan.modules.normalize import ActNorm | |
URL_MAP = {"vgg_lpips": "https://heibox.uni-heidelberg.de/f/607503859c864bc1b30b/?dl=1"} | |
CKPT_MAP = {"vgg_lpips": "vgg.pth"} | |
MD5_MAP = {"vgg_lpips": "d507d7349b931f0638a25a48a722f98a"} | |
def download(url, local_path, chunk_size=1024): | |
os.makedirs(os.path.split(local_path)[0], exist_ok=True) | |
with requests.get(url, stream=True) as r: | |
total_size = int(r.headers.get("content-length", 0)) | |
with tqdm(total=total_size, unit="B", unit_scale=True) as pbar: | |
with open(local_path, "wb") as f: | |
for data in r.iter_content(chunk_size=chunk_size): | |
if data: | |
f.write(data) | |
pbar.update(chunk_size) | |
def md5_hash(path): | |
with open(path, "rb") as f: | |
content = f.read() | |
return hashlib.md5(content).hexdigest() | |
def get_ckpt_path(name, root, check=False): | |
assert name in URL_MAP | |
path = os.path.join(root, CKPT_MAP[name]) | |
if not os.path.exists(path) or (check and not md5_hash(path) == MD5_MAP[name]): | |
print("Downloading {} model from {} to {}".format(name, URL_MAP[name], path)) | |
download(URL_MAP[name], path) | |
md5 = md5_hash(path) | |
assert md5 == MD5_MAP[name], md5 | |
return path | |
class LPIPS(nn.Module): | |
# Learned perceptual metric | |
def __init__(self, use_dropout=True): | |
super().__init__() | |
self.scaling_layer = ScalingLayer() | |
self.chns = [64, 128, 256, 512, 512] # vg16 features | |
self.net = vgg16(pretrained=True, requires_grad=False) | |
self.lin0 = NetLinLayer(self.chns[0], use_dropout=use_dropout) | |
self.lin1 = NetLinLayer(self.chns[1], use_dropout=use_dropout) | |
self.lin2 = NetLinLayer(self.chns[2], use_dropout=use_dropout) | |
self.lin3 = NetLinLayer(self.chns[3], use_dropout=use_dropout) | |
self.lin4 = NetLinLayer(self.chns[4], use_dropout=use_dropout) | |
self.load_from_pretrained() | |
for param in self.parameters(): | |
param.requires_grad = False | |
def load_from_pretrained(self, name="vgg_lpips"): | |
ckpt = get_ckpt_path(name, "taming/modules/autoencoder/lpips") | |
self.load_state_dict(torch.load(ckpt, map_location=torch.device("cpu")), strict=False) | |
print("loaded pretrained LPIPS loss from {}".format(ckpt)) | |
def from_pretrained(cls, name="vgg_lpips"): | |
if name != "vgg_lpips": | |
raise NotImplementedError | |
model = cls() | |
ckpt = get_ckpt_path(name) | |
model.load_state_dict(torch.load(ckpt, map_location=torch.device("cpu")), strict=False) | |
return model | |
def forward(self, input, target): | |
in0_input, in1_input = (self.scaling_layer(input), self.scaling_layer(target)) | |
outs0, outs1 = self.net(in0_input), self.net(in1_input) | |
feats0, feats1, diffs = {}, {}, {} | |
lins = [self.lin0, self.lin1, self.lin2, self.lin3, self.lin4] | |
for kk in range(len(self.chns)): | |
feats0[kk], feats1[kk] = normalize_tensor(outs0[kk]), normalize_tensor(outs1[kk]) | |
diffs[kk] = (feats0[kk] - feats1[kk]) ** 2 | |
res = [spatial_average(lins[kk].model(diffs[kk]), keepdim=True) for kk in range(len(self.chns))] | |
val = res[0] | |
for l in range(1, len(self.chns)): | |
val += res[l] | |
return val | |
class ScalingLayer(nn.Module): | |
def __init__(self): | |
super(ScalingLayer, self).__init__() | |
self.register_buffer("shift", torch.Tensor([-0.030, -0.088, -0.188])[None, :, None, None]) | |
self.register_buffer("scale", torch.Tensor([0.458, 0.448, 0.450])[None, :, None, None]) | |
def forward(self, inp): | |
return (inp - self.shift) / self.scale | |
class NetLinLayer(nn.Module): | |
"""A single linear layer which does a 1x1 conv""" | |
def __init__(self, chn_in, chn_out=1, use_dropout=False): | |
super(NetLinLayer, self).__init__() | |
layers = ( | |
[ | |
nn.Dropout(), | |
] | |
if (use_dropout) | |
else [] | |
) | |
layers += [ | |
nn.Conv2d(chn_in, chn_out, 1, stride=1, padding=0, bias=False), | |
] | |
self.model = nn.Sequential(*layers) | |
class vgg16(torch.nn.Module): | |
def __init__(self, requires_grad=False, pretrained=True): | |
super(vgg16, self).__init__() | |
vgg_pretrained_features = models.vgg16(pretrained=pretrained).features | |
self.slice1 = torch.nn.Sequential() | |
self.slice2 = torch.nn.Sequential() | |
self.slice3 = torch.nn.Sequential() | |
self.slice4 = torch.nn.Sequential() | |
self.slice5 = torch.nn.Sequential() | |
self.N_slices = 5 | |
for x in range(4): | |
self.slice1.add_module(str(x), vgg_pretrained_features[x]) | |
for x in range(4, 9): | |
self.slice2.add_module(str(x), vgg_pretrained_features[x]) | |
for x in range(9, 16): | |
self.slice3.add_module(str(x), vgg_pretrained_features[x]) | |
for x in range(16, 23): | |
self.slice4.add_module(str(x), vgg_pretrained_features[x]) | |
for x in range(23, 30): | |
self.slice5.add_module(str(x), vgg_pretrained_features[x]) | |
if not requires_grad: | |
for param in self.parameters(): | |
param.requires_grad = False | |
def forward(self, X): | |
h = self.slice1(X) | |
h_relu1_2 = h | |
h = self.slice2(h) | |
h_relu2_2 = h | |
h = self.slice3(h) | |
h_relu3_3 = h | |
h = self.slice4(h) | |
h_relu4_3 = h | |
h = self.slice5(h) | |
h_relu5_3 = h | |
vgg_outputs = namedtuple("VggOutputs", ["relu1_2", "relu2_2", "relu3_3", "relu4_3", "relu5_3"]) | |
out = vgg_outputs(h_relu1_2, h_relu2_2, h_relu3_3, h_relu4_3, h_relu5_3) | |
return out | |
def normalize_tensor(x, eps=1e-10): | |
norm_factor = torch.sqrt(torch.sum(x**2, dim=1, keepdim=True)) | |
return x / (norm_factor + eps) | |
def spatial_average(x, keepdim=True): | |
return x.mean([2, 3], keepdim=keepdim) | |
def weights_init(m): | |
classname = m.__class__.__name__ | |
if classname.find("Conv") != -1: | |
nn.init.normal_(m.weight.data, 0.0, 0.02) | |
elif classname.find("BatchNorm") != -1: | |
nn.init.normal_(m.weight.data, 1.0, 0.02) | |
nn.init.constant_(m.bias.data, 0) | |
def weights_init_conv(m): | |
if hasattr(m, "conv"): | |
m = m.conv | |
classname = m.__class__.__name__ | |
if classname.find("Conv") != -1: | |
nn.init.normal_(m.weight.data, 0.0, 0.02) | |
elif classname.find("BatchNorm") != -1: | |
nn.init.normal_(m.weight.data, 1.0, 0.02) | |
nn.init.constant_(m.bias.data, 0) | |
class NLayerDiscriminator(nn.Module): | |
"""Defines a PatchGAN discriminator as in Pix2Pix | |
--> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py | |
""" | |
def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False): | |
"""Construct a PatchGAN discriminator | |
Parameters: | |
input_nc (int) -- the number of channels in input images | |
ndf (int) -- the number of filters in the last conv layer | |
n_layers (int) -- the number of conv layers in the discriminator | |
norm_layer -- normalization layer | |
""" | |
super(NLayerDiscriminator, self).__init__() | |
if not use_actnorm: | |
norm_layer = nn.BatchNorm2d | |
else: | |
norm_layer = ActNorm | |
if type(norm_layer) == functools.partial: # no need to use bias as BatchNorm2d has affine parameters | |
use_bias = norm_layer.func != nn.BatchNorm2d | |
else: | |
use_bias = norm_layer != nn.BatchNorm2d | |
kw = 4 | |
padw = 1 | |
sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)] | |
nf_mult = 1 | |
nf_mult_prev = 1 | |
for n in range(1, n_layers): # gradually increase the number of filters | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n, 8) | |
sequence += [ | |
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True), | |
] | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n_layers, 8) | |
sequence += [ | |
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw, bias=use_bias), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True), | |
] | |
sequence += [ | |
nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw) | |
] # output 1 channel prediction map | |
self.main = nn.Sequential(*sequence) | |
def forward(self, input): | |
"""Standard forward.""" | |
return self.main(input) | |
class NLayerDiscriminator3D(nn.Module): | |
"""Defines a 3D PatchGAN discriminator as in Pix2Pix but for 3D inputs.""" | |
def __init__(self, input_nc=1, ndf=64, n_layers=3, use_actnorm=False): | |
""" | |
Construct a 3D PatchGAN discriminator | |
Parameters: | |
input_nc (int) -- the number of channels in input volumes | |
ndf (int) -- the number of filters in the last conv layer | |
n_layers (int) -- the number of conv layers in the discriminator | |
use_actnorm (bool) -- flag to use actnorm instead of batchnorm | |
""" | |
super(NLayerDiscriminator3D, self).__init__() | |
if not use_actnorm: | |
norm_layer = nn.BatchNorm3d | |
else: | |
raise NotImplementedError("Not implemented.") | |
if type(norm_layer) == functools.partial: | |
use_bias = norm_layer.func != nn.BatchNorm3d | |
else: | |
use_bias = norm_layer != nn.BatchNorm3d | |
kw = 3 | |
padw = 1 | |
sequence = [nn.Conv3d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)] | |
nf_mult = 1 | |
nf_mult_prev = 1 | |
for n in range(1, n_layers): # gradually increase the number of filters | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n, 8) | |
sequence += [ | |
nn.Conv3d( | |
ndf * nf_mult_prev, | |
ndf * nf_mult, | |
kernel_size=(kw, kw, kw), | |
stride=(2 if n == 1 else 1, 2, 2), | |
padding=padw, | |
bias=use_bias, | |
), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True), | |
] | |
nf_mult_prev = nf_mult | |
nf_mult = min(2**n_layers, 8) | |
sequence += [ | |
nn.Conv3d( | |
ndf * nf_mult_prev, ndf * nf_mult, kernel_size=(kw, kw, kw), stride=1, padding=padw, bias=use_bias | |
), | |
norm_layer(ndf * nf_mult), | |
nn.LeakyReLU(0.2, True), | |
] | |
sequence += [ | |
nn.Conv3d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw) | |
] # output 1 channel prediction map | |
self.main = nn.Sequential(*sequence) | |
def forward(self, input): | |
"""Standard forward.""" | |
return self.main(input) | |
def hinge_d_loss(logits_real, logits_fake): | |
loss_real = torch.mean(F.relu(1.0 - logits_real)) | |
loss_fake = torch.mean(F.relu(1.0 + logits_fake)) | |
d_loss = 0.5 * (loss_real + loss_fake) | |
return d_loss | |
def vanilla_d_loss(logits_real, logits_fake): | |
d_loss = 0.5 * ( | |
torch.mean(torch.nn.functional.softplus(-logits_real)) + torch.mean(torch.nn.functional.softplus(logits_fake)) | |
) | |
return d_loss | |
def hinge_d_loss_with_exemplar_weights(logits_real, logits_fake, weights): | |
assert weights.shape[0] == logits_real.shape[0] == logits_fake.shape[0] | |
loss_real = torch.mean(F.relu(1.0 - logits_real), dim=[1, 2, 3]) | |
loss_fake = torch.mean(F.relu(1.0 + logits_fake), dim=[1, 2, 3]) | |
loss_real = (weights * loss_real).sum() / weights.sum() | |
loss_fake = (weights * loss_fake).sum() / weights.sum() | |
d_loss = 0.5 * (loss_real + loss_fake) | |
return d_loss | |
def adopt_weight(weight, global_step, threshold=0, value=0.0): | |
if global_step < threshold: | |
weight = value | |
return weight | |
def measure_perplexity(predicted_indices, n_embed): | |
# src: https://github.com/karpathy/deep-vector-quantization/blob/main/model.py | |
# eval cluster perplexity. when perplexity == num_embeddings then all clusters are used exactly equally | |
encodings = F.one_hot(predicted_indices, n_embed).float().reshape(-1, n_embed) | |
avg_probs = encodings.mean(0) | |
perplexity = (-(avg_probs * torch.log(avg_probs + 1e-10)).sum()).exp() | |
cluster_use = torch.sum(avg_probs > 0) | |
return perplexity, cluster_use | |
def l1(x, y): | |
return torch.abs(x - y) | |
def l2(x, y): | |
return torch.pow((x - y), 2) | |
class LPIPSWithDiscriminator(nn.Module): | |
def __init__( | |
self, | |
disc_start, | |
logvar_init=0.0, | |
kl_weight=1.0, | |
pixelloss_weight=1.0, | |
perceptual_weight=1.0, | |
# --- Discriminator Loss --- | |
disc_num_layers=3, | |
disc_in_channels=3, | |
disc_factor=1.0, | |
disc_weight=1.0, | |
use_actnorm=False, | |
disc_conditional=False, | |
disc_loss="hinge", | |
): | |
super().__init__() | |
assert disc_loss in ["hinge", "vanilla"] | |
self.kl_weight = kl_weight | |
self.pixel_weight = pixelloss_weight | |
self.perceptual_loss = LPIPS().eval() | |
self.perceptual_weight = perceptual_weight | |
self.logvar = nn.Parameter(torch.ones(size=()) * logvar_init) | |
self.discriminator = NLayerDiscriminator( | |
input_nc=disc_in_channels, n_layers=disc_num_layers, use_actnorm=use_actnorm | |
).apply(weights_init) | |
self.discriminator_iter_start = disc_start | |
self.disc_loss = hinge_d_loss if disc_loss == "hinge" else vanilla_d_loss | |
self.disc_factor = disc_factor | |
self.discriminator_weight = disc_weight | |
self.disc_conditional = disc_conditional | |
def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer=None): | |
if last_layer is not None: | |
nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0] | |
g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0] | |
else: | |
nll_grads = torch.autograd.grad(nll_loss, self.last_layer[0], retain_graph=True)[0] | |
g_grads = torch.autograd.grad(g_loss, self.last_layer[0], retain_graph=True)[0] | |
d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4) | |
d_weight = torch.clamp(d_weight, 0.0, 1e4).detach() | |
d_weight = d_weight * self.discriminator_weight | |
return d_weight | |
def forward( | |
self, | |
inputs, | |
reconstructions, | |
posteriors, | |
optimizer_idx, | |
global_step, | |
split="train", | |
weights=None, | |
last_layer=None, | |
cond=None, | |
): | |
inputs = rearrange(inputs, "b c t h w -> (b t) c h w").contiguous() | |
reconstructions = rearrange(reconstructions, "b c t h w -> (b t) c h w").contiguous() | |
rec_loss = torch.abs(inputs - reconstructions) | |
if self.perceptual_weight > 0: | |
p_loss = self.perceptual_loss(inputs, reconstructions) | |
rec_loss = rec_loss + self.perceptual_weight * p_loss | |
nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar | |
weighted_nll_loss = nll_loss | |
if weights is not None: | |
weighted_nll_loss = weights * nll_loss | |
weighted_nll_loss = torch.sum(weighted_nll_loss) / weighted_nll_loss.shape[0] | |
nll_loss = torch.sum(nll_loss) / nll_loss.shape[0] | |
kl_loss = posteriors.kl() | |
kl_loss = torch.sum(kl_loss) / kl_loss.shape[0] | |
# GAN Part | |
if optimizer_idx == 0: | |
# generator update | |
if cond is None: | |
assert not self.disc_conditional | |
logits_fake = self.discriminator(reconstructions.contiguous()) | |
else: | |
assert self.disc_conditional | |
logits_fake = self.discriminator(torch.cat((reconstructions.contiguous(), cond), dim=1)) | |
g_loss = -torch.mean(logits_fake) | |
if self.disc_factor > 0.0: | |
try: | |
d_weight = self.calculate_adaptive_weight(nll_loss, g_loss, last_layer=last_layer) | |
except RuntimeError: | |
assert not self.training | |
d_weight = torch.tensor(0.0) | |
else: | |
d_weight = torch.tensor(0.0) | |
disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) | |
loss = weighted_nll_loss + self.kl_weight * kl_loss + d_weight * disc_factor * g_loss | |
log = { | |
"{}/total_loss".format(split): loss.clone().detach().mean(), | |
"{}/logvar".format(split): self.logvar.detach(), | |
"{}/kl_loss".format(split): kl_loss.detach().mean(), | |
"{}/nll_loss".format(split): nll_loss.detach().mean(), | |
"{}/rec_loss".format(split): rec_loss.detach().mean(), | |
"{}/d_weight".format(split): d_weight.detach(), | |
"{}/disc_factor".format(split): torch.tensor(disc_factor), | |
"{}/g_loss".format(split): g_loss.detach().mean(), | |
} | |
return loss, log | |
if optimizer_idx == 1: | |
if cond is None: | |
logits_real = self.discriminator(inputs.contiguous().detach()) | |
logits_fake = self.discriminator(reconstructions.contiguous().detach()) | |
else: | |
logits_real = self.discriminator(torch.cat((inputs.contiguous().detach(), cond), dim=1)) | |
logits_fake = self.discriminator(torch.cat((reconstructions.contiguous().detach(), cond), dim=1)) | |
disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) | |
d_loss = disc_factor * self.disc_loss(logits_real, logits_fake) | |
log = { | |
"{}/disc_loss".format(split): d_loss.clone().detach().mean(), | |
"{}/logits_real".format(split): logits_real.detach().mean(), | |
"{}/logits_fake".format(split): logits_fake.detach().mean(), | |
} | |
return d_loss, log | |
class LPIPSWithDiscriminator3D(nn.Module): | |
def __init__( | |
self, | |
disc_start, | |
logvar_init=0.0, | |
kl_weight=1.0, | |
pixelloss_weight=1.0, | |
perceptual_weight=1.0, | |
# --- Discriminator Loss --- | |
disc_num_layers=3, | |
disc_in_channels=3, | |
disc_factor=1.0, | |
disc_weight=1.0, | |
use_actnorm=False, | |
disc_conditional=False, | |
disc_loss="hinge", | |
): | |
super().__init__() | |
assert disc_loss in ["hinge", "vanilla"] | |
self.kl_weight = kl_weight | |
self.pixel_weight = pixelloss_weight | |
self.perceptual_loss = LPIPS().eval() | |
self.perceptual_weight = perceptual_weight | |
self.logvar = nn.Parameter(torch.ones(size=()) * logvar_init) | |
self.discriminator = NLayerDiscriminator3D( | |
input_nc=disc_in_channels, n_layers=disc_num_layers, use_actnorm=use_actnorm | |
).apply(weights_init) | |
self.discriminator_iter_start = disc_start | |
self.disc_loss = hinge_d_loss if disc_loss == "hinge" else vanilla_d_loss | |
self.disc_factor = disc_factor | |
self.discriminator_weight = disc_weight | |
self.disc_conditional = disc_conditional | |
def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer=None): | |
if last_layer is not None: | |
nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0] | |
g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0] | |
else: | |
nll_grads = torch.autograd.grad(nll_loss, self.last_layer[0], retain_graph=True)[0] | |
g_grads = torch.autograd.grad(g_loss, self.last_layer[0], retain_graph=True)[0] | |
d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4) | |
d_weight = torch.clamp(d_weight, 0.0, 1e4).detach() | |
d_weight = d_weight * self.discriminator_weight | |
return d_weight | |
def forward( | |
self, | |
inputs, | |
reconstructions, | |
posteriors, | |
optimizer_idx, | |
global_step, | |
split="train", | |
weights=None, | |
last_layer=None, | |
cond=None, | |
): | |
t = inputs.shape[2] | |
inputs = rearrange(inputs, "b c t h w -> (b t) c h w").contiguous() | |
reconstructions = rearrange(reconstructions, "b c t h w -> (b t) c h w").contiguous() | |
rec_loss = torch.abs(inputs - reconstructions) | |
if self.perceptual_weight > 0: | |
p_loss = self.perceptual_loss(inputs, reconstructions) | |
rec_loss = rec_loss + self.perceptual_weight * p_loss | |
nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar | |
weighted_nll_loss = nll_loss | |
if weights is not None: | |
weighted_nll_loss = weights * nll_loss | |
weighted_nll_loss = torch.sum(weighted_nll_loss) / weighted_nll_loss.shape[0] | |
nll_loss = torch.sum(nll_loss) / nll_loss.shape[0] | |
kl_loss = posteriors.kl() | |
kl_loss = torch.sum(kl_loss) / kl_loss.shape[0] | |
inputs = rearrange(inputs, "(b t) c h w -> b c t h w", t=t).contiguous() | |
reconstructions = rearrange(reconstructions, "(b t) c h w -> b c t h w", t=t).contiguous() | |
# GAN Part | |
if optimizer_idx == 0: | |
# generator update | |
if cond is None: | |
assert not self.disc_conditional | |
logits_fake = self.discriminator(reconstructions) | |
else: | |
assert self.disc_conditional | |
logits_fake = self.discriminator(torch.cat((reconstructions, cond), dim=1)) | |
g_loss = -torch.mean(logits_fake) | |
if self.disc_factor > 0.0: | |
try: | |
d_weight = self.calculate_adaptive_weight(nll_loss, g_loss, last_layer=last_layer) | |
except RuntimeError as e: | |
assert not self.training, print(e) | |
d_weight = torch.tensor(0.0) | |
else: | |
d_weight = torch.tensor(0.0) | |
disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) | |
loss = weighted_nll_loss + self.kl_weight * kl_loss + d_weight * disc_factor * g_loss | |
log = { | |
"{}/total_loss".format(split): loss.clone().detach().mean(), | |
"{}/logvar".format(split): self.logvar.detach(), | |
"{}/kl_loss".format(split): kl_loss.detach().mean(), | |
"{}/nll_loss".format(split): nll_loss.detach().mean(), | |
"{}/rec_loss".format(split): rec_loss.detach().mean(), | |
"{}/d_weight".format(split): d_weight.detach(), | |
"{}/disc_factor".format(split): torch.tensor(disc_factor), | |
"{}/g_loss".format(split): g_loss.detach().mean(), | |
} | |
return loss, log | |
if optimizer_idx == 1: | |
if cond is None: | |
logits_real = self.discriminator(inputs.contiguous().detach()) | |
logits_fake = self.discriminator(reconstructions.contiguous().detach()) | |
else: | |
logits_real = self.discriminator(torch.cat((inputs.contiguous().detach(), cond), dim=1)) | |
logits_fake = self.discriminator(torch.cat((reconstructions.contiguous().detach(), cond), dim=1)) | |
disc_factor = adopt_weight(self.disc_factor, global_step, threshold=self.discriminator_iter_start) | |
d_loss = disc_factor * self.disc_loss(logits_real, logits_fake) | |
log = { | |
"{}/disc_loss".format(split): d_loss.clone().detach().mean(), | |
"{}/logits_real".format(split): logits_real.detach().mean(), | |
"{}/logits_fake".format(split): logits_fake.detach().mean(), | |
} | |
return d_loss, log | |
class SimpleLPIPS(nn.Module): | |
def __init__( | |
self, | |
logvar_init=0.0, | |
kl_weight=1.0, | |
pixelloss_weight=1.0, | |
perceptual_weight=1.0, | |
disc_loss="hinge", | |
): | |
super().__init__() | |
assert disc_loss in ["hinge", "vanilla"] | |
self.kl_weight = kl_weight | |
self.pixel_weight = pixelloss_weight | |
self.perceptual_loss = LPIPS().eval() | |
self.perceptual_weight = perceptual_weight | |
self.logvar = nn.Parameter(torch.ones(size=()) * logvar_init) | |
def forward( | |
self, | |
inputs, | |
reconstructions, | |
posteriors, | |
split="train", | |
weights=None, | |
): | |
inputs = rearrange(inputs, "b c t h w -> (b t) c h w").contiguous() | |
reconstructions = rearrange(reconstructions, "b c t h w -> (b t) c h w").contiguous() | |
rec_loss = torch.abs(inputs - reconstructions) | |
if self.perceptual_weight > 0: | |
p_loss = self.perceptual_loss(inputs, reconstructions) | |
rec_loss = rec_loss + self.perceptual_weight * p_loss | |
nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar | |
weighted_nll_loss = nll_loss | |
if weights is not None: | |
weighted_nll_loss = weights * nll_loss | |
weighted_nll_loss = torch.sum(weighted_nll_loss) / weighted_nll_loss.shape[0] | |
nll_loss = torch.sum(nll_loss) / nll_loss.shape[0] | |
kl_loss = posteriors.kl() | |
kl_loss = torch.sum(kl_loss) / kl_loss.shape[0] | |
loss = weighted_nll_loss + self.kl_weight * kl_loss | |
log = { | |
"{}/total_loss".format(split): loss.clone().detach().mean(), | |
"{}/logvar".format(split): self.logvar.detach(), | |
"{}/kl_loss".format(split): kl_loss.detach().mean(), | |
"{}/nll_loss".format(split): nll_loss.detach().mean(), | |
"{}/rec_loss".format(split): rec_loss.detach().mean(), | |
} | |
if self.perceptual_weight > 0: | |
log.update({"{}/p_loss".format(split): p_loss.detach().mean()}) | |
return loss, log | |