import gradio as gr
import torch
import numpy as np
from transformers import AutoProcessor, AutoModel
from PIL import Image
import cv2
from concurrent.futures import ThreadPoolExecutor
import os
MODEL_NAME = "microsoft/xclip-base-patch16-zero-shot"
CLIP_LEN = 32  # Number of frames sampled per clip
# Check if GPU is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# Load model and processor once and move them to the device
processor = AutoProcessor.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)
def get_video_length(file_path):
    """Return the total frame count of the video at file_path."""
    cap = cv2.VideoCapture(file_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return length
def read_video_opencv(file_path, indices):
    """Decode the frames at the given indices in parallel; each worker opens its own capture."""
    frames = []
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(get_frame, file_path, i) for i in indices]
        for future in futures:
            frame = future.result()
            if frame is not None:
                frames.append(frame)
    # Pad with the last decoded frame so downstream code always receives len(indices) frames.
    while frames and len(frames) < len(indices):
        frames.append(frames[-1])
    return frames
def get_frame(file_path, index):
    """Read a single frame by index and convert it from BGR to RGB; return None on failure."""
    cap = cv2.VideoCapture(file_path)
    cap.set(cv2.CAP_PROP_POS_FRAMES, index)
    ret, frame = cap.read()
    cap.release()
    if ret:
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return None
def sample_uniform_frame_indices(clip_len, seg_len):
    """Sample clip_len frame indices spread uniformly over a video of seg_len frames."""
    if seg_len < clip_len:
        # Short video: cycle through the available frames until clip_len indices are collected.
        repeat_factor = np.ceil(clip_len / seg_len).astype(int)
        indices = np.arange(seg_len).tolist() * repeat_factor
        indices = indices[:clip_len]
    else:
        spacing = seg_len // clip_len
        indices = [i * spacing for i in range(clip_len)]
    return np.array(indices).astype(np.int64)
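# Illustrative check of the sampling above (example numbers only, not taken from the app):
# a 1000-frame video with CLIP_LEN = 32 gives spacing 1000 // 32 = 31, i.e. indices
# [0, 31, 62, ..., 961]; a 10-frame video is cycled to fill the 32 slots:
# [0, 1, ..., 9, 0, 1, ..., 9, 0, 1, ..., 9, 0, 1].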
def concatenate_frames(frames, clip_len):
    """Paste the sampled frames into a single grid image for display."""
    layout = {32: (4, 8)}
    rows, cols = layout[clip_len]
    combined_image = Image.new('RGB', (frames[0].shape[1] * cols, frames[0].shape[0] * rows))
    frame_iter = iter(frames)
    y_offset = 0
    for i in range(rows):
        x_offset = 0
        for j in range(cols):
            img = Image.fromarray(next(frame_iter))
            combined_image.paste(img, (x_offset, y_offset))
            x_offset += frames[0].shape[1]
        y_offset += frames[0].shape[0]
    return combined_image
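# Sanity check on the grid size (illustrative frame size, not fixed by the app): with
# 640x360 frames and the 4x8 layout, the combined image is 8 * 640 = 5120 px wide
# and 4 * 360 = 1440 px tall.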
def model_interface(uploaded_video, activity):
    """Run zero-shot classification of the target activity against an 'other' label."""
    video_length = get_video_length(uploaded_video)
    indices = sample_uniform_frame_indices(CLIP_LEN, seg_len=video_length)
    video = read_video_opencv(uploaded_video, indices)
    concatenated_image = concatenate_frames(video, CLIP_LEN)

    activities_list = [activity, "other"]
    inputs = processor(
        text=activities_list,
        videos=list(video),
        return_tensors="pt",
        padding=True,
    )
    # Move the input tensors to the same device as the model
    for key, value in inputs.items():
        if isinstance(value, torch.Tensor):
            inputs[key] = value.to(device)

    with torch.no_grad():
        outputs = model(**inputs)

    logits_per_video = outputs.logits_per_video
    probs = logits_per_video.softmax(dim=1)

    results_probs = []
    results_logits = []
    max_prob_index = torch.argmax(probs[0]).item()
    for i in range(len(activities_list)):
        current_activity = activities_list[i]
        prob = float(probs[0][i].cpu())  # Move tensor data to CPU for further processing
        logit = float(logits_per_video[0][i].cpu())  # Move tensor data to CPU for further processing
        results_probs.append((current_activity, f"Probability: {prob * 100:.2f}%"))
        results_logits.append((current_activity, f"Raw Score: {logit:.2f}"))

    likely_label = activities_list[max_prob_index]
    likely_probability = float(probs[0][max_prob_index].cpu()) * 100  # Move tensor data to CPU

    activity_performed = likely_label != 'other'

    return activity_performed, concatenated_image, results_probs, results_logits, [likely_label, likely_probability]
# Load video paths from the folder
#video_folder = "Action Detection Samples"
#video_files = [os.path.join(video_folder, file) for file in os.listdir(video_folder) if file.endswith('.mp4')] # considering only mp4 files
# Create examples: assuming every sample video shows the 'taking a shot' action
#examples = [[video, "taking a shot"] for video in video_files]
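# Minimal local smoke test, assuming a clip exists at the hypothetical path below
# (uncomment to run the pipeline directly, outside of Gradio):
# performed, grid, probs, logits, top = model_interface("sample_clip.mp4", "taking a shot")
# print(performed, top)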
iface = gr.Interface(
    fn=model_interface,
    inputs=[
        gr.components.Video(label="Upload a video file"),
        gr.components.Text(value="taking a shot", label="Desired Activity to Recognize"),
    ],
    outputs=[
        gr.components.Text(type="text", label="True/False"),
        gr.components.Image(type="pil", label="Sampled Frames"),
        gr.components.Text(type="text", label="Probabilities"),
        gr.components.Text(type="text", label="Raw Scores"),
        gr.components.Text(type="text", label="Top Prediction"),
    ],
    title="Action Detection Video",
    description=(
        "[Author: Ibrahim Hasani] This method uses X-CLIP [Version: ZERO SHOT / SAMPLED FRAMES = 32] "
        "to determine whether an action is being performed in a video (binary classifier). "
        "It contrasts the target action against a negative label assumed to be far enough away "
        "in the latent semantic space. Enter only the action to be recognized, not negative labels."
    ),
    live=False,
    theme=gr.themes.Monochrome(),
    # examples=examples  # Add examples to the interface
)
iface.launch()