import gradio as gr
import torch
import numpy as np
from transformers import AutoProcessor, AutoModel
from PIL import Image
import cv2
from concurrent.futures import ThreadPoolExecutor

MODEL_NAME = "microsoft/xclip-base-patch16-zero-shot"
CLIP_LEN = 32  # this X-CLIP checkpoint expects exactly 32 sampled frames per clip

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

processor = AutoProcessor.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(device)
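# X-CLIP embeds the sampled clip and each candidate text into a shared space;
# outputs.logits_per_video (used below) holds one video-to-text similarity
# score per label.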


def get_video_length(file_path):
    """Return the total number of frames the container reports for the video."""
    cap = cv2.VideoCapture(file_path)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    return length


def read_video_opencv(file_path, indices):
    """Decode the frames at `indices` in parallel, one VideoCapture per task,
    since a single capture cannot be shared safely across threads."""
    frames = []
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(get_frame, file_path, i) for i in indices]
        for future in futures:
            frame = future.result()
            if frame is not None:
                frames.append(frame)
            elif frames:
                # A failed read would leave fewer than CLIP_LEN frames, which
                # the model cannot accept; repeat the previous frame instead.
                frames.append(frames[-1])
    return frames


def get_frame(file_path, index):
    """Seek to `index` and decode a single frame, returned as an RGB array
    (OpenCV decodes to BGR), or None if the read fails."""
    cap = cv2.VideoCapture(file_path)
    cap.set(cv2.CAP_PROP_POS_FRAMES, index)
    ret, frame = cap.read()
    cap.release()
    if ret:
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return None


def sample_uniform_frame_indices(clip_len, seg_len):
    """Pick `clip_len` evenly spaced frame indices from a video of `seg_len`
    frames, looping over the video when it is shorter than the clip."""
    if seg_len < clip_len:
        repeat_factor = int(np.ceil(clip_len / seg_len))
        indices = np.arange(seg_len).tolist() * repeat_factor
        indices = indices[:clip_len]
    else:
        spacing = seg_len // clip_len
        indices = [i * spacing for i in range(clip_len)]
    return np.array(indices).astype(np.int64)
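
# For example, with this sampling scheme:
#   sample_uniform_frame_indices(4, seg_len=100) -> [0, 25, 50, 75]
#   sample_uniform_frame_indices(4, seg_len=2)   -> [0, 1, 0, 1]  (short video, frames repeat)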


def concatenate_frames(frames, clip_len):
    """Tile the sampled frames into a single grid image for display."""
    layout = {32: (4, 8)}
    rows, cols = layout[clip_len]
    frame_h, frame_w = frames[0].shape[:2]
    combined_image = Image.new('RGB', (frame_w * cols, frame_h * rows))
    frame_iter = iter(frames)
    y_offset = 0
    for _ in range(rows):
        x_offset = 0
        for _ in range(cols):
            combined_image.paste(Image.fromarray(next(frame_iter)), (x_offset, y_offset))
            x_offset += frame_w
        y_offset += frame_h
    return combined_image
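
# With CLIP_LEN = 32 the 4x8 layout above yields a grid of
# (8 * frame_width) x (4 * frame_height) pixels.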


def model_interface(uploaded_video, activity):
    video_length = get_video_length(uploaded_video)
    indices = sample_uniform_frame_indices(CLIP_LEN, seg_len=video_length)
    video = read_video_opencv(uploaded_video, indices)
    concatenated_image = concatenate_frames(video, CLIP_LEN)

    # Zero-shot binary setup: the target activity vs. a generic "other" label.
    activities_list = [activity, "other"]
    inputs = processor(
        text=activities_list,
        videos=video,
        return_tensors="pt",
        padding=True,
    )

    # Move every input tensor to the same device as the model.
    for key, value in inputs.items():
        if isinstance(value, torch.Tensor):
            inputs[key] = value.to(device)

    with torch.no_grad():
        outputs = model(**inputs)

    logits_per_video = outputs.logits_per_video
    probs = logits_per_video.softmax(dim=1)

    results_probs = []
    results_logits = []
    max_prob_index = torch.argmax(probs[0]).item()
    for i, current_activity in enumerate(activities_list):
        prob = float(probs[0][i].cpu())
        logit = float(logits_per_video[0][i].cpu())
        results_probs.append((current_activity, f"Probability: {prob * 100:.2f}%"))
        results_logits.append((current_activity, f"Raw Score: {logit:.2f}"))

    likely_label = activities_list[max_prob_index]
    likely_probability = float(probs[0][max_prob_index].cpu()) * 100

    # The activity counts as performed when the target label beats "other".
    activity_performed = likely_label != "other"

    return activity_performed, concatenated_image, results_probs, results_logits, [likely_label, likely_probability]
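
# A minimal smoke test bypassing the UI ("sample.mp4" and "dancing" are
# hypothetical placeholders; uncomment to try):
# performed, grid, probs, logits, top = model_interface("sample.mp4", "dancing")
# print(performed, top)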

iface = gr.Interface(
    fn=model_interface,
    inputs=[
        gr.components.Video(label="Upload a video file"),
        gr.components.Text(value="taking a shot", label="Desired Activity to Recognize"),
    ],
    outputs=[
        gr.components.Text(label="Activity Detected (True/False)"),
        gr.components.Image(type="pil", label="Sampled Frames"),
        gr.components.Text(label="Probabilities"),
        gr.components.Text(label="Raw Scores"),
        gr.components.Text(label="Top Prediction"),
    ],
    title="Video Action Detection",
    description=(
        "[Author: Ibrahim Hasani] This method uses X-CLIP (zero-shot, 32 sampled "
        "frames) as a binary classifier: it decides whether the given action is "
        "performed in the video by contrasting the target label against a generic "
        "'other' label that should sit far from it in the latent semantic space. "
        "Enter the action to be recognized, not a negative label."
    ),
    live=False,
    theme=gr.themes.Monochrome(),
)

iface.launch()
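# launch() serves the app on a local URL; Gradio's standard share option,
# e.g. iface.launch(share=True), also creates a temporary public link.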