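"""Gradio app: flag hateful content in a video.

Samples one frame every ~2 seconds plus the full audio track, fans them out
to two remote Hugging Face Spaces (an image pipeline and an audio pipeline)
through gradio_client, and merges the two results into a single JSON verdict.
"""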
import gradio as gr
from gradio_client import Client, handle_file
import cv2
import os
import shutil
from moviepy import VideoFileClip  # moviepy v2-style import; v1 exposes this via moviepy.editor
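# Clients for the two remote Spaces that do the actual classification;
# each predict() call below is a network round trip.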
clientImgPipeLn = Client("dj-dawgs-ipd/IPD_IMAGE_PIPELINE")
clientAudioPipeLn = Client("dj-dawgs-ipd/IPD_AUDIO_PIPELINE")
def predict(video_path):
    cap = cv2.VideoCapture(video_path)
    # Fall back to 30 fps if the container reports no FPS metadata (an assumed
    # default); a zero fps would make frame_interval zero and break the modulo below.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
    frame_interval = fps * 2  # sample one frame every ~2 seconds
    frame_count = 0
    success = True
    temp_data_path = "temp_data"
    os.makedirs(temp_data_path, exist_ok=True)
    temp_frames_path = os.path.join(temp_data_path, "temp_frames")
    os.makedirs(temp_frames_path, exist_ok=True)
    resImg = {}
    resAudio = {}
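    # Audio branch: if the video has a soundtrack, dump it to a 16-bit PCM WAV
    # and send it to the remote audio pipeline; otherwise record an all-None
    # audio result so the merge logic below can tell "no audio" from "not hate".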
    video_clip = VideoFileClip(video_path)
    if video_clip.audio is None:
        resAudio = {
            'prediction': None,
            'language': None,
            'label': None,
            'confidence': None,
            'hate_text': None
        }
    else:
        audio_path = os.path.join(temp_data_path, "temp_audio.wav")
        video_clip.audio.write_audiofile(audio_path, codec="pcm_s16le")
        resAudio = clientAudioPipeLn.predict(
            audio_path=handle_file(audio_path),
            api_name='/predict'
        )
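    # Video branch: step through the frames, sending every frame_interval-th
    # one to the image pipeline, and stop at the first 'hate' hit.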
    while success:
        success, frame = cap.read()
        if frame_count % frame_interval == 0 and success:
            temp_image_path = os.path.join(temp_frames_path, f"temp_frame_{frame_count // fps}s.jpg")
            cv2.imwrite(temp_image_path, frame)
            response = clientImgPipeLn.predict(
                image=handle_file(temp_image_path),
                api_name="/predict"
            )
            print(f"Response for frame at {frame_count // fps}s: {response}")
            if response['prediction'] == 'hate':
                resImg = response
                resImg['hate_image_timestamp'] = frame_count // fps
                break
        frame_count += 1
    cap.release()
    video_clip.close()  # release moviepy's ffmpeg readers along with the capture
    shutil.rmtree(temp_data_path)
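    # Merge the two branches. resImg is non-empty only when a frame was flagged;
    # resAudio['prediction'] is None when there was no soundtrack, so "no hate"
    # is tested as != 'hate' rather than == 'not_hate'.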
    if len(resImg) == 0 and resAudio['prediction'] != 'hate':
        return {
            'prediction': 'not_hate',
            'language': {
                'video': None,
                'audio': None
            },
            'label': {
                'video': None,
                'audio': None
            },
            'confidence': None,
            'hate_text': {
                'video': None,
                'audio': None
            },
            'hate_image_timestamp': None,
            'hate_component': None
        }
    # Only the video frames were flagged (resImg is populated only on a 'hate' hit,
    # so an emptiness check avoids a KeyError when no frame was flagged).
    if len(resImg) > 0 and resAudio['prediction'] != 'hate':
        return {
            'prediction': 'hate',
            'language': {
                'video': resImg['language'],
                'audio': None
            },
            'label': {
                'video': resImg['label'],
                'audio': None
            },
            'confidence': resImg['confidence'],
            'hate_text': {
                'video': resImg['hate_text'],
                'audio': None
            },
            'hate_image_timestamp': resImg['hate_image_timestamp'],
            'hate_component': ["video"]
        }
    # Only the audio track was flagged.
    if len(resImg) == 0 and resAudio['prediction'] == 'hate':
        return {
            'prediction': 'hate',
            'language': {
                'video': None,
                'audio': resAudio['language']
            },
            'label': {
                'video': None,
                'audio': resAudio['label']
            },
            'confidence': resAudio['confidence'],
            'hate_text': {
                'video': None,
                'audio': resAudio['hate_text']
            },
            'hate_image_timestamp': None,
            'hate_component': ["audio"]
        }
    # Both branches flagged hate: average whichever confidences are present.
    confidences = [c for c in (resImg['confidence'], resAudio['confidence']) if c is not None]
    return {
        'prediction': 'hate',
        'language': {
            'video': resImg['language'],
            'audio': resAudio['language']
        },
        'label': {
            'video': resImg['label'],
            'audio': resAudio['label']
        },
        'confidence': sum(confidences) / len(confidences) if confidences else None,
        'hate_text': {
            'video': resImg['hate_text'],
            'audio': resAudio['hate_text']
        },
        'hate_image_timestamp': resImg['hate_image_timestamp'],
        'hate_component': ["video", "audio"]
    }
iface = gr.Interface(
    fn=predict,
    inputs=gr.Video(),
    outputs=gr.JSON(),
    title="Hate Speech Detection in Video",
    description="Detect hateful symbols, text, or speech in a video's frames and audio track"
)
if __name__ == "__main__":
    iface.launch(show_error=True)
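# Sketch of calling this Space from another process with gradio_client
# (the Space id and file name below are placeholders, not from this repo):
#
#   from gradio_client import Client, handle_file
#   client = Client("<user>/<this-space>")
#   result = client.predict(video_path=handle_file("clip.mp4"), api_name="/predict")
#   print(result)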