# File size: 5,564 Bytes
# add40fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import gradio as gr
from transformers import VideoLlavaForConditionalGeneration, VideoLlavaProcessor, TextIteratorStreamer
from threading import Thread
import re
import time 
from PIL import Image
import torch
import cv2
import spaces

# Load the Video-LLaVA checkpoint in float16 with device_map="cuda", so no
# separate model.to("cuda") call is made afterwards.
model = VideoLlavaForConditionalGeneration.from_pretrained(
    "LanguageBind/Video-LLaVA-7B-hf",
    torch_dtype=torch.float16,
    device_map="cuda",
)
# The processor is loaded from the same checkpoint as the model.
processor = VideoLlavaProcessor.from_pretrained(
    "LanguageBind/Video-LLaVA-7B-hf"
)

def replace_video_with_images(text, frames):
    """Swap each ``<video>`` placeholder in *text* for *frames* ``<image>`` placeholders.

    Args:
        text: Prompt text that may contain ``<video>`` placeholders.
        frames: How many ``<image>`` placeholders stand in for one ``<video>``.

    Returns:
        The text with every ``<video>`` occurrence substituted.
    """
    image_run = "<image>" * frames
    # Splitting on the placeholder and re-joining with the replacement run
    # substitutes every occurrence, just like a plain string replace.
    return image_run.join(text.split("<video>"))

import cv2
from PIL import Image

def sample_frames(video_file, num_frames):
    """Return up to *num_frames* evenly spaced frames of a video as RGB PIL images.

    Args:
        video_file: Path of the video, handed to ``cv2.VideoCapture``.
        num_frames: Maximum number of frames to return.

    Returns:
        A list of PIL images in playback order. The list is empty when
        ``num_frames`` is not positive or when OpenCV reports a frame count
        of zero, and it can be shorter than ``num_frames`` when some frames
        fail to decode.
    """
    # Guard first: zero would divide by zero below, and a negative value
    # would never satisfy the stop condition and return every frame.
    if num_frames <= 0:
        return []

    video = cv2.VideoCapture(video_file)
    frames = []
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # Step of at least 1 so clips shorter than num_frames still yield frames.
        interval = max(1, total_frames // num_frames)

        for i in range(0, total_frames, interval):
            video.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = video.read()
            if not ret:
                # Skip frames that cannot be read and keep sampling.
                continue
            # OpenCV returns BGR arrays; convert to RGB before wrapping in PIL.
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
            if len(frames) == num_frames:
                break
    finally:
        # Release the capture handle even if decoding or conversion raises.
        video.release()

    return frames

@spaces.GPU
def bot_streaming(message, history):
    """Stream a Video-LLaVA answer for one multimodal chat message.

    Args:
        message: Incoming chat message; read through ``message.text`` and
            ``message.files`` (each file through its ``.path``).
        history: Earlier chat turns. Only consulted when the message itself
            carries no files.

    Yields:
        The answer text accumulated so far, with the echoed prompt cut off.

    Raises:
        gr.Error: If no image or video can be found, or none of the supplied
            files has a supported image/video extension.
    """
    txt = message.text
    # The streamer echoes the prompt; the length of this approximation of the
    # decoded prompt is used further down to slice that echo off.
    ext_buffer = f"USER: {txt} ASSISTANT: "

    if message.files:
        media_paths = [uploaded.path for uploaded in message.files]
    else:
        # FileData is needed for the history scan but is not imported at the
        # top of the file, so bring it into scope here.
        # NOTE(review): confirm that history entries really hold FileData
        # objects in the deployed Gradio version.
        from gradio.data_classes import FileData

        def extract_paths(turn):
            # Collect .path of every FileData found inside tuple elements of a turn.
            return [
                part.path
                for element in turn
                if isinstance(element, tuple)
                for part in element
                if isinstance(part, FileData)
            ]

        # Index of the last turn whose elements are all plain strings.
        latest_text_only_index = -1
        for i, turn in enumerate(history):
            if all(isinstance(part, str) for part in turn):
                latest_text_only_index = i

        # Only turns located before that text-only turn are searched for media.
        media_paths = [
            path
            for i, turn in enumerate(history)
            if i < latest_text_only_index
            for path in extract_paths(turn)
        ]

    if not media_paths:
        # The error must be raised (not merely constructed) to reach the UI.
        raise gr.Error("You need to upload an image or video for LLaVA to work.")

    video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg")
    image_extensions = tuple(Image.registered_extensions())
    image_list = []
    video_list = []

    print("media", media_paths)
    # A lone video is sampled more densely than videos sent alongside other media.
    frames_per_video = 12 if len(media_paths) == 1 else 7
    for path in media_paths:
        # Compare in lower case so "CLIP.MP4" is recognised as well.
        lowered = path.lower()
        if lowered.endswith(image_extensions):
            image_list.append(Image.open(path).convert("RGB"))
        elif lowered.endswith(video_extensions):
            sampled = sample_frames(path, frames_per_video)
            if sampled:
                # Each video is kept as its own list of frames; videos that
                # yield no frames are skipped so tokens and media stay aligned.
                video_list.append(sampled)

    if not image_list and not video_list:
        raise gr.Error("Unsupported file type. Please upload an image or a video.")

    # One placeholder per image followed by one per video, then the user text.
    image_tokens = "<image>" * len(image_list)
    video_tokens = "<video>" * len(video_list)
    prompt = f"USER: {image_tokens}{video_tokens} {txt} ASSISTANT:"
    print(prompt)

    # Pass only the media kinds that are actually present.
    media_kwargs = {}
    if image_list:
        media_kwargs["images"] = image_list
    if video_list:
        media_kwargs["videos"] = video_list
    inputs = processor(prompt, return_tensors="pt", **media_kwargs).to("cuda", torch.float16)

    # Extra keyword arguments of the streamer are decode options; the
    # generation length is set once, in generation_kwargs below.
    streamer = TextIteratorStreamer(
        processor,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100)

    # Generation runs in a background thread so text can be yielded as it arrives.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        # Drop the echoed prompt by length, plus the trailing character.
        generated_text_without_prompt = buffer[len(ext_buffer):][:-1]
        time.sleep(0.01)
        yield generated_text_without_prompt


# Chat UI around bot_streaming: a multimodal textbox accepting several files
# per message, plus ready-made examples.
demo = gr.ChatInterface(
    fn=bot_streaming,
    title="VideoLLaVA",
    examples=[
        {
            "text": "The input contains two videos, are the cats in this video and this video doing the same thing?",
            "files": ["./cats_1.mp4", "./cats_2.mp4"],
        },
        {
            "text": "There are two images in the input. What is the relationship between this image and this image?",
            "files": ["./rococo_1.jpg", "./rococo_2.jpg"],
        },
        {"text": "What is the cat doing?", "files": ["./cat.mp4"]},
        {"text": "How to make this pastry?", "files": ["./baklava.png"]},
    ],
    textbox=gr.MultimodalTextbox(file_count="multiple"),
    # The documentation link targets the official Hugging Face domain.
    description=(
        "Try [Video-LLaVA](https://huggingface.co/docs/transformers/main/en/model_doc/video_llava) in this demo. "
        "Upload an image or a video, and start chatting about it, or simply try one of the examples below. "
        "If you don't upload an image, you will receive an error. "
    ),
    stop_btn="Stop Generation",
    multimodal=True,
)
demo.launch(debug=True)