Spaces:
Runtime error
Runtime error
from pathlib import Path | |
from typing import Any, Optional, Union, Callable | |
import pytorch_lightning as pl | |
import torch | |
from diffusers import DDPMScheduler, DiffusionPipeline, AutoencoderKL, DDIMScheduler | |
from diffusers.utils.import_utils import is_xformers_available | |
from einops import rearrange, repeat | |
from transformers import CLIPTextModel, CLIPTokenizer | |
from t2v_enhanced.utils.video_utils import ResultProcessor, save_videos_grid, video_naming | |
from t2v_enhanced.model import pl_module_params_controlnet | |
from t2v_enhanced.model.diffusers_conditional.models.controlnet.controlnet import ControlNetModel | |
from t2v_enhanced.model.diffusers_conditional.models.controlnet.unet_3d_condition import UNet3DConditionModel | |
from t2v_enhanced.model.diffusers_conditional.models.controlnet.pipeline_text_to_video_w_controlnet_synth import TextToVideoSDPipeline | |
from t2v_enhanced.model.diffusers_conditional.models.controlnet.processor import set_use_memory_efficient_attention_xformers | |
from t2v_enhanced.model.diffusers_conditional.models.controlnet.mask_generator import MaskGenerator | |
import warnings | |
# from warnings import warn | |
from t2v_enhanced.utils.iimage import IImage | |
from t2v_enhanced.utils.object_loader import instantiate_object | |
from t2v_enhanced.utils.object_loader import get_class | |
class VideoLDM(pl.LightningModule): | |
def __init__(self, | |
inference_params: pl_module_params_controlnet.InferenceParams, | |
opt_params: pl_module_params_controlnet.OptimizerParams = None, | |
unet_params: pl_module_params_controlnet.UNetParams = None, | |
): | |
super().__init__() | |
self.inference_generator = torch.Generator(device=self.device) | |
self.opt_params = opt_params | |
self.unet_params = unet_params | |
print(f"Base pipeline from: {unet_params.pipeline_repo}") | |
print(f"Pipeline class {unet_params.pipeline_class}") | |
# load entire pipeline (unet, vq, text encoder,..) | |
state_dict_control_model = None | |
state_dict_fusion = None | |
state_dict_base_model = None | |
if len(opt_params.load_trained_controlnet_from_ckpt) > 0: | |
state_dict_ckpt = torch.load(opt_params.load_trained_controlnet_from_ckpt, map_location=torch.device("cpu")) | |
state_dict_ckpt = state_dict_ckpt["state_dict"] | |
state_dict_control_model = dict(filter(lambda x: x[0].startswith("unet"), state_dict_ckpt.items())) | |
state_dict_control_model = {k.split("unet.")[1]: v for (k, v) in state_dict_control_model.items()} | |
state_dict_fusion = dict(filter(lambda x: "cross_attention_merger" in x[0], state_dict_ckpt.items())) | |
state_dict_fusion = {k.split("base_model.")[1]: v for (k, v) in state_dict_fusion.items()} | |
del state_dict_ckpt | |
state_dict_proj = None | |
state_dict_ckpt = None | |
if hasattr(unet_params, "use_resampler") and unet_params.use_resampler: | |
num_queries = unet_params.num_frames if unet_params.num_frames > 1 else None | |
if unet_params.use_image_tokens_ctrl: | |
num_queries = unet_params.num_control_input_frames | |
assert unet_params.frame_expansion == "none" | |
image_encoder = self.unet_params.image_encoder | |
embedding_dim = image_encoder.embedding_dim | |
resampler = instantiate_object(self.unet_params.resampler_cls, video_length=num_queries, embedding_dim=embedding_dim, input_tokens=image_encoder.num_tokens, num_layers=self.unet_params.resampler_merging_layers, aggregation=self.unet_params.aggregation) | |
state_dict_proj = None | |
self.resampler = resampler | |
self.image_encoder = image_encoder | |
noise_scheduler = DDPMScheduler.from_pretrained(self.unet_params.pipeline_repo, subfolder="scheduler") | |
tokenizer = CLIPTokenizer.from_pretrained(self.unet_params.pipeline_repo, subfolder="tokenizer") | |
text_encoder = CLIPTextModel.from_pretrained(self.unet_params.pipeline_repo, subfolder="text_encoder") | |
vae = AutoencoderKL.from_pretrained(self.unet_params.pipeline_repo, subfolder="vae") | |
base_model = UNet3DConditionModel.from_pretrained(self.unet_params.pipeline_repo, subfolder="unet", low_cpu_mem_usage=False, device_map=None, merging_mode=self.unet_params.merging_mode_base, use_image_embedding=unet_params.use_resampler and unet_params.use_image_tokens_main, use_fps_conditioning=self.opt_params.use_fps_conditioning, unet_params=unet_params) | |
if state_dict_base_model is not None: | |
miss, unex = base_model.load_state_dict(state_dict_base_model, strict=False) | |
assert len(unex) == 0 | |
if len(miss) > 0: | |
warnings.warn(f"Missing keys when loading base_mode:{miss}") | |
del state_dict_base_model | |
if state_dict_fusion is not None: | |
miss, unex = base_model.load_state_dict(state_dict_fusion, strict=False) | |
assert len(unex) == 0 | |
del state_dict_fusion | |
print("PIPE LOADING DONE") | |
self.noise_scheduler = noise_scheduler | |
self.tokenizer = tokenizer | |
self.text_encoder = text_encoder | |
self.vae = vae | |
self.unet = ControlNetModel.from_unet( | |
unet=base_model, | |
conditioning_embedding_out_channels=unet_params.conditioning_embedding_out_channels, | |
downsample_controlnet_cond=unet_params.downsample_controlnet_cond, | |
num_frames=unet_params.num_frames if (unet_params.frame_expansion != "none" or self.unet_params.use_controlnet_mask) else unet_params.num_control_input_frames, | |
num_frame_conditioning=unet_params.num_control_input_frames, | |
frame_expansion=unet_params.frame_expansion, | |
pre_transformer_in_cond=unet_params.pre_transformer_in_cond, | |
num_tranformers=unet_params.num_tranformers, | |
vae=AutoencoderKL.from_pretrained(self.unet_params.pipeline_repo, subfolder="vae"), | |
zero_conv_mode=unet_params.zero_conv_mode, | |
merging_mode=unet_params.merging_mode, | |
condition_encoder=unet_params.condition_encoder, | |
use_controlnet_mask=unet_params.use_controlnet_mask, | |
use_image_embedding=unet_params.use_resampler and unet_params.use_image_tokens_ctrl, | |
unet_params=unet_params, | |
use_image_encoder_normalization=unet_params.use_image_encoder_normalization, | |
) | |
if state_dict_control_model is not None: | |
miss, unex = self.unet.load_state_dict( | |
state_dict_control_model, strict=False) | |
if len(miss) > 0: | |
print("WARNING: Loading checkpoint for controlnet misses states") | |
print(miss) | |
if unet_params.frame_expansion == "none": | |
attention_params = self.unet_params.attention_mask_params | |
assert not attention_params.temporal_self_attention_only_on_conditioning and not attention_params.spatial_attend_on_condition_frames and not attention_params.temp_attend_on_neighborhood_of_condition_frames | |
self.mask_generator = MaskGenerator( | |
self.unet_params.attention_mask_params, num_frame_conditioning=self.unet_params.num_control_input_frames, num_frames=self.unet_params.num_frames) | |
self.mask_generator_base = MaskGenerator( | |
self.unet_params.attention_mask_params_base, num_frame_conditioning=self.unet_params.num_control_input_frames, num_frames=self.unet_params.num_frames) | |
if state_dict_proj is not None and unet_params.use_image_tokens_main: | |
if unet_params.use_image_tokens_main: | |
missing, unexpected = base_model.load_state_dict( | |
state_dict_proj, strict=False) | |
elif unet_params.use_image_tokens_ctrl: | |
missing, unexpected = unet.load_state_dict( | |
state_dict_proj, strict=False) | |
assert len(unexpected) == 0, f"Unexpected entries {unexpected}" | |
print(f"Missing keys state proj = {missing}") | |
del state_dict_proj | |
base_model.requires_grad_(False) | |
self.base_model = base_model | |
self.unet.requires_grad_(False) | |
self.text_encoder.requires_grad_(False) | |
self.vae.requires_grad_(False) | |
layers_config = opt_params.layers_config | |
layers_config.set_requires_grad(self) | |
print("CUSTOM XFORMERS ATTENTION USED.") | |
if is_xformers_available(): | |
set_use_memory_efficient_attention_xformers(self.unet, num_frame_conditioning=self.unet_params.num_control_input_frames, | |
num_frames=self.unet_params.num_frames, | |
attention_mask_params=self.unet_params.attention_mask_params | |
) | |
set_use_memory_efficient_attention_xformers(self.base_model, num_frame_conditioning=self.unet_params.num_control_input_frames, | |
num_frames=self.unet_params.num_frames, | |
attention_mask_params=self.unet_params.attention_mask_params_base) | |
if len(inference_params.scheduler_cls) > 0: | |
inf_scheduler_class = get_class(inference_params.scheduler_cls) | |
else: | |
inf_scheduler_class = DDIMScheduler | |
inf_scheduler = inf_scheduler_class.from_pretrained( | |
self.unet_params.pipeline_repo, subfolder="scheduler") | |
inference_pipeline = TextToVideoSDPipeline(vae=self.vae, | |
text_encoder=self.text_encoder, | |
tokenizer=self.tokenizer, | |
unet=self.base_model, | |
controlnet=self.unet, | |
scheduler=inf_scheduler | |
) | |
inference_pipeline.set_noise_generator(self.opt_params.noise_generator) | |
inference_pipeline.enable_vae_slicing() | |
inference_pipeline.set_progress_bar_config(disable=True) | |
self.inference_params = inference_params | |
self.inference_pipeline = inference_pipeline | |
self.result_processor = ResultProcessor(fps=self.inference_params.frame_rate, n_frames=self.inference_params.video_length) | |
def on_start(self): | |
datamodule = self.trainer._data_connector._datahook_selector.datamodule | |
pipe_id_model = self.unet_params.pipeline_repo | |
for dataset_key in ["video_dataset", "image_dataset", "predict_dataset"]: | |
dataset = getattr(datamodule, dataset_key, None) | |
if dataset is not None and hasattr(dataset, "model_id"): | |
pipe_id_data = dataset.model_id | |
assert pipe_id_model == pipe_id_data, f"Model and Dataloader need the same pipeline path. Found '{pipe_id_model}' and '{dataset_key}.model_id={pipe_id_data}'. Consider setting '--data.{dataset_key}.model_id={pipe_id_data}'" | |
self.result_processor.set_logger(self.logger) | |
def on_predict_start(self) -> None: | |
self.on_start() | |
# pipe = DiffusionPipeline.from_pretrained("damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16, variant="fp16") | |
# pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config) | |
# pipe.set_progress_bar_config(disable=True) | |
# self.first_stage = pipe.to(self.device) | |
def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> Any: | |
cfg = self.trainer.predict_cfg | |
result_file_stem = cfg["result_file_stem"] | |
storage_fol = Path(cfg['predict_dir']) | |
prompts = [cfg["prompt"]] | |
inference_params: pl_module_params_controlnet.InferenceParams = self.inference_params | |
conditioning_type = inference_params.conditioning_type | |
n_autoregressive_generations = inference_params.n_autoregressive_generations | |
mode = inference_params.mode | |
start_from_real_input = inference_params.start_from_real_input | |
assert isinstance(prompts, list) | |
prompts = n_autoregressive_generations * prompts | |
self.inference_generator.manual_seed(self.inference_params.seed) | |
assert self.unet_params.num_control_input_frames == self.inference_params.video_length//2, f"currently we assume to have an equal size for and second half of the frame interval, e.g. 16 frames, and we condition on 8. Current setup: {self.unet_params.num_frame_conditioning} and {self.inference_params.video_length}" | |
chunks_conditional = [] | |
batch_size = 1 | |
shape = (batch_size, self.inference_pipeline.unet.config.in_channels, self.inference_params.video_length, | |
self.inference_pipeline.unet.config.sample_size, self.inference_pipeline.unet.config.sample_size) | |
for idx, prompt in enumerate(prompts): | |
if idx > 0: | |
content = sample*2-1 | |
content_latent = self.vae.encode(content).latent_dist.sample() * self.vae.config.scaling_factor | |
content_latent = rearrange(content_latent, "F C W H -> 1 C F W H") | |
content_latent = content_latent[:, :, self.unet_params.num_control_input_frames:].detach().clone() | |
if hasattr(self.inference_pipeline, "noise_generator"): | |
latents = self.inference_pipeline.noise_generator.sample_noise(shape=shape, device=self.device, dtype=self.dtype, generator=self.inference_generator, content=content_latent if idx > 0 else None) | |
else: | |
latents = None | |
if idx == 0: | |
sample = cfg["video"] | |
else: | |
if inference_params.conditioning_type == "fixed": | |
context = chunks_conditional[0][:self.unet_params.num_frame_conditioning] | |
context = [context] | |
context = [2*sample-1 for sample in context] | |
input_frames_conditioning = torch.cat(context).detach().clone() | |
input_frames_conditioning = rearrange(input_frames_conditioning, "F C W H -> 1 F C W H") | |
elif inference_params.conditioning_type == "last_chunk": | |
input_frames_conditioning = condition_input[:, -self.unet_params.num_frame_conditioning:].detach().clone() | |
elif inference_params.conditioning_type == "past": | |
context = [sample[:self.unet_params.num_control_input_frames] for sample in chunks_conditional] | |
context = [2*sample-1 for sample in context] | |
input_frames_conditioning = torch.cat(context).detach().clone() | |
input_frames_conditioning = rearrange(input_frames_conditioning, "F C W H -> 1 F C W H") | |
else: | |
raise NotImplementedError() | |
input_frames = condition_input[:, self.unet_params.num_control_input_frames:].detach().clone() | |
sample = self(prompt, input_frames=input_frames, input_frames_conditioning=input_frames_conditioning, latents=latents) | |
if hasattr(self.inference_pipeline, "reset_noise_generator_state"): | |
self.inference_pipeline.reset_noise_generator_state() | |
condition_input = rearrange(sample, "F C W H -> 1 F C W H") | |
condition_input = (2*condition_input)-1 # range: [-1,1] | |
# store first 16 frames, then always last 8 of a chunk | |
chunks_conditional.append(sample) | |
result_formats = self.inference_params.result_formats | |
# result_formats = [gif", "mp4"] | |
concat_video = self.inference_params.concat_video | |
def IImage_normalized(x): return IImage(x, vmin=0, vmax=1) | |
for result_format in result_formats: | |
save_format = result_format.replace("eval_", "") | |
merged_video = None | |
for chunk_idx, (prompt, video) in enumerate(zip(prompts, chunks_conditional)): | |
if chunk_idx == 0: | |
current_video = IImage_normalized(video) | |
else: | |
current_video = IImage_normalized(video[self.unet_params.num_control_input_frames:]) | |
if merged_video is None: | |
merged_video = current_video | |
else: | |
merged_video &= current_video | |
if concat_video: | |
filename = video_naming(prompts[0], save_format, batch_idx, 0) | |
result_file_video = (storage_fol / filename).absolute().as_posix() | |
result_file_video = (Path(result_file_video).parent / (result_file_stem+Path(result_file_video).suffix)).as_posix() | |
self.result_processor.save_to_file(video=merged_video.torch(vmin=0, vmax=1), prompt=prompts[0], video_filename=result_file_video, prompt_on_vid=False) | |
def forward(self, prompt, input_frames=None, input_frames_conditioning=None, latents=None): | |
call_params = self.inference_params.to_dict() | |
print(f"INFERENCE PARAMS = {call_params}") | |
call_params["prompt"] = prompt | |
call_params["image"] = input_frames | |
call_params["num_frames"] = self.inference_params.video_length | |
call_params["return_dict"] = False | |
call_params["output_type"] = "pt_t2v" | |
call_params["mask_generator"] = self.mask_generator | |
call_params["precision"] = "16" if self.trainer.precision.startswith("16") else "32" | |
call_params["no_text_condition_control"] = self.opt_params.no_text_condition_control | |
call_params["weight_control_sample"] = self.unet_params.weight_control_sample | |
call_params["use_controlnet_mask"] = self.unet_params.use_controlnet_mask | |
call_params["skip_controlnet_branch"] = self.opt_params.skip_controlnet_branch | |
call_params["img_cond_resampler"] = self.resampler if self.unet_params.use_resampler else None | |
call_params["img_cond_encoder"] = self.image_encoder if self.unet_params.use_resampler else None | |
call_params["input_frames_conditioning"] = input_frames_conditioning | |
call_params["cfg_text_image"] = self.unet_params.cfg_text_image | |
call_params["use_of"] = self.unet_params.use_of | |
if latents is not None: | |
call_params["latents"] = latents | |
sample = self.inference_pipeline(generator=self.inference_generator, **call_params) | |
return sample | |