File size: 5,639 Bytes
d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a d6d8b90 909f75a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
import os
import subprocess
import gradio as gr
import whisper
import yt_dlp
import torch
import numpy as np
from moviepy.editor import VideoFileClip
from transformers import AutoModelForAudioClassification, AutoFeatureExtractor
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import BlipProcessor, BlipForConditionalGeneration
import cv2
def download_youtube_video(video_url, output_path):
ydl_opts = {
'format': 'bestvideo+bestaudio',
'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
video_info = ydl.extract_info(video_url, download=False)
video_title = video_info.get('title', 'video')
return os.path.join(output_path, f"{video_title}.webm")
def convert_to_mp4(input_path, output_path):
output_file = os.path.join(output_path, 'video.mp4')
command = ['ffmpeg', '-i', input_path, '-c', 'copy', output_file]
subprocess.run(command, check=True)
return output_file
def extract_audio_from_video(video_path):
video_clip = VideoFileClip(video_path)
audio_output = os.path.join(output_path, 'audio.mp3')
audio_clip = video_clip.audio
audio_clip.write_audiofile(audio_output)
return audio_output
def convert_mp3_to_wav(mp3_path):
from pydub import AudioSegment
audio = AudioSegment.from_mp3(mp3_path)
wav_output = os.path.join(output_path, 'audio.wav')
audio.export(wav_output, format="wav")
return wav_output
def process_text(text):
model_name = "cardiffnlp/twitter-roberta-base-emotion"
emotion_labels = ['anger', 'joy', 'optimism', 'sad']
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
emotion_probs = torch.softmax(logits, dim=-1).squeeze()
predicted_emotion = emotion_labels[torch.argmax(emotion_probs)]
emotion_dict = {emotion_labels[i]: emotion_probs[i].item() for i in range(len(emotion_labels))}
return emotion_dict, predicted_emotion
def preprocess_frame(frame):
frame = cv2.resize(frame, (224, 224))
pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values
return pixel_values
def generate_caption(pixel_values):
caption_ids = caption_model.generate(pixel_values)
caption = caption_processor.batch_decode(caption_ids, skip_special_tokens=True)[0]
return caption
def predict_emotions(caption):
inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True)
outputs = emotion_model(**inputs)
emotion_probs = torch.softmax(outputs.logits, dim=1)
predicted_emotions = {label: prob.item() for label, prob in zip(emotion_labels, emotion_probs[0])}
return predicted_emotions
caption_model_name = "Salesforce/blip-image-captioning-base"
caption_processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name)
emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)
def analyze_video(video_url):
global output_path
output_path = './'
video_path = download_youtube_video(video_url, output_path)
mp4_path = convert_to_mp4(video_path, output_path)
audio_path = extract_audio_from_video(mp4_path)
audio_wav_path = convert_mp3_to_wav(audio_path)
model_whisper = whisper.load_model("base")
result_whisper = model_whisper.transcribe(audio_wav_path)
transcript = result_whisper['text']
emotion_dict_text, predicted_emotion_text = process_text(transcript)
n_frame_interval = 60
emotion_vectors_video = []
video_capture = cv2.VideoCapture(mp4_path)
total_frames_video = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
frame_count_video = 0
while video_capture.isOpened():
ret_video, frame_video = video_capture.read()
if not ret_video or frame_count_video > total_frames_video:
break
if frame_count_video % n_frame_interval == 0:
pixel_values_video = preprocess_frame(frame_video)
caption_video = generate_caption(pixel_values_video)
predicted_emotions_video, _ = predict_emotions(caption_video)
emotion_vectors_video.append(np.array(list(predicted_emotions_video.values())))
frame_count_video += 1
video_capture.release()
average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0)
combined_emotion_vector_final = np.concatenate((np.array(list(emotion_dict_text.values())), average_emotion_vector_video))
final_most_predicted_index = np.argmax(combined_emotion_vector_final)
final_most_predicted_emotion = list(emotion_dict_text.keys())[final_most_predicted_index]
return transcript, predicted_emotion_text, final_most_predicted_emotion
iface = gr.Interface(fn=analyze_video,
inputs=gr.Textbox(label="YouTube Video URL"),
outputs=["text", "text", "text"],
title="Multimodal Emotion Recognition",
description="Enter a YouTube Video URL to analyze emotions from both audio and visual content.")
if __name__ == "__main__":
iface.launch()
|