import gradio as gr
from transformers import VideoLlavaForConditionalGeneration, VideoLlavaProcessor, TextIteratorStreamer
from threading import Thread
import re
import time
from PIL import Image
import torch
import cv2
import spaces
from gradio.data_classes import FileData  # used by the history-parsing helpers in bot_streaming

model = VideoLlavaForConditionalGeneration.from_pretrained("LanguageBind/Video-LLaVA-7B-hf", torch_dtype=torch.float16, device_map="cuda")
processor = VideoLlavaProcessor.from_pretrained("LanguageBind/Video-LLaVA-7B-hf")
#model.to("cuda")
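
# Demo flow: collect uploaded images/videos, turn them into <image>/<video>
# placeholders inside a "USER: ... ASSISTANT:" prompt, run Video-LLaVA, and
# stream the answer back to the Gradio chat.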

def replace_video_with_images(text, frames):
    # Expand a single <video> placeholder into one <image> token per frame.
    return text.replace("<video>", "<image>" * frames)

def sample_frames(video_file, num_frames):
    """Uniformly sample up to `num_frames` frames from a video as RGB PIL images."""
    video = cv2.VideoCapture(video_file)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = max(1, total_frames // num_frames)
    frames = []
    for i in range(0, total_frames, interval):
        video.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = video.read()
        if not ret:
            continue
        pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frames.append(pil_img)
        if len(frames) == num_frames:
            break
    video.release()
    return frames
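
# e.g. sample_frames("./cat.mp4", 8) -> up to 8 RGB PIL.Image frames spread
# roughly evenly across the clip ("./cat.mp4" is one of the demo example files).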

@spaces.GPU
def bot_streaming(message, history):
    txt = message.text
    ext_buffer = f"USER: {txt} ASSISTANT: "

    # Collect the media paths for this turn: either the files attached to the
    # current message or, failing that, files uploaded earlier in the history.
    if message.files:
        if len(message.files) == 1:
            image = [message.files[0].path]
        # interleaved images or video
        elif len(message.files) > 1:
            image = [msg.path for msg in message.files]
    else:
        def has_file_data(lst):
            return any(isinstance(item, FileData) for sublist in lst if isinstance(sublist, tuple) for item in sublist)

        def extract_paths(lst):
            return [item.path for sublist in lst if isinstance(sublist, tuple) for item in sublist if isinstance(item, FileData)]

        # Reuse every file uploaded before the latest text-only turn.
        latest_text_only_index = -1
        for i, item in enumerate(history):
            if all(isinstance(sub_item, str) for sub_item in item):
                latest_text_only_index = i

        image = [path for i, item in enumerate(history) if i < latest_text_only_index and has_file_data(item) for path in extract_paths(item)]

    if message.files is None:
        raise gr.Error("You need to upload an image or video for LLaVA to work.")

    video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg")
    image_extensions = Image.registered_extensions()
    image_extensions = tuple([ex for ex, f in image_extensions.items()])
    image_list = []
    video_list = []
    print("media", image)
    if len(image) == 1:
        if image[0].endswith(video_extensions):
            # Single video: sample 12 frames and reference them with one <video> token.
            video_list = sample_frames(image[0], 12)
            prompt = f"USER: <video> {message.text} ASSISTANT:"
        elif image[0].endswith(image_extensions):
            image_list.append(Image.open(image[0]).convert("RGB"))
            prompt = f"USER: <image> {message.text} ASSISTANT:"
    elif len(image) > 1:
        user_prompt = message.text

        for img in image:
            if img.endswith(image_extensions):
                img = Image.open(img).convert("RGB")
                image_list.append(img)
            elif img.endswith(video_extensions):
                # Each extra video contributes a list of 7 sampled frames.
                video_list.append(sample_frames(img, 7))
                print(len(video_list[-1]))
                #for frame in sample_frames(img, 6):
                #    video_list.append(frame)

        print("video_list", video_list)

        # One <image> token per image and one <video> token per video, in that order.
        image_tokens = ""
        video_tokens = ""
        if image_list != []:
            image_tokens = "<image>" * len(image_list)
        if video_list != []:
            video_tokens = "<video>" * len(video_list)

        prompt = f"USER: {image_tokens}{video_tokens} {user_prompt} ASSISTANT:"

    print(prompt)

    # Build fp16 CUDA inputs from whichever modalities are present.
    if image_list != [] and video_list != []:
        inputs = processor(prompt, images=image_list, videos=video_list, return_tensors="pt").to("cuda", torch.float16)
    elif image_list != [] and video_list == []:
        inputs = processor(prompt, images=image_list, return_tensors="pt").to("cuda", torch.float16)
    elif image_list == [] and video_list != []:
        inputs = processor(prompt, videos=video_list, return_tensors="pt").to("cuda", torch.float16)

    # Run generation in a background thread and stream tokens back to the UI.
    streamer = TextIteratorStreamer(processor, **{"max_new_tokens": 200, "skip_special_tokens": True, "clean_up_tokenization_spaces": True})
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100)
    generated_text = ""

    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        # The stream includes the prompt text, so strip the "USER: ... ASSISTANT: " prefix.
        generated_text_without_prompt = buffer[len(ext_buffer):][:-1]
        time.sleep(0.01)
        yield generated_text_without_prompt
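

# Multimodal chat UI: gr.MultimodalTextbox lets one message carry several files;
# the example prompts below point at local sample media files
# (./cats_1.mp4, ./rococo_1.jpg, ./cat.mp4, ./baklava.png, ...).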
demo = gr.ChatInterface(
    fn=bot_streaming,
    title="VideoLLaVA",
    examples=[
        {"text": "The input contains two videos, are the cats in this video and this video doing the same thing?", "files": ["./cats_1.mp4", "./cats_2.mp4"]},
        {"text": "There are two images in the input. What is the relationship between this image and this image?", "files": ["./rococo_1.jpg", "./rococo_2.jpg"]},
        {"text": "What is the cat doing?", "files": ["./cat.mp4"]},
        {"text": "How to make this pastry?", "files": ["./baklava.png"]},
    ],
    textbox=gr.MultimodalTextbox(file_count="multiple"),
    description="Try [Video-LLaVA](https://huggingface.co/docs/transformers/main/en/model_doc/video_llava) in this demo. Upload an image or a video and start chatting about it, or simply try one of the examples below. If you don't upload an image or a video, you will receive an error.",
    stop_btn="Stop Generation",
    multimodal=True,
)

demo.launch(debug=True)
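
# Minimal no-UI inference sketch (not part of the running app; assumes a local
# ./cat.mp4 clip, the same file used in the examples above):
#
#   clip = sample_frames("./cat.mp4", 8)
#   q = "USER: <video> What is the cat doing? ASSISTANT:"
#   batch = processor(q, videos=clip, return_tensors="pt").to("cuda", torch.float16)
#   out = model.generate(**batch, max_new_tokens=100)
#   print(processor.decode(out[0], skip_special_tokens=True))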