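"""Evaluation utilities for comparing predicted key-value extractions against ground truth.

`Evaluator` loads a predictions JSON and a ground-truth JSON (both keyed by filename),
fuzzy-matches each field with `fuzzywuzzy.fuzz.partial_ratio` after normalization
(with special handling for the `total` and `date` fields), and reports field-level
and file-level metrics as pandas DataFrames, optionally saved to CSV/JSON.
"""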
import os
import re
import json

import pandas as pd
from dateutil import parser
from fuzzywuzzy import fuzz
|
class Evaluator:

    def __init__(self, predicted_json_filepath: str, dataset_json_filepath: str,
                 output_metrics_filename: str | None = None,
                 save_metrics_in_folder: bool = False):
        self.predicted_json_filepath = predicted_json_filepath
        self.dataset_json_filepath = dataset_json_filepath
        self.output_metrics_filename = output_metrics_filename
        self.save_metrics_in_folder = save_metrics_in_folder
        self.results = {}
        self.YandY_hat_dicts = self.load_and_combine_data()
|
    def load_and_combine_data(self) -> dict:
        """Load predictions and ground truth, keeping only filenames present in both files."""
        with open(self.dataset_json_filepath, 'r') as f:
            ground_truth_dict = json.load(f)

        with open(self.predicted_json_filepath, 'r') as f:
            predicted_json = json.load(f)

        return {
            filename: {'Y_hat': predicted_json[filename], 'Y': ground_truth_dict[filename]}
            for filename in predicted_json if filename in ground_truth_dict
        }
|
    @staticmethod
    def _normalize_string(s: str) -> str:
        """Convert to lowercase and collapse extra whitespace."""
        if not isinstance(s, str):
            s = str(s)
        return ' '.join(s.lower().split())
|
    @staticmethod
    def _clean_numeric_value(s: str) -> str:
        """
        Extract the numeric part of the total value and format it to two decimal places.

        Some engines compute the total instead of extracting it from the text, so it can
        come back as 19.9299867 rather than 19.93. We therefore round to three decimals
        first and keep two. If no number is found, the value is returned unchanged.
        """
        pattern = r'\d+\.?\d*'
        match = re.search(pattern, str(s))
        if not match:
            return s

        num = float(match.group(0))
        rounded_num = round(num, 3)
        return f"{rounded_num:.2f}"
|
    @staticmethod
    def convert_to_ddmmyyyy(date_str: str) -> str:
        """Normalize a date string to DD/MM/YYYY; return the input unchanged if it cannot be parsed."""
        try:
            date_obj = parser.parse(date_str, dayfirst=False)
            return date_obj.strftime('%d/%m/%Y')
        except ValueError:
            return date_str
|
    def get_similarity_results(self, y_hat: dict, y: dict, threshold: int = 70):
        """Compare two dictionaries and calculate similarity scores."""
        results = {}
        for key in y_hat:
            normalized_hat = self._normalize_string(y_hat[key])
            normalized_y = self._normalize_string(y[key])

            # Field-specific normalization before fuzzy matching.
            if key == "total":
                normalized_hat = self._clean_numeric_value(normalized_hat)
                normalized_y = self._clean_numeric_value(normalized_y)

            if key == "date":
                normalized_hat = self.convert_to_ddmmyyyy(normalized_hat)
                normalized_y = self.convert_to_ddmmyyyy(normalized_y)

            similarity = fuzz.partial_ratio(normalized_hat, normalized_y)
            results[key] = {
                'Y_hat': y_hat[key],
                'Y': y[key],
                'similarity': similarity,
                'match': similarity >= threshold
            }
        return results
|
    def save_results(self):
        """Save the results to a JSON file."""
        with open(self.output_metrics_filename, 'w') as f:
            json.dump(self.results, f, indent=4, sort_keys=True)
|
    def calculate_metrics(self, save_to_folder: bool = False, eval_results_folder: str | None = None):
        """Aggregate the per-file similarity results into field-level and file-level DataFrames."""
        data = self.results

        # Accumulate correct/total counts and similarity sums per attribute.
        metrics = {}
        for file_attrs in data.values():
            for attribute, details in file_attrs.items():
                if attribute not in metrics:
                    metrics[attribute] = {"correct": 0, "total": 0, "similarity_sum": 0}

                metrics[attribute]["total"] += 1
                metrics[attribute]["similarity_sum"] += details["similarity"]
                if details["match"]:
                    metrics[attribute]["correct"] += 1

        # Field-level metrics: accuracy and average similarity per attribute.
        field_metrics = []
        for attribute, counts in metrics.items():
            accuracy = counts["correct"] / counts["total"] * 100
            avg_similarity = counts["similarity_sum"] / counts["total"]

            field_metrics.append({
                "attribute": attribute,
                "accuracy": accuracy,
                "avg_similarity": avg_similarity
            })

        field_metrics_df = pd.DataFrame(field_metrics)

        # File-level metrics: per-attribute similarity scores plus an overall pass/fail flag.
        results = []
        for filename, attributes in data.items():
            row = {"filename": filename}
            passed = True
            failed_attributes = []

            for attribute, details in attributes.items():
                similarity_score = details["similarity"]
                match = details["match"]
                row[attribute + "_similarity"] = similarity_score
                if not match:
                    passed = False
                    failed_attributes.append(attribute)

            row["passed"] = passed
            row["failed_attributes"] = ", ".join(failed_attributes) if failed_attributes else "None"

            results.append(row)

        file_metrics_df = pd.DataFrame(results)

        if save_to_folder:
            base_name = os.path.splitext(os.path.basename(self.predicted_json_filepath))[0]
            directory = eval_results_folder if eval_results_folder else f"{base_name}_eval_results"
            os.makedirs(directory, exist_ok=True)

            field_metrics_df.to_csv(os.path.join(directory, f"{base_name}_field_metrics.csv"), index=False)
            file_metrics_df.to_csv(os.path.join(directory, f"{base_name}_file_metrics.csv"), index=False)
            with open(os.path.join(directory, f'{base_name}_YnY_hat.json'), 'w') as outfile:
                json.dump(self.results, outfile)

            print(f"Saved {base_name} evaluation results in {directory}")

        return field_metrics_df, file_metrics_df
|
    def evaluate_predictions(self, filenames_to_skip: list[str] | None = None):
        """Process the data and store in results."""
        filenames_to_skip = filenames_to_skip or []
        for filename, data_dict in self.YandY_hat_dicts.items():
            if filename in filenames_to_skip:
                continue
            y_hat_dict = data_dict['Y_hat']
            y = data_dict['Y']
            self.results[filename] = self.get_similarity_results(y_hat_dict, y)
        field_df, file_df = self.calculate_metrics(self.save_metrics_in_folder, self.output_metrics_filename)
        return self.results, field_df, file_df
|
|
def evaluate_predictions_for_list(
        predictions_json_filenames: list[str] | str, filenames_to_skip: list[str] | None = None,
        dataset_json_filepath: str = 'data/ground_truth/sroie_ground_truth.json',
        save_metrics_in_folder: bool = True):
    """Run the Evaluator over one or more prediction files and return their results keyed by basename."""
    outputs = {}

    if isinstance(predictions_json_filenames, str):
        predictions_json_filenames = [predictions_json_filenames]
    for predictions_json_filename in predictions_json_filenames:
        analyzer = Evaluator(
            predictions_json_filename,
            dataset_json_filepath,
            save_metrics_in_folder=save_metrics_in_folder,
        )
        outputs[os.path.basename(predictions_json_filename)] = analyzer.evaluate_predictions(
            filenames_to_skip=filenames_to_skip)
    return outputs
|
|
if __name__ == '__main__':
    analyzer = Evaluator(
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\Qwen2vl\qwen2_vl2b_results.json',
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\data\sroie_task2train.json',
        save_metrics_in_folder=True
    )

    analyzer.evaluate_predictions()
|
|
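    # A minimal sketch (with hypothetical prediction filenames) of evaluating several
    # result files in one call via evaluate_predictions_for_list; the returned dict maps
    # each predictions file's basename to (results, field_metrics_df, file_metrics_df).
    # outputs = evaluate_predictions_for_list(
    #     ['qwen2_vl2b_results.json', 'other_engine_results.json'],
    #     dataset_json_filepath='data/ground_truth/sroie_ground_truth.json',
    # )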