# Hugging Face Spaces app (status: Running)
import os
import shutil

import cv2
import gradio as gr
from gradio_client import Client, handle_file
from moviepy import VideoFileClip  # explicit import instead of wildcard; only VideoFileClip is used
from PIL import Image

# Remote inference pipelines hosted as separate Spaces: one classifies a
# single image frame, the other classifies an extracted audio track.
clientImgPipeLn = Client("dj-dawgs-ipd/IPD_IMAGE_PIPELINE")
clientAudioPipeLn = Client("dj-dawgs-ipd/IPD_AUDIO_PIPELINE")
def predict(video_path):
    """Detect hate content in a video by combining image and audio pipelines.

    Samples one frame every 2 seconds and sends it to the image pipeline,
    stopping at the first frame classified as 'hate'. If the video has an
    audio track, it is extracted to WAV and sent to the audio pipeline.

    Parameters
    ----------
    video_path : str
        Path to the input video file.

    Returns
    -------
    dict
        Keys: 'prediction' ('hate' / 'not_hate'), 'language', 'label' and
        'hate_text' (each a {'video': ..., 'audio': ...} dict), 'confidence'
        (mean over the components flagged as hate, or None),
        'hate_image_timestamp' (seconds, or None) and 'hate_component'
        (list of flagged components, or None).
    """
    temp_data_path = "temp_data"
    temp_frames_path = os.path.join(temp_data_path, "temp_frames")
    os.makedirs(temp_frames_path, exist_ok=True)

    res_img = {}
    # Default audio result for silent videos; mirrors the pipeline's schema.
    # NOTE(review): assumes the remote pipeline returns these exact keys —
    # confirmed for 'prediction'/'language'/'label'/'confidence'/'hate_text'
    # by the accesses below.
    res_audio = {
        'prediction': None,
        'language': None,
        'label': None,
        'confidence': None,
        'hate_text': None,
    }

    cap = cv2.VideoCapture(video_path)
    video_clip = VideoFileClip(video_path)
    try:
        if video_clip.audio is not None:
            audio_path = os.path.join(temp_data_path, "temp_audio.wav")
            video_clip.audio.write_audiofile(audio_path, codec="pcm_s16le")
            res_audio = clientAudioPipeLn.predict(
                audio_path=handle_file(audio_path),
                api_name='/predict'
            )

        # Guard against corrupt metadata reporting 0 fps (would divide by zero).
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 1
        frame_interval = fps * 2  # one sampled frame every 2 seconds
        frame_count = 0
        success = True
        while success:
            success, frame = cap.read()
            if success and frame_count % frame_interval == 0:
                timestamp_s = frame_count // fps
                temp_image_path = os.path.join(
                    temp_frames_path, f"temp_frame_{timestamp_s}s.jpg"
                )
                cv2.imwrite(temp_image_path, frame)
                response = clientImgPipeLn.predict(
                    image=handle_file(temp_image_path),
                    api_name="/predict"
                )
                print(f"Response for frame at {timestamp_s}s: {response}")
                if response['prediction'] == 'hate':
                    # Stop at the first hateful frame and remember when it occurred.
                    res_img = response
                    res_img['hate_image_timestamp'] = timestamp_s
                    break
            frame_count += 1
    finally:
        # Always release resources and scrub temp files, even if a remote
        # pipeline call raises (the original leaked them on error).
        cap.release()
        video_clip.close()
        shutil.rmtree(temp_data_path, ignore_errors=True)

    video_hate = res_img.get('prediction') == 'hate'
    audio_hate = res_audio.get('prediction') == 'hate'
    # Using .get() everywhere fixes the crash where a silent video with no
    # hateful frames hit resImg['prediction'] on an empty dict.

    if not video_hate and not audio_hate:
        return {
            'prediction': 'not_hate',
            'language': {'video': None, 'audio': None},
            'label': {'video': None, 'audio': None},
            'confidence': None,
            'hate_text': {'video': None, 'audio': None},
            'hate_image_timestamp': None,
            'hate_component': None,
        }

    # Only report fields for the components actually flagged as hate, matching
    # the original per-branch outputs.
    img_conf = res_img.get('confidence') if video_hate else None
    aud_conf = res_audio.get('confidence') if audio_hate else None
    confidences = [c for c in (img_conf, aud_conf) if c is not None]

    hate_components = []
    if video_hate:
        hate_components.append("video")
    if audio_hate:
        hate_components.append("audio")

    return {
        'prediction': 'hate',
        'language': {
            'video': res_img.get('language') if video_hate else None,
            'audio': res_audio.get('language') if audio_hate else None,
        },
        'label': {
            'video': res_img.get('label') if video_hate else None,
            'audio': res_audio.get('label') if audio_hate else None,
        },
        # Mean confidence over the flagged components, skipping None values
        # (same effect as the original's guarded average).
        'confidence': sum(confidences) / len(confidences) if confidences else None,
        'hate_text': {
            'video': res_img.get('hate_text') if video_hate else None,
            'audio': res_audio.get('hate_text') if audio_hate else None,
        },
        'hate_image_timestamp': res_img.get('hate_image_timestamp'),
        'hate_component': hate_components,
    }
# Gradio UI: upload a video, receive the combined detection result as JSON.
iface = gr.Interface(
    fn=predict,
    inputs=gr.Video(),
    outputs=gr.JSON(),
    title="Hate Speech Detection in Video",
    description="Detect hateful symbols or text in Video",
)

if __name__ == "__main__":
    # show_error surfaces server-side exceptions in the browser UI.
    iface.launch(show_error=True)