import argparse
import json
import os
import random
import re
import time
from collections import defaultdict
from functools import wraps
from typing import List

import numpy as np
import textdistance
import torch
from Levenshtein import distance as lev_distance
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction, corpus_bleu
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer
from tqdm import tqdm
from transformers import AutoTokenizer

from paragraph2actions.readable_converter import ReadableConverter


def levenshtein_similarity(truth: List[str], pred: List[str]) -> List[float]:
    """Per-sample normalized Levenshtein similarity between ground-truth and predicted action strings."""
    assert len(truth) == len(pred)
    scores: List[float] = [
        textdistance.levenshtein.normalized_similarity(t, p)
        for t, p in zip(truth, pred)
    ]
    return scores
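

# Illustrative check of the helper above (my addition, not part of the original
# module): identical strings yield a normalized similarity of 1.0. Guarded so
# importing this file stays side-effect free.
if __name__ == "__main__":
    assert levenshtein_similarity(["ADD water"], ["ADD water"]) == [1.0]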


def modified_bleu(truth: List[str], pred: List[str], bleu_n=4) -> float:
    """
    Calculates the BLEU score of the predictions, with a small modification so that
    sentences with fewer than `bleu_n` words are not penalized.

    Returns:
        Value between 0 and 100.
    """
    references = [sentence.split() for sentence in truth]
    candidates = [sentence.split() for sentence in pred]

    # BLEU penalizes sentences with only one word: even correct translations get a
    # score of zero. Pad short sentences with empty tokens to avoid this.
    references = [r + max(0, bleu_n - len(r)) * [""] for r in references]
    candidates = [c + max(0, bleu_n - len(c)) * [""] for c in candidates]

    # References must be nested one level deeper because corpus_bleu supports
    # multiple reference translations per candidate.
    refs = [[r] for r in references]

    weights = {
        2: (0.5, 0.5),
        4: (0.25, 0.25, 0.25, 0.25),
    }
    return 100 * corpus_bleu(refs, candidates, weights=weights[bleu_n])  # type: ignore[no-any-return]
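

# Illustrative check of the padding behaviour above (my addition, not part of
# the original module): a perfect prediction should score ~100 even for a
# two-word sentence, which is exactly what the empty-token padding guarantees.
if __name__ == "__main__":
    assert abs(modified_bleu(["ADD water"], ["ADD water"], bleu_n=4) - 100.0) < 1e-6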


def set_random_seed(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # If using multi-GPU.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def time_it(func):
    @wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Function {func.__name__} finished in {end_time - start_time:.5f} seconds.\n")
        return result
    return wrapper
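

# Illustrative usage of the decorator above (my addition; `_slow_sum` is a
# hypothetical helper used only for the demonstration, not part of the original
# module).
if __name__ == "__main__":
    @time_it
    def _slow_sum(n: int = 1_000_000) -> int:
        return sum(range(n))

    _slow_sum()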


def accuracy_score(score_list, threshold):
    """Fraction of scores that reach the given threshold."""
    matches = sum(score >= threshold for score in score_list)
    acc = matches / len(score_list)
    return acc


def extract_tokenized_entities(text):
    """Extract entity tokens of the form $...$, #...#, or @...@ from the text."""
    pattern = r'\$[^\$]+\$|#[^#]+#|@[^\@]+@'
    return re.findall(pattern, text)


def extract_reactant_cnt(text):
    """Return the largest integer id appearing as a $<id>$ token, or 0 if there is none."""
    max_id = None
    for token in text.split():
        if token.startswith('$') and token.endswith('$'):
            try:
                current_id = int(token.strip('$'))
                if max_id is None or current_id > max_id:
                    max_id = current_id
            except ValueError:
                pass  # Ignore tokens that do not represent an integer.
    if not max_id:
        return 0
    return max_id
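

# Illustrative check of the two extraction helpers above (my addition; the
# sample string is hypothetical but follows the $...$ / #...# / @...@ entity
# convention assumed by the regular expression and the $<id>$ reactant tokens).
if __name__ == "__main__":
    sample = "ADD $1$ ; ADD $2$ ; STIR for # 30 minutes # at @ 25 C @"
    assert extract_reactant_cnt(sample) == 2
    assert extract_tokenized_entities(sample) == ['$1$', '$2$', '# 30 minutes #', '@ 25 C @']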


class Metric_calculator:
    """Computes BLEU, METEOR, ROUGE, validity, and Levenshtein-accuracy metrics for predicted action sequences."""

    def __init__(self, text_trunc_length=1024):
        self.converter = ReadableConverter(separator=' ; ')
        self.tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased', use_fast=False, padding_side='right')
        self.tokenizer.add_special_tokens({'pad_token': '<pad>'})
        self.text_trunc_length = text_trunc_length
        self.scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'])

    def tokenize(self, gt_list, pred_list):
        references = []
        hypotheses = []
        special_tokens = ['<pad>', '[PAD]', '[CLS]', '[SEP]']  # filtering added for Galactica outputs
        for gt, out in tqdm(zip(gt_list, pred_list)):
            gt_tokens = [t for t in self.tokenizer.tokenize(gt) if t not in special_tokens]
            out_tokens = [t for t in self.tokenizer.tokenize(out) if t not in special_tokens]
            references.append([gt_tokens])
            hypotheses.append(out_tokens)
        return references, hypotheses

    def __call__(self, gt_list, pred_list, use_tokenizer=False):
        gt_list = [gt.strip() for gt in gt_list]
        pred_list = [pred.strip() for pred in pred_list]

        if use_tokenizer:
            references, hypotheses = self.tokenize(gt_list, pred_list)
            bleu2, bleu4 = self.bleu(references, hypotheses)
            _meteor_score = self.meteor(references, hypotheses)
        else:
            bleu2 = modified_bleu(gt_list, pred_list, bleu_n=2)
            bleu4 = modified_bleu(gt_list, pred_list, bleu_n=4)
            _meteor_score = 0  # METEOR needs tokenized input, so it is skipped here.

        rouge_1, rouge_2, rouge_l = self.rouge(gt_list, pred_list)
        validity = self.validity(gt_list, pred_list)
        acc_100, acc_90, acc_75, acc_50 = self.accuracy(gt_list, pred_list)

        print('BLEU-2 score:', bleu2)
        print('BLEU-4 score:', bleu4)
        print('Average METEOR score:', _meteor_score)
        print('ROUGE-1:', rouge_1)
        print('ROUGE-2:', rouge_2)
        print('ROUGE-L:', rouge_l)
        print(f'Validity: {validity:.6f}')
        print(f'Accuracy (100): {acc_100:.6f}')
        print(f'Accuracy (90): {acc_90:.6f}')
        print(f'Accuracy (75): {acc_75:.6f}')
        print(f'Accuracy (50): {acc_50:.6f}')

        line = ''
        for score in [validity, bleu2, bleu4, acc_100, acc_90, acc_75, acc_50, rouge_1, rouge_2, rouge_l, _meteor_score]:
            line += f'{score:.6f} '
        print(line)

        return {
            'bleu2': bleu2,
            'bleu4': bleu4,
            'rouge_1': rouge_1,
            'rouge_2': rouge_2,
            'rouge_l': rouge_l,
            'meteor_score': _meteor_score,
            'validity': validity,
            'acc_100': acc_100,
            'acc_90': acc_90,
            'acc_75': acc_75,
            'acc_50': acc_50,
        }

    def get_result_list(self, gt_list, pred_list, use_tokenizer=False):
        gt_list = [gt.strip() for gt in gt_list]
        pred_list = [pred.strip() for pred in pred_list]

        if use_tokenizer:
            references, hypotheses = self.tokenize(gt_list, pred_list)
            bleu2 = [corpus_bleu([gt], [pred], weights=(.5, .5)) for gt, pred in zip(references, hypotheses)]
            bleu4 = [corpus_bleu([gt], [pred], weights=(.25, .25, .25, .25)) for gt, pred in zip(references, hypotheses)]
            _meteor_score = [meteor_score(gt, out) for gt, out in zip(references, hypotheses)]
        else:
            bleu2 = [modified_bleu([gt], [pred], bleu_n=2) for gt, pred in zip(gt_list, pred_list)]
            bleu4 = [modified_bleu([gt], [pred], bleu_n=4) for gt, pred in zip(gt_list, pred_list)]
            _meteor_score = 0  # METEOR needs tokenized input, so it is skipped here.

        rouge_1, rouge_2, rouge_l = self.rouge(gt_list, pred_list, return_list=True)
        lev_score = levenshtein_similarity(gt_list, pred_list)

        return {
            'bleu2': bleu2,
            'bleu4': bleu4,
            'rouge_1': rouge_1,
            'rouge_2': rouge_2,
            'rouge_l': rouge_l,
            'meteor_score': _meteor_score,
            'lev_score': lev_score,
        }

    def bleu(self, references, hypotheses):
        bleu2 = corpus_bleu(references, hypotheses, weights=(.5, .5))
        bleu4 = corpus_bleu(references, hypotheses, weights=(.25, .25, .25, .25))
        bleu2 *= 100
        bleu4 *= 100
        return bleu2, bleu4

    def meteor(self, references, hypotheses):
        meteor_scores = []
        for gt, out in zip(references, hypotheses):
            mscore = meteor_score(gt, out)
            meteor_scores.append(mscore)
        _meteor_score = np.mean(meteor_scores)
        _meteor_score *= 100
        return _meteor_score

    def rouge(self, targets, predictions, return_list=False):
        rouge_scores = []
        for gt, out in zip(targets, predictions):
            # RougeScorer.score expects (target, prediction); only the F-measure is used below.
            rs = self.scorer.score(gt, out)
            rouge_scores.append(rs)
        rouge_1 = [rs['rouge1'].fmeasure for rs in rouge_scores]
        rouge_2 = [rs['rouge2'].fmeasure for rs in rouge_scores]
        rouge_l = [rs['rougeL'].fmeasure for rs in rouge_scores]
        if return_list:
            return rouge_1, rouge_2, rouge_l
        rouge_1 = np.mean(rouge_1) * 100
        rouge_2 = np.mean(rouge_2) * 100
        rouge_l = np.mean(rouge_l) * 100
        return rouge_1, rouge_2, rouge_l

    def validity(self, gt_list, pred_list):
        num_valid, n = 0, len(pred_list)
        for pred, gt in zip(pred_list, gt_list):
            try:
                # A prediction is valid if it parses into actions and does not
                # reference more reactant ids than the ground truth.
                self.converter.string_to_actions(pred)
                max_token_pred = extract_reactant_cnt(pred)
                max_token_gt = extract_reactant_cnt(gt)
                assert max_token_gt >= max_token_pred
                num_valid += 1
            except Exception:
                pass
        return 100 * (num_valid / n)

    def accuracy(self, gt_list, pred_list):
        score_list = levenshtein_similarity(gt_list, pred_list)
        acc_100 = 100 * accuracy_score(score_list, 1.0)
        acc_90 = 100 * accuracy_score(score_list, 0.90)
        acc_75 = 100 * accuracy_score(score_list, 0.75)
        acc_50 = 100 * accuracy_score(score_list, 0.50)
        return acc_100, acc_90, acc_75, acc_50
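

# Minimal end-to-end sketch (my addition, not part of the original module). It
# assumes the paragraph2actions package is importable and that the
# allenai/scibert_scivocab_uncased tokenizer is cached locally or downloadable;
# the action strings themselves are purely hypothetical.
if __name__ == "__main__":
    set_random_seed(42)
    calculator = Metric_calculator()
    gts = ["ADD $1$ ; ADD $2$ ; STIR for # 30 minutes #"]
    preds = ["ADD $1$ ; ADD $2$ ; STIR for # 20 minutes #"]
    summary = calculator(gts, preds, use_tokenizer=False)
    per_sample = calculator.get_result_list(gts, preds, use_tokenizer=False)
    print(summary['bleu2'], per_sample['lev_score'])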