import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os, glob
import librosa
import librosa.display
sample_rate=48000
def get_waveforms(file):
    '''Load an individual sample audio file.
    Read up to 3 seconds of the file, skipping the first 0.5 s of silence; native sample rate = 48 kHz.
    The sample rate returned by librosa.load is discarded since it is fixed above.'''
    waveform, _ = librosa.load(file, duration=3, offset=0.5, sr=sample_rate)
    # zero-pad to a fixed 3-second length so every sample has the same shape
    waveform_homo = np.zeros(int(sample_rate * 3))
    waveform_homo[:len(waveform)] = waveform
    return waveform_homo
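# Example usage (hypothetical path): get_waveforms("some_clip.wav") returns a
# float array of length sample_rate*3 == 144000, zero-padded on the right when
# the clip is shorter than 3 s after the 0.5 s offset.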
class SER(nn.Module):
    # Define all layers present in the network
    def __init__(self, num_emotions):
        super().__init__()
        '''################ TRANSFORMER BLOCK #############################'''
        # maxpool over the time axis only (kernel 1x4) to shorten the sequence fed to the transformer
        self.transformer_maxpool = nn.MaxPool2d(kernel_size=[1, 4], stride=[1, 4])
        transformer_layer = nn.TransformerEncoderLayer(
            d_model=40,           # input feature (frequency) dim; time axis is maxpooled 40*282 -> 40*70 (MFCC*time)
            nhead=4,              # 4 attention heads in each multi-head self-attention layer of each encoder block
            dim_feedforward=512,  # 2 linear layers in each encoder block's feedforward network: dim 40 -> 512 -> 40
            dropout=0.4,
            activation='relu'     # ReLU: avoid saturation/tame gradient/reduce compute time
        )
        self.transformer_encoder = nn.TransformerEncoder(transformer_layer, num_layers=4)
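        # Note: nn.TransformerEncoderLayer expects input shaped (time, batch, features)
        # by default (batch_first=False); forward() below permutes the maxpooled
        # feature map into that layout before encoding.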
        '''############### 1ST PARALLEL 2D CONVOLUTION BLOCK ############'''
        self.conv2Dblock1 = nn.Sequential(
            # 1st 2D convolution layer
            nn.Conv2d(
                in_channels=1,    # input volume depth == input channel dim == 1
                out_channels=16,  # expand output feature map volume's depth to 16
                kernel_size=3,    # 3*3 stride 1 kernel
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(16),   # batch normalize the output feature map before activation
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.3),
            # 2nd 2D convolution layer identical to the last except output dim and maxpool kernel
            nn.Conv2d(
                in_channels=16,
                out_channels=32,  # expand output feature map volume's depth to 32
                kernel_size=3,
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),  # increase maxpool kernel for subsequent filters
            nn.Dropout(p=0.3),
            # 3rd 2D convolution layer identical to the last except output dim
            nn.Conv2d(
                in_channels=32,
                out_channels=64,  # expand output feature map volume's depth to 64
                kernel_size=3,
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),
            nn.Dropout(p=0.3),
        )
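        # Shape trace for either conv block, assuming an input MFCC map of
        # (batch, 1, 40, 282):
        #   conv1 + maxpool 2x2 -> (batch, 16, 20, 141)
        #   conv2 + maxpool 4x4 -> (batch, 32, 5, 35)
        #   conv3 + maxpool 4x4 -> (batch, 64, 1, 8)
        # Flattening 64*1*8 gives the 512-dim embedding referenced below.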
        '''############### 2ND PARALLEL 2D CONVOLUTION BLOCK ############'''
        self.conv2Dblock2 = nn.Sequential(
            # 1st 2D convolution layer
            nn.Conv2d(
                in_channels=1,    # input volume depth == input channel dim == 1
                out_channels=16,
                kernel_size=3,    # 3*3 stride 1 kernel
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(16),   # batch normalize the output feature map before activation
            nn.ReLU(),            # feature map --> activation map
            nn.MaxPool2d(kernel_size=2, stride=2),  # typical maxpool kernel size
            nn.Dropout(p=0.3),    # randomly zero 30% of 1st layer's output feature map in training
            # 2nd 2D convolution layer identical to the last except output dim and maxpool kernel
            nn.Conv2d(
                in_channels=16,
                out_channels=32,  # expand output feature map volume's depth to 32
                kernel_size=3,
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),  # increase maxpool kernel for subsequent filters
            nn.Dropout(p=0.3),
            # 3rd 2D convolution layer identical to the last except output dim
            nn.Conv2d(
                in_channels=32,
                out_channels=64,  # expand output feature map volume's depth to 64
                kernel_size=3,
                stride=1,
                padding=1
            ),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=4, stride=4),
            nn.Dropout(p=0.3),
        )
        # Each full convolution block outputs a (64*1*8) feature map flattened to a 512-dim 1D array
        # The transformer block outputs a 40*70 feature map, which we time-average to a 40-dim 1D array
        # 512*2 + 40 == 1064 input features --> num_emotions (8) output logits
        self.fc1_linear = nn.Linear(512 * 2 + 40, num_emotions)
        self.softmax_out = nn.Softmax(dim=1)
    def forward(self, x):
        '''############ 1st parallel Conv2D block: 3 convolutional layers ############################'''
        conv2d_embedding1 = self.conv2Dblock1(x)  # x == batch * channel * freq * time
        # flatten the final 64*1*8 feature map from the convolutional layers to a length-512 1D array
        conv2d_embedding1 = torch.flatten(conv2d_embedding1, start_dim=1)
        '''############ 2nd parallel Conv2D block: 3 convolutional layers #############################'''
        conv2d_embedding2 = self.conv2Dblock2(x)  # x == batch * channel * freq * time
        conv2d_embedding2 = torch.flatten(conv2d_embedding2, start_dim=1)
        '''############ Transformer block #############################################################'''
        x_maxpool = self.transformer_maxpool(x)
        # remove the channel dim: batch*1*40*70 --> batch*40*70
        x_maxpool_reduced = torch.squeeze(x_maxpool, 1)
        # the transformer encoder layer requires a tensor in format: time * batch * embedding (freq)
        x = x_maxpool_reduced.permute(2, 0, 1)
        # finally, pass the reduced input feature map x into the transformer encoder layers
        transformer_output = self.transformer_encoder(x)
        # average over the time dimension: (time, batch, 40) --> (batch, 40)
        transformer_embedding = torch.mean(transformer_output, dim=0)
        # concatenate the embeddings output by the 2 parallel conv blocks and the transformer block
        complete_embedding = torch.cat([conv2d_embedding1, conv2d_embedding2, transformer_embedding], dim=1)
        output_logits = self.fc1_linear(complete_embedding)
        output_softmax = self.softmax_out(output_logits)
        return output_logits, output_softmax
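# A quick shape sanity check (illustrative sketch, not part of the original
# pipeline). With a dummy MFCC map of 40 coefficients x 282 frames, each conv
# block contributes a 512-dim embedding and the transformer a 40-dim embedding,
# so the classifier sees 1064 features:
#   dummy = torch.zeros(2, 1, 40, 282)   # (batch, channel, freq, time)
#   logits, probs = SER(8)(dummy)
#   print(logits.shape)                  # expected: torch.Size([2, 8])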
emotions_dict = {
    '0': 'surprised',
    '1': 'neutral',
    '2': 'calm',
    '3': 'happy',
    '4': 'sad',
    '5': 'angry',
    '6': 'fearful',
    '7': 'disgust'
}
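# Note: the keys are strings, so the integer class index predicted at the end of
# this script is converted with str() before the lookup; integer keys would work
# just as well.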
def load_checkpoint(optimizer, model, filename):
    # restore model (and optionally optimizer) state from a checkpoint saved during training
    checkpoint_dict = torch.load(filename, map_location=torch.device('cpu'))
    epoch = checkpoint_dict['epoch']
    model.load_state_dict(checkpoint_dict['model'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint_dict['optimizer'])
    return epoch
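# For reference, a checkpoint compatible with load_checkpoint would be produced
# by something like the following during training (sketch; not from the original
# training script):
#   torch.save({'epoch': epoch,
#               'model': model.state_dict(),
#               'optimizer': optimizer.state_dict()}, 'SERFINAL-099.pkl')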
def make_validate_fnc(model,criterion):
    def validate(X, Y):
        with torch.no_grad():
            # put the model in evaluation mode, i.e. disable dropout and switch batchnorm to its running statistics
            model.eval()
            # get the model's predictions on the validation set
            output_logits, output_softmax = model(X)
            predictions = torch.argmax(output_softmax, dim=1)
            # calculate the mean accuracy over the entire validation set
            accuracy = torch.sum(Y == predictions) / float(len(Y))
            # compute the loss from the logits (nn.CrossEntropyLoss applies log-softmax internally)
            loss = criterion(output_logits, Y)
        return loss.item(), accuracy * 100, predictions
    return validate
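# Usage sketch (X_valid / Y_valid are illustrative tensor names, not defined in
# this script):
#   validate = make_validate_fnc(model, nn.CrossEntropyLoss())
#   valid_loss, valid_acc, _ = validate(X_valid, Y_valid)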
model = SER(len(emotions_dict))
optimizer = torch.optim.SGD(model.parameters(),lr=0.01, weight_decay=1e-3, momentum=0.8)
load_checkpoint(optimizer, model, "SERFINAL-099.pkl")
waveform = get_waveforms("03-01-08-01-01-01-01.wav")
waveforms = np.array(waveform)
mfc = librosa.feature.mfcc(
    y=waveforms,
    sr=48000,
    n_mfcc=40,
    n_fft=1024,
    win_length=512,
    window='hamming',
    n_mels=128,
    fmax=48000 / 2
)
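# With librosa's default hop_length of 512, the 3 s (144,000-sample) waveform
# yields about 282 frames, so mfc has shape (40, 282): 40 MFCCs x time steps.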
# add batch and channel dims and reorder to (batch, channel, freq, time), the layout the model expects
X = np.expand_dims(mfc, axis=1)   # (40, 1, time)
X = np.expand_dims(X, axis=1)     # (40, 1, 1, time)
X = X.transpose(1, 2, 0, 3)       # (1, 1, 40, time)
X = torch.tensor(X).float()
with torch.no_grad():
    # put the model in evaluation mode, i.e. disable dropout and use batchnorm running statistics
    model.eval()
    # get the model's prediction for this single sample
    output_logits, output_softmax = model(X)
    predictions = torch.argmax(output_softmax, dim=1)
    pred = predictions.cpu().numpy()

# map the predicted class index to its emotion label
x = str(pred[0])
print(emotions_dict[x])