import gzip
import multiprocessing

import torch
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from tqdm.auto import tqdm
from transformers import PreTrainedModel

from .config_gzipembed import *

class GZIPEmbeddingModel(PreTrainedModel):
    """Embeds text as its Normalized Compression Distance (NCD), computed with gzip,
    to every document in a reference corpus."""

    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Optional learned projection from the corpus-sized NCD vector to a smaller dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            self.reduction_head = None
        # Ensures the otherwise parameter-free model still registers at least one parameter.
        self.dummy_parameter = torch.nn.Parameter(torch.ones(1))

    def forward(self, prompt, num_procs=16, return_tensor=True):
        # Declaring the worker function and the loop variable p as module globals makes the
        # nested function picklable by reference for multiprocessing; this relies on a
        # fork-style process start (the default on Linux), so the workers inherit them.
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i, reference_document = data_row
            return i, self.ncd(reference_document, p)

        if isinstance(prompt, str):
            prompt = [prompt]

        x = []
        for p in prompt:
            # One NCD value per corpus document; the pool is recreated for each prompt so
            # the workers see the current value of p.
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                results = pool.map(calculate_ncd_row, enumerate(self.config.corpus))
            for i, row in results:
                ncd[i] = row
            x.append(ncd)

        if self.reduction_head is not None:
            x = torch.tensor(x)
            # nn.Linear has no .dtype/.device attributes of its own; use its weight tensor.
            x = x.to(self.reduction_head.weight.dtype).to(self.reduction_head.weight.device)
            return self.reduction_head(x)

        return x if not return_tensor else torch.tensor(x)

    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode.
            batch_size (`int`): Used as the number of worker processes for the NCD computation.

        Returns:
            `List[np.ndarray]`: List of embeddings for the given sentences.
        """
        import numpy as np

        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        return [np.array(i) for i in x]

    def normalize(self, x):
        # Lowercase, keep only ASCII letters and spaces, then drop stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)

    def ncd(self, x, y):
        # Normalized Compression Distance with gzip as the compressor:
        # NCD(x, y) = (C(xy) - min(C(x), C(y))) / max(C(x), C(y)).
        _x = self.normalize(x) if self.config.normalize else x
        _y = self.normalize(y) if (not self.config.normalized_corpus) and self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)

    def gzip_embed(
        self,
        corpus,
        document,
        verbose=False,
    ):
        embedding = []
        for reference_document in (corpus if not verbose else tqdm(corpus)):
            embedding.append(self.ncd(reference_document, document))
        return embedding

    def dimensionality(self):
        return len(self.config.corpus)
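

# Minimal usage sketch (illustrative only, not part of the original module). It assumes
# GZIPEmbeddingConfig accepts `corpus`, `reduction`, `normalize`, `normalized_corpus`,
# and `stop_words` as keyword arguments; adjust to the actual config signature. The
# multiprocessing in forward() expects a fork-style start method.
if __name__ == "__main__":
    corpus = [
        "the cat sat on the mat",
        "stock markets rallied after the announcement",
        "gzip is a lossless compression utility",
    ]
    config = GZIPEmbeddingConfig(
        corpus=corpus,
        reduction=False,
        normalize=False,
        normalized_corpus=False,
        stop_words=[],
    )
    model = GZIPEmbeddingModel(config)
    # One NCD value per corpus document, so the embedding size equals len(corpus).
    vectors = model.encode(["gzip based text embeddings"], batch_size=2)
    print(model.dimensionality(), vectors[0].shape)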