import spaces from functools import lru_cache import gradio as gr from gradio_toggle import Toggle import torch from huggingface_hub import hf_hub_download from transformers import CLIPProcessor, CLIPModel from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from ltx_video.models.transformers.transformer3d import Transformer3DModel from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier from ltx_video.schedulers.rf import RectifiedFlowScheduler from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline as XoraVideoPipeline from transformers import T5EncoderModel, T5Tokenizer from ltx_video.utils.conditioning_method import ConditioningMethod from pathlib import Path import safetensors.torch import json import numpy as np import cv2 from PIL import Image import tempfile import os import gc from openai import OpenAI import csv from datetime import datetime from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy # Load Hugging Face token if needed hf_token = os.getenv("HF_TOKEN") openai_api_key = os.getenv("OPENAI_API_KEY") client = OpenAI(api_key=openai_api_key) system_prompt_t2v_path = "assets/system_prompt_t2v.txt" system_prompt_i2v_path = "assets/system_prompt_i2v.txt" with open(system_prompt_t2v_path, "r") as f: system_prompt_t2v = f.read() with open(system_prompt_i2v_path, "r") as f: system_prompt_i2v = f.read() # Set model download directory within Hugging Face Spaces model_path = Path("/home/elevin/xora-core/assets/") cpkt_path = Path("/home/elevin/xora-core/assets/ltx-video-2b-v0.9.1.safetensors") if not os.path.exists(cpkt_path): hf_hub_download(repo_id="Lightricks/LTX-Video", filename="ltx-video-2b-v0.9.1.safetensors", local_dir=model_path, local_dir_use_symlinks=False, repo_type='model') # Global variables to load components vae_dir = Path(model_path) / "vae" unet_dir = Path(model_path) / "unet" scheduler_dir = Path(model_path) / "scheduler" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") DATA_DIR = "/data" os.makedirs(DATA_DIR, exist_ok=True) LOG_FILE_PATH = os.path.join("/data", "user_requests.csv") clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32", cache_dir=model_path) clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32", cache_dir=model_path) if not os.path.exists(LOG_FILE_PATH): with open(LOG_FILE_PATH, "w", newline="") as f: writer = csv.writer(f) writer.writerow( [ "timestamp", "request_type", "prompt", "negative_prompt", "height", "width", "num_frames", "frame_rate", "seed", "num_inference_steps", "guidance_scale", "is_enhanced", "clip_embedding", "original_resolution", ] ) @lru_cache(maxsize=128) def log_request( request_type, prompt, negative_prompt, height, width, num_frames, frame_rate, seed, num_inference_steps, guidance_scale, is_enhanced, clip_embedding=None, original_resolution=None, ): """Log the user's request to a CSV file.""" timestamp = datetime.now().isoformat() with open(LOG_FILE_PATH, "a", newline="") as f: try: writer = csv.writer(f) writer.writerow( [ timestamp, request_type, prompt, negative_prompt, height, width, num_frames, frame_rate, seed, num_inference_steps, guidance_scale, is_enhanced, clip_embedding, original_resolution, ] ) except Exception as e: print(f"Error logging request: {e}") def compute_clip_embedding(text=None, image=None): """ Compute CLIP embedding for a given text or image. Args: text (str): Input text prompt. image (PIL.Image): Input image. Returns: list: CLIP embedding as a list of floats. """ inputs = clip_processor(text=text, images=image, return_tensors="pt", padding=True) outputs = clip_model.get_text_features(**inputs) if text else clip_model.get_image_features(**inputs) embedding = outputs.detach().cpu().numpy().flatten().tolist() return embedding def load_vae(vae_dir): return CausalVideoAutoencoder.from_pretrained(cpkt_path).to(device=device, dtype=torch.bfloat16) def load_unet(unet_dir): return Transformer3DModel.from_pretrained(cpkt_path).to(device=device, dtype=torch.bfloat16) def load_scheduler(scheduler_dir): return RectifiedFlowScheduler.from_pretrained(cpkt_path) # Helper function for image processing def center_crop_and_resize(frame, target_height, target_width): h, w, _ = frame.shape aspect_ratio_target = target_width / target_height aspect_ratio_frame = w / h if aspect_ratio_frame > aspect_ratio_target: new_width = int(h * aspect_ratio_target) x_start = (w - new_width) // 2 frame_cropped = frame[:, x_start : x_start + new_width] else: new_height = int(w / aspect_ratio_target) y_start = (h - new_height) // 2 frame_cropped = frame[y_start : y_start + new_height, :] frame_resized = cv2.resize(frame_cropped, (target_width, target_height)) return frame_resized def load_image_to_tensor_with_resize(image_path, target_height=512, target_width=768): image = Image.open(image_path).convert("RGB") image_np = np.array(image) frame_resized = center_crop_and_resize(image_np, target_height, target_width) frame_tensor = torch.tensor(frame_resized).permute(2, 0, 1).float() frame_tensor = (frame_tensor / 127.5) - 1.0 return frame_tensor.unsqueeze(0).unsqueeze(2) def enhance_prompt_if_enabled(prompt, enhance_toggle, type="t2v"): if not enhance_toggle: print("Enhance toggle is off, Prompt: ", prompt) return prompt system_prompt = system_prompt_t2v if type == "t2v" else system_prompt_i2v messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}, ] try: response = client.chat.completions.create( model="gpt-4o-mini", messages=messages, max_tokens=200, ) print("Enhanced Prompt: ", response.choices[0].message.content.strip()) return response.choices[0].message.content.strip() except Exception as e: print(f"Error: {e}") return prompt # Preset options for resolution and frame configuration preset_options = [ {"label": "1216x704, 41 frames", "width": 1216, "height": 704, "num_frames": 41}, {"label": "1088x704, 49 frames", "width": 1088, "height": 704, "num_frames": 49}, {"label": "1056x640, 57 frames", "width": 1056, "height": 640, "num_frames": 57}, {"label": "992x608, 65 frames", "width": 992, "height": 608, "num_frames": 65}, {"label": "896x608, 73 frames", "width": 896, "height": 608, "num_frames": 73}, {"label": "896x544, 81 frames", "width": 896, "height": 544, "num_frames": 81}, {"label": "832x544, 89 frames", "width": 832, "height": 544, "num_frames": 89}, {"label": "800x512, 97 frames", "width": 800, "height": 512, "num_frames": 97}, {"label": "768x512, 97 frames", "width": 768, "height": 512, "num_frames": 97}, {"label": "800x480, 105 frames", "width": 800, "height": 480, "num_frames": 105}, {"label": "736x480, 113 frames", "width": 736, "height": 480, "num_frames": 113}, {"label": "704x480, 121 frames", "width": 704, "height": 480, "num_frames": 121}, {"label": "704x448, 129 frames", "width": 704, "height": 448, "num_frames": 129}, {"label": "672x448, 137 frames", "width": 672, "height": 448, "num_frames": 137}, {"label": "640x416, 153 frames", "width": 640, "height": 416, "num_frames": 153}, {"label": "672x384, 161 frames", "width": 672, "height": 384, "num_frames": 161}, {"label": "640x384, 169 frames", "width": 640, "height": 384, "num_frames": 169}, {"label": "608x384, 177 frames", "width": 608, "height": 384, "num_frames": 177}, {"label": "576x384, 185 frames", "width": 576, "height": 384, "num_frames": 185}, {"label": "608x352, 193 frames", "width": 608, "height": 352, "num_frames": 193}, {"label": "576x352, 201 frames", "width": 576, "height": 352, "num_frames": 201}, {"label": "544x352, 209 frames", "width": 544, "height": 352, "num_frames": 209}, {"label": "512x352, 225 frames", "width": 512, "height": 352, "num_frames": 225}, {"label": "512x352, 233 frames", "width": 512, "height": 352, "num_frames": 233}, {"label": "544x320, 241 frames", "width": 544, "height": 320, "num_frames": 241}, {"label": "512x320, 249 frames", "width": 512, "height": 320, "num_frames": 249}, {"label": "512x320, 257 frames", "width": 512, "height": 320, "num_frames": 257}, ] # Function to toggle visibility of sliders based on preset selection def preset_changed(preset): if preset != "Custom": selected = next(item for item in preset_options if item["label"] == preset) return ( selected["height"], selected["width"], selected["num_frames"], gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), ) else: return ( None, None, None, gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), ) # Load models vae = load_vae(vae_dir) unet = load_unet(unet_dir) scheduler = load_scheduler(scheduler_dir) patchifier = SymmetricPatchifier(patch_size=1) text_encoder = T5EncoderModel.from_pretrained("PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="text_encoder").to(device) tokenizer = T5Tokenizer.from_pretrained("PixArt-alpha/PixArt-XL-2-1024-MS", subfolder="tokenizer") pipeline = XoraVideoPipeline( transformer=unet, patchifier=patchifier, text_encoder=text_encoder, tokenizer=tokenizer, scheduler=scheduler, vae=vae, ).to(device) @spaces.GPU(duration=120) def generate_video_from_text( prompt="", enhance_prompt_toggle=False, txt2vid_analytics_toggle=True, negative_prompt="", frame_rate=25, seed=646373, num_inference_steps=30, guidance_scale=3, height=512, width=768, num_frames=121, progress=gr.Progress(), stg_scale=1.0, stg_rescale=0.7, stg_mode="stg_a", stg_skip_layers="19", ): if len(prompt.strip()) < 50: raise gr.Error( "Prompt must be at least 50 characters long. Please provide more details for the best results.", duration=5, ) if txt2vid_analytics_toggle: log_request( "txt2vid", prompt, negative_prompt, height, width, num_frames, frame_rate, seed, num_inference_steps, guidance_scale, enhance_prompt_toggle, ) prompt = enhance_prompt_if_enabled(prompt, enhance_prompt_toggle, type="t2v") sample = { "prompt": prompt, "prompt_attention_mask": None, "negative_prompt": negative_prompt, "negative_prompt_attention_mask": None, "media_items": None, } generator = torch.Generator(device=device).manual_seed(seed) def gradio_progress_callback(self, step, timestep, kwargs): progress((step + 1) / num_inference_steps) try: with torch.no_grad(): images = pipeline( num_inference_steps=num_inference_steps, num_images_per_prompt=1, guidance_scale=guidance_scale, generator=generator, output_type="pt", height=height, width=width, num_frames=num_frames, frame_rate=frame_rate, **sample, is_video=True, vae_per_channel_normalize=True, conditioning_method=ConditioningMethod.UNCONDITIONAL, mixed_precision=True, callback_on_step_end=gradio_progress_callback, stg_scale=stg_scale, do_rescaling=stg_rescale != 1, rescaling_scale=stg_rescale, skip_layer_strategy=SkipLayerStrategy.Attention if stg_mode == "stg_a" else SkipLayerStrategy.Residual, skip_block_list=[int(x.strip()) for x in stg_skip_layers.split(",")] ).images except Exception as e: raise gr.Error( f"An error occurred while generating the video. Please try again. Error: {e}", duration=5, ) finally: torch.cuda.empty_cache() gc.collect() output_path = tempfile.mktemp(suffix=".mp4") print(images.shape) video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() video_np = (video_np * 255).astype(np.uint8) height, width = video_np.shape[1:3] out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), frame_rate, (width, height)) for frame in video_np[..., ::-1]: out.write(frame) out.release() # Explicitly delete tensors and clear cache del images del video_np torch.cuda.empty_cache() return output_path @spaces.GPU(duration=120) def generate_video_from_image( image_path, prompt="", enhance_prompt_toggle=False, img2vid_analytics_toggle=True, negative_prompt="", frame_rate=25, seed=646373, num_inference_steps=30, guidance_scale=3, height=512, width=768, num_frames=121, progress=gr.Progress(), stg_scale=1.0, stg_rescale=0.7, stg_mode="stg_a", stg_skip_layers="19", ): print("Height: ", height) print("Width: ", width) print("Num Frames: ", num_frames) if len(prompt.strip()) < 50: raise gr.Error( "Prompt must be at least 50 characters long. Please provide more details for the best results.", duration=5, ) if not image_path: raise gr.Error("Please provide an input image.", duration=5) if img2vid_analytics_toggle: with Image.open(image_path) as img: original_resolution = f"{img.width}x{img.height}" # Format as "widthxheight" clip_embedding = compute_clip_embedding(image=img) log_request( "img2vid", prompt, negative_prompt, height, width, num_frames, frame_rate, seed, num_inference_steps, guidance_scale, enhance_prompt_toggle, json.dumps(clip_embedding), original_resolution, ) media_items = load_image_to_tensor_with_resize(image_path, height, width).to(device).detach() prompt = enhance_prompt_if_enabled(prompt, enhance_prompt_toggle, type="i2v") sample = { "prompt": prompt, "prompt_attention_mask": None, "negative_prompt": negative_prompt, "negative_prompt_attention_mask": None, "media_items": media_items, } generator = torch.Generator(device=device).manual_seed(seed) def gradio_progress_callback(self, step, timestep, kwargs): progress((step + 1) / num_inference_steps) try: with torch.no_grad(): images = pipeline( num_inference_steps=num_inference_steps, num_images_per_prompt=1, guidance_scale=guidance_scale, generator=generator, output_type="pt", height=height, width=width, num_frames=num_frames, frame_rate=frame_rate, **sample, is_video=True, vae_per_channel_normalize=True, conditioning_method=ConditioningMethod.FIRST_FRAME, mixed_precision=True, callback_on_step_end=gradio_progress_callback, stg_scale=stg_scale, do_rescaling=stg_rescale != 1, rescaling_scale=stg_rescale, skip_layer_strategy=SkipLayerStrategy.Attention if stg_mode == "stg_a" else SkipLayerStrategy.Residual, skip_block_list=[int(x.strip()) for x in stg_skip_layers.split(",")] ).images output_path = tempfile.mktemp(suffix=".mp4") video_np = images.squeeze(0).permute(1, 2, 3, 0).cpu().float().numpy() video_np = (video_np * 255).astype(np.uint8) height, width = video_np.shape[1:3] out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), frame_rate, (width, height)) for frame in video_np[..., ::-1]: out.write(frame) out.release() except Exception as e: raise gr.Error( f"An error occurred while generating the video. Please try again. Error: {e}", duration=5, ) finally: torch.cuda.empty_cache() gc.collect() return output_path def create_advanced_options(): with gr.Accordion("Step 4: Advanced Options (Optional)", open=False): seed = gr.Slider(label="4.1 Seed", minimum=0, maximum=1000000, step=1, value=646373) inference_steps = gr.Slider(label="4.2 Inference Steps", minimum=1, maximum=50, step=1, value=30) guidance_scale = gr.Slider(label="4.3 Guidance Scale", minimum=1.0, maximum=5.0, step=0.1, value=3.0) height_slider = gr.Slider( label="4.4 Height", minimum=256, maximum=1024, step=64, value=512, visible=False, ) width_slider = gr.Slider( label="4.5 Width", minimum=256, maximum=1024, step=64, value=768, visible=False, ) num_frames_slider = gr.Slider( label="4.5 Number of Frames", minimum=1, maximum=200, step=1, value=121, visible=False, ) return [ seed, inference_steps, guidance_scale, height_slider, width_slider, num_frames_slider, ] # Define the Gradio interface with tabs with gr.Blocks(theme=gr.themes.Soft()) as iface: with gr.Row(elem_id="title-row"): gr.Markdown( """