# KIE-Engines-Comparison / Evaluator.py
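"""Evaluate key-information-extraction (KIE) predictions against ground truth using fuzzy string matching."""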
import os
import re
import json
import pandas as pd
from dateutil import parser
from fuzzywuzzy import fuzz # make sure you have fuzzywuzzy installed
class Evaluator:
def __init__(self, predicted_json_filepath: str, dataset_json_filepath: str,
output_metrics_filename: str | None = None,
save_metrics_in_folder: bool = False):
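        """
        Args:
            predicted_json_filepath: JSON file mapping each filename to its predicted key/value pairs.
            dataset_json_filepath: JSON file mapping each filename to its ground-truth key/value pairs.
            output_metrics_filename: path used by save_results(); also forwarded to
                calculate_metrics() as the folder for the evaluation outputs.
            save_metrics_in_folder: if True, the metrics DataFrames and raw results are written to disk.
        """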
self.predicted_json_filepath = predicted_json_filepath
self.dataset_json_filepath = dataset_json_filepath
self.output_metrics_filename = output_metrics_filename
self.save_metrics_in_folder = save_metrics_in_folder
self.results = {}
self.YandY_hat_dicts = self.load_and_combine_data()
def load_and_combine_data(self) -> dict:
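        """Load both JSON files and pair each prediction with its ground truth, keyed by filename.
        Filenames that do not appear in both files are dropped."""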
with open(self.dataset_json_filepath, 'r') as f:
ground_truth_dict = json.load(f)
with open(self.predicted_json_filepath, 'r') as f:
predicted_json = json.load(f)
return {filename: {'Y_hat': predicted_json[filename], 'Y': ground_truth_dict[filename]} for filename in
predicted_json.keys() if filename in ground_truth_dict.keys()}
@staticmethod
def _normalize_string(s: str) -> str:
"""Convert to lowercase and remove extra spaces."""
if not isinstance(s, str):
s = str(s)
return ''.join(e for e in s.lower().strip())
@staticmethod
def _clean_numeric_value(s: str) -> str:
"""
Extract the numeric part of the total value and format it to two decimal places.
The total for some engines comes from calculations and not extraction from text so,
it can be instead of 19.93, 19.9299867. That's why we do rounding in 3rd and keep the 2 decimals
"""
pattern = r'\d+\.?\d*'
match = re.search(pattern, str(s))
if not match:
return s # Return the original string if no numeric part is found
num = float(match.group(0))
rounded_num = round(num, 3)
formatted_num = f"{rounded_num:.2f}"
return formatted_num
@staticmethod
    def convert_to_ddmmyyyy(date_str: str) -> str:
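        """Parse a date string with dateutil and reformat it as DD/MM/YYYY;
        return the input unchanged if it cannot be parsed."""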
try:
date_obj = parser.parse(date_str, dayfirst=False) # This will try to infer the format
return date_obj.strftime('%d/%m/%Y')
        except (ValueError, OverflowError):  # fall back to the raw string if parsing fails
return date_str
def get_similarity_results(self, y_hat: dict, y: dict, threshold: int = 70):
"""Compare two dictionaries and calculate similarity scores."""
results = {}
        for key in y_hat.keys():
            if key not in y:
                continue  # skip attributes that have no ground-truth value
            normalized_hat = self._normalize_string(y_hat[key])
            normalized_y = self._normalize_string(y[key])
# Apply numeric cleaning for the "total" attribute
if key == "total":
normalized_hat = self._clean_numeric_value(normalized_hat)
normalized_y = self._clean_numeric_value(normalized_y)
if key == "date":
normalized_hat = self.convert_to_ddmmyyyy(normalized_hat)
normalized_y = self.convert_to_ddmmyyyy(normalized_y)
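            # partial_ratio scores the best-matching substring (0-100), so extra surrounding
            # text in either the prediction or the ground truth is tolerated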
similarity = fuzz.partial_ratio(normalized_hat, normalized_y)
results[key] = {
'Y_hat': y_hat[key],
'Y': y[key],
'similarity': similarity,
'match': similarity >= threshold
}
return results
    def save_results(self):
        """Save the raw comparison results to a JSON file."""
        if not self.output_metrics_filename:
            raise ValueError("output_metrics_filename must be set to save results")
        with open(self.output_metrics_filename, 'w') as f:
            json.dump(self.results, f, indent=4, sort_keys=True)
def calculate_metrics(self, save_to_folder: bool = False, eval_results_folder: str | None = None):
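        """Aggregate self.results into per-attribute accuracy/similarity and per-file pass/fail tables.
        Returns (field_metrics_df, file_metrics_df); optionally writes both CSVs and the raw results
        JSON to eval_results_folder (or a folder derived from the predictions filename)."""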
data = self.results
# Initialize a dynamic metrics dictionary
metrics = {}
# Calculate metrics for each attribute
for file_attrs in data.values():
for attribute, details in file_attrs.items():
if attribute not in metrics:
metrics[attribute] = {"correct": 0, "total": 0, "similarity_sum": 0}
metrics[attribute]["total"] += 1
metrics[attribute]["similarity_sum"] += details["similarity"]
if details["match"]:
metrics[attribute]["correct"] += 1
# Calculate accuracy and average similarity for each attribute
field_metrics = []
for attribute, counts in metrics.items():
accuracy = counts["correct"] / counts["total"] * 100
avg_similarity = counts["similarity_sum"] / counts["total"]
field_metrics.append({
"attribute": attribute,
"accuracy": accuracy,
"avg_similarity": avg_similarity
})
# Create the field metrics DataFrame
field_metrics_df = pd.DataFrame(field_metrics)
# Create a DataFrame to include the similarity values and correctness check
results = []
for filename, attributes in data.items():
row = {"filename": filename}
passed = True
failed_attributes = []
for attribute, details in attributes.items():
similarity_score = details["similarity"]
match = details["match"]
row[attribute + "_similarity"] = similarity_score
if not match:
passed = False
failed_attributes.append(attribute)
row["passed"] = passed
row["failed_attributes"] = ", ".join(failed_attributes) if failed_attributes else "None"
results.append(row)
# Create the results DataFrame
file_metrics_df = pd.DataFrame(results)
# Save DataFrames to a folder if required
if save_to_folder:
base_name = os.path.splitext(os.path.basename(self.predicted_json_filepath))[0]
directory = eval_results_folder if eval_results_folder else f"{base_name}_eval_results"
            os.makedirs(directory, exist_ok=True)
field_metrics_df.to_csv(os.path.join(directory, f"{base_name}_field_metrics.csv"), index=False)
file_metrics_df.to_csv(os.path.join(directory, f"{base_name}_file_metrics.csv"), index=False)
            with open(os.path.join(directory, f'{base_name}_YnY_hat.json'), 'w') as outfile:
                json.dump(self.results, outfile)
print(f"Saved {base_name} evaluation results in {directory}")
return field_metrics_df, file_metrics_df
    def evaluate_predictions(self, filenames_to_skip: list[str] | None = None):
        """Compare every prediction against its ground truth, store the per-file results,
        and return (results, field_metrics_df, file_metrics_df)."""
        filenames_to_skip = filenames_to_skip or []
for filename, data_dict in self.YandY_hat_dicts.items():
if filename in filenames_to_skip:
continue
y_hat_dict = data_dict['Y_hat']
y = data_dict['Y']
self.results[filename] = self.get_similarity_results(y_hat_dict, y)
field_df, file_df = self.calculate_metrics(self.save_metrics_in_folder, self.output_metrics_filename)
return self.results, field_df, file_df
def evaluate_predictions_for_list(
        predictions_json_filenames: list[str] | str, filenames_to_skip: list[str] | None = None,
        dataset_json_filepath: str = 'data/ground_truth/sroie_ground_truth.json',
        save_metrics_in_folder: bool = True):
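    """Run the Evaluator over one or more prediction files against the same ground truth.
    Returns a dict mapping each prediction file's basename to its (results, field_df, file_df) tuple."""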
outputs = {}
if isinstance(predictions_json_filenames, str):
predictions_json_filenames = [predictions_json_filenames]
for predictions_json_filename in predictions_json_filenames:
analyzer = Evaluator(
predictions_json_filename,
dataset_json_filepath,
save_metrics_in_folder=save_metrics_in_folder,
)
outputs[os.path.basename(predictions_json_filename)] = analyzer.evaluate_predictions(filenames_to_skip=filenames_to_skip)
return outputs
if __name__ == '__main__':
# predictions_folder = r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions'
# evaluate_predictions_for_list(os.path.join(predictions_folder, 'microsoft.json'))
# faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
# evaluate_predictions_for_list(r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions\microsoft.json')
analyzer = Evaluator(
r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\Qwen2vl\qwen2_vl2b_results.json',
r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\data\sroie_task2train.json',
save_metrics_in_folder=True
)
# faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
analyzer.evaluate_predictions()