# This code is based on the descriptions in https://github.com/speechbrain/speechbrain/blob/develop/speechbrain/processing/PLDA_LDA.py
from pathlib import Path
from speechbrain.processing.PLDA_LDA import PLDA, StatObject_SB, Ndx, fast_PLDA_scoring
import numpy as np
import torch


class PLDAModel:
    """Trains a PLDA model on speaker embeddings and scores enrollment/trial pairs.

    Parameters are loaded from `results_path` when they already exist there; otherwise
    the model is trained on `train_embeddings` and, if `results_path` is given, the
    parameters are saved there for reuse.
    """

    def __init__(self, train_embeddings, results_path: Path = None):
        self.mean, self.F, self.Sigma = None, None, None

        files_exist = False
        if results_path and results_path.exists():
            files_exist = self.load_parameters(results_path)
        if not files_exist:
            self._train_plda(train_embeddings)
            if results_path:
                self.save_parameters(results_path)
    def compute_distance(self, enrollment_vectors, trial_vectors):
        """Computes the PLDA score matrix between enrollment and trial vectors.

        Returns an (n_enrollment, n_trial) matrix of PLDA scores; higher scores
        indicate a higher likelihood that the two vectors share a speaker.
        """
        # Wrap the enrollment vectors in the StatObject_SB container expected
        # by SpeechBrain's scoring function.
        enrol_vecs = enrollment_vectors.cpu().numpy()
        en_sets, en_s, en_stat0 = self._get_vector_stats(enrol_vecs, sg_tag='en')
        en_stat = StatObject_SB(modelset=en_sets, segset=en_sets, start=en_s, stop=en_s, stat0=en_stat0,
                                stat1=enrol_vecs)

        # Same wrapping for the trial (test) vectors.
        trial_vecs = trial_vectors.cpu().numpy()
        te_sets, te_s, te_stat0 = self._get_vector_stats(trial_vecs, sg_tag='te')
        te_stat = StatObject_SB(modelset=te_sets, segset=te_sets, start=te_s, stop=te_s, stat0=te_stat0,
                                stat1=trial_vecs)

        # Score every enrollment model against every trial segment.
        ndx = Ndx(models=en_sets, testsegs=te_sets)
        scores_plda = fast_PLDA_scoring(en_stat, te_stat, ndx, self.mean, self.F, self.Sigma)
        return scores_plda.scoremat

    def save_parameters(self, dir_path):
        """Saves the PLDA parameters as .npy files inside `dir_path`."""
        dir_path.mkdir(parents=True, exist_ok=True)
        np.save(dir_path / 'plda_mean.npy', self.mean)
        np.save(dir_path / 'plda_F.npy', self.F)
        np.save(dir_path / 'plda_Sigma.npy', self.Sigma)

    def load_parameters(self, dir_path):
        """Loads saved PLDA parameters from `dir_path`; returns True only if all three files were found."""
        existing_files = [x.name for x in dir_path.glob('*')]
        files_exist = True
        if 'plda_mean.npy' in existing_files:
            self.mean = np.load(dir_path / 'plda_mean.npy')
        else:
            files_exist = False

        if 'plda_F.npy' in existing_files:
            self.F = np.load(dir_path / 'plda_F.npy')
        else:
            files_exist = False

        if 'plda_Sigma.npy' in existing_files:
            self.Sigma = np.load(dir_path / 'plda_Sigma.npy')
        else:
            files_exist = False
        return files_exist

    def _train_plda(self, train_embeddings):
        """Trains the PLDA model on the given speaker embeddings with SpeechBrain's EM implementation."""
        vectors = train_embeddings.speaker_vectors.to(torch.float64)
        speakers = train_embeddings.speakers

        # One model label per utterance, grouped by speaker; segment names are unique per utterance.
        modelset = np.array([f'md{speaker}' for speaker in speakers], dtype="|O")
        segset, s, stat0 = self._get_vector_stats(vectors, sg_tag='sg')

        xvectors_stat = StatObject_SB(modelset=modelset, segset=segset, start=s, stop=s, stat0=stat0,
                                      stat1=vectors.cpu().numpy())

        # rank_f is the rank of the speaker (eigenvoice) subspace.
        plda = PLDA(rank_f=100)
        plda.plda(xvectors_stat)

        self.mean = plda.mean
        self.F = plda.F
        self.Sigma = plda.Sigma

    def _get_vector_stats(self, vectors, sg_tag='sg'):
        """Builds the segment names, start/stop markers, and zeroth-order stats required by StatObject_SB."""
        N, dim = vectors.shape
        segset = np.array([f'{sg_tag}{i}' for i in range(N)], dtype="|O")
        s = np.array([None] * N)
        stat0 = np.array([[1.0]] * N)
        return segset, s, stat0
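

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only, with synthetic data and made-up sizes).
    # The class only requires that `train_embeddings` expose `speaker_vectors` (a torch
    # tensor of shape [n_utterances, dim]) and `speakers` (one label per utterance),
    # which is all that _train_plda reads; SimpleNamespace is used here as a stand-in
    # container, and 'plda_params' is an arbitrary example directory.
    from types import SimpleNamespace

    rng = np.random.default_rng(0)
    n_speakers, utts_per_speaker, dim = 50, 10, 192
    speakers = np.repeat(np.arange(n_speakers), utts_per_speaker)
    vectors = torch.from_numpy(rng.standard_normal((len(speakers), dim)))
    train_embeddings = SimpleNamespace(speaker_vectors=vectors, speakers=speakers)

    # Train the PLDA model (or load it, if parameters were previously saved here).
    plda_model = PLDAModel(train_embeddings, results_path=Path('plda_params'))

    # Score 5 enrollment vectors against 3 trial vectors: the result is a
    # (5, 3) matrix of PLDA scores.
    enrol = torch.from_numpy(rng.standard_normal((5, dim)))
    trial = torch.from_numpy(rng.standard_normal((3, dim)))
    scores = plda_model.compute_distance(enrol, trial)
    print(scores.shape)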