File size: 5,639 Bytes
d6d8b90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
909f75a
 
 
 
 
d6d8b90
909f75a
 
d6d8b90
909f75a
 
d6d8b90
909f75a
 
 
 
 
d6d8b90
909f75a
d6d8b90
909f75a
d6d8b90
909f75a
 
 
 
d6d8b90
909f75a
d6d8b90
909f75a
d6d8b90
 
 
 
909f75a
d6d8b90
909f75a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import subprocess
import gradio as gr
import whisper
import yt_dlp
import torch
import numpy as np
from moviepy.editor import VideoFileClip
from transformers import AutoModelForAudioClassification, AutoFeatureExtractor
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import BlipProcessor, BlipForConditionalGeneration
import cv2

def download_youtube_video(video_url, output_path):
    ydl_opts = {
        'format': 'bestvideo+bestaudio',
        'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
        video_info = ydl.extract_info(video_url, download=False)
        video_title = video_info.get('title', 'video')
        return os.path.join(output_path, f"{video_title}.webm")

def convert_to_mp4(input_path, output_path):
    output_file = os.path.join(output_path, 'video.mp4')
    command = ['ffmpeg', '-i', input_path, '-c', 'copy', output_file]
    subprocess.run(command, check=True)
    return output_file

def extract_audio_from_video(video_path):
    video_clip = VideoFileClip(video_path)
    audio_output = os.path.join(output_path, 'audio.mp3')
    audio_clip = video_clip.audio
    audio_clip.write_audiofile(audio_output)
    return audio_output

def convert_mp3_to_wav(mp3_path):
    from pydub import AudioSegment
    audio = AudioSegment.from_mp3(mp3_path)
    wav_output = os.path.join(output_path, 'audio.wav')
    audio.export(wav_output, format="wav")
    return wav_output

def process_text(text):
    model_name = "cardiffnlp/twitter-roberta-base-emotion"
    emotion_labels = ['anger', 'joy', 'optimism', 'sad']
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)

    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
    
    emotion_probs = torch.softmax(logits, dim=-1).squeeze()
    predicted_emotion = emotion_labels[torch.argmax(emotion_probs)]
    
    emotion_dict = {emotion_labels[i]: emotion_probs[i].item() for i in range(len(emotion_labels))}
    
    return emotion_dict, predicted_emotion

def preprocess_frame(frame):
    frame = cv2.resize(frame, (224, 224))
    pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values
    return pixel_values

def generate_caption(pixel_values):
    caption_ids = caption_model.generate(pixel_values)
    caption = caption_processor.batch_decode(caption_ids, skip_special_tokens=True)[0]
    return caption

def predict_emotions(caption):
    inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True)
    outputs = emotion_model(**inputs)
    
    emotion_probs = torch.softmax(outputs.logits, dim=1)
    
    predicted_emotions = {label: prob.item() for label, prob in zip(emotion_labels, emotion_probs[0])}
    
    return predicted_emotions

caption_model_name = "Salesforce/blip-image-captioning-base"
caption_processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name)

emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)

def analyze_video(video_url):
    global output_path
    output_path = './'
    video_path = download_youtube_video(video_url, output_path)
    mp4_path = convert_to_mp4(video_path, output_path)
    audio_path = extract_audio_from_video(mp4_path)
    audio_wav_path = convert_mp3_to_wav(audio_path)
    model_whisper = whisper.load_model("base")
    
    result_whisper = model_whisper.transcribe(audio_wav_path)
    
    transcript = result_whisper['text']
    emotion_dict_text, predicted_emotion_text = process_text(transcript)
    
    n_frame_interval = 60
    emotion_vectors_video = []
    video_capture = cv2.VideoCapture(mp4_path)
    total_frames_video = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count_video = 0

    while video_capture.isOpened():
        ret_video, frame_video = video_capture.read()

        if not ret_video or frame_count_video > total_frames_video:
            break

        if frame_count_video % n_frame_interval == 0:
            pixel_values_video = preprocess_frame(frame_video)
            caption_video = generate_caption(pixel_values_video)
            predicted_emotions_video, _ = predict_emotions(caption_video)
            emotion_vectors_video.append(np.array(list(predicted_emotions_video.values())))

        frame_count_video += 1

    video_capture.release()

    average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0)
    combined_emotion_vector_final = np.concatenate((np.array(list(emotion_dict_text.values())), average_emotion_vector_video))
    final_most_predicted_index = np.argmax(combined_emotion_vector_final)
    final_most_predicted_emotion = list(emotion_dict_text.keys())[final_most_predicted_index]

    return transcript, predicted_emotion_text, final_most_predicted_emotion

iface = gr.Interface(fn=analyze_video,
                     inputs=gr.Textbox(label="YouTube Video URL"),
                     outputs=["text", "text", "text"],
                     title="Multimodal Emotion Recognition",
                     description="Enter a YouTube Video URL to analyze emotions from both audio and visual content.")

if __name__ == "__main__":
    iface.launch()