gzip-openhermes / modeling_gzipembed.py
from transformers import PreTrainedModel
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from .config_gzipembed import *
from tqdm.auto import tqdm
import torch
import gzip
import multiprocessing
class GZIPEmbeddingModel(PreTrainedModel):
    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Optional learned projection from the raw NCD vector
            # (one entry per corpus document) down to a smaller dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            self.reduction_head = None
        # Placeholder parameter so the model has at least one trainable weight.
        self.dummy_parameter = torch.nn.Parameter(torch.ones(1))

    def forward(self, prompt, num_procs=16, return_tensor=True):
        # The worker function and the current prompt are promoted to module
        # globals so multiprocessing.Pool can resolve them by name in the
        # worker processes (this relies on a fork-based start method).
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i = data_row[0]
            row = self.ncd(data_row[1], p)
            return i, row

        if isinstance(prompt, str):
            prompt = [prompt]
        x = []
        for p in prompt:
            # One NCD value per corpus document, computed in parallel.
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                data = enumerate(self.config.corpus)
                results = pool.map(calculate_ncd_row, data)
            for i, row in results:
                ncd[i] = row
            x.append(ncd)
        if self.reduction_head is not None:
            x = torch.tensor(x)
            # nn.Linear exposes no .dtype/.device attributes; use its weight's.
            x = x.to(self.reduction_head.weight.dtype).to(self.reduction_head.weight.device)
            return self.reduction_head(x)
        return x if not return_tensor else torch.tensor(x)

    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode
            batch_size (`int`): Number of worker processes used for the encoding

        Returns:
            `List[np.ndarray]`: List of embeddings for the given sentences
        """
        import numpy as np
        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        return [np.array(i) for i in x]

    def normalize(self, x):
        # Lowercase, strip non-alphabetic characters, and remove stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)

    def ncd(self, x, y):
        # Normalized Compression Distance between a corpus document `x` and a
        # query `y` (both call sites pass the corpus document first). When the
        # corpus is stored pre-normalized, only the query still needs normalizing.
        _x = self.normalize(x) if (not self.config.normalized_corpus) and self.config.normalize else x
        _y = self.normalize(y) if self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)
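
    # A rough illustration of the NCD arithmetic above, with made-up byte
    # counts: if gzip compresses x to 100 bytes, y to 120 bytes, and the
    # concatenation "x y" to 150 bytes, then
    # NCD(x, y) = (150 - min(100, 120)) / max(100, 120) = 50 / 120 ~ 0.42.
    # Lower values mean the two strings share more compressible structure.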

    def gzip_embed(
        self,
        corpus,
        document,
        verbose=False,
    ):
        embedding = []
        for reference_document in (corpus if not verbose else tqdm(corpus)):
            embedding.append(self.ncd(reference_document, document))
        return embedding

    def dimensionality(self):
        return len(self.config.corpus)
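

# Usage sketch (illustrative, not part of the original file). It assumes
# GZIPEmbeddingConfig (from config_gzipembed.py) accepts the fields used above
# (corpus, reduction, normalize, normalized_corpus, ...) as constructor keyword
# arguments; the exact signature is an assumption. It also assumes a fork-based
# multiprocessing start method (e.g. Linux), which the global-variable trick in
# forward() relies on.
if __name__ == "__main__":
    config = GZIPEmbeddingConfig(
        corpus=["the cat sat on the mat", "dogs chase cats"],  # toy reference corpus
        reduction=False,          # no learned projection head
        normalize=False,          # compare raw strings, so NLTK data is not needed
        normalized_corpus=False,
    )
    model = GZIPEmbeddingModel(config)
    # encode() forwards batch_size as the number of worker processes.
    embeddings = model.encode(["a cat on a mat"], batch_size=2)
    print(len(embeddings), embeddings[0].shape)  # -> 1 (2,)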