import os
import re
import json

import pandas as pd
from dateutil import parser
from fuzzywuzzy import fuzz  # make sure you have fuzzywuzzy installed


class Evaluator:
    """Compare predicted key-value extractions against ground truth using fuzzy string matching."""

    def __init__(self,
                 predicted_json_filepath: str,
                 dataset_json_filepath: str,
                 output_metrics_filename: str | None = None,
                 save_metrics_in_folder: bool = False):
        self.predicted_json_filepath = predicted_json_filepath
        self.dataset_json_filepath = dataset_json_filepath
        self.output_metrics_filename = output_metrics_filename
        self.save_metrics_in_folder = save_metrics_in_folder
        self.results = {}
        self.YandY_hat_dicts = self.load_and_combine_data()

    def load_and_combine_data(self) -> dict:
        """Load predictions and ground truth, pairing them by filename."""
        with open(self.dataset_json_filepath, 'r') as f:
            ground_truth_dict = json.load(f)
        with open(self.predicted_json_filepath, 'r') as f:
            predicted_json = json.load(f)
        # Keep only the files that appear in both the predictions and the ground truth.
        return {filename: {'Y_hat': predicted_json[filename], 'Y': ground_truth_dict[filename]}
                for filename in predicted_json.keys()
                if filename in ground_truth_dict.keys()}

    @staticmethod
    def _normalize_string(s: str) -> str:
        """Convert to lowercase and collapse extra whitespace."""
        if not isinstance(s, str):
            s = str(s)
        # Collapse runs of whitespace so spacing differences do not affect the fuzzy score.
        return ' '.join(s.lower().split())

    @staticmethod
    def _clean_numeric_value(s: str) -> str:
        """
        Extract the numeric part of the total and format it to two decimal places.

        Some engines compute the total instead of extracting it from the text, so a
        value of 19.93 may arrive as 19.9299867. We round to three decimals first and
        then keep two, so such values still compare as equal.
        """
        pattern = r'\d+\.?\d*'
        match = re.search(pattern, str(s))
        if not match:
            return s  # Return the original string if no numeric part is found
        num = float(match.group(0))
        rounded_num = round(num, 3)
        return f"{rounded_num:.2f}"

    @staticmethod
    def convert_to_ddmmyyyy(date_str):
        """Parse a date string in any common format and return it as DD/MM/YYYY."""
        try:
            date_obj = parser.parse(date_str, dayfirst=False)  # dateutil infers the format
            return date_obj.strftime('%d/%m/%Y')
        except (ValueError, OverflowError):
            return date_str

    def get_similarity_results(self, y_hat: dict, y: dict, threshold: int = 70):
        """Compare two dictionaries and calculate per-attribute similarity scores."""
        results = {}
        for key in y_hat.keys():
            normalized_hat = self._normalize_string(y_hat[key])
            normalized_y = self._normalize_string(y[key])

            # Apply numeric cleaning for the "total" attribute
            if key == "total":
                normalized_hat = self._clean_numeric_value(normalized_hat)
                normalized_y = self._clean_numeric_value(normalized_y)

            # Normalize dates to a single format before comparing
            if key == "date":
                normalized_hat = self.convert_to_ddmmyyyy(normalized_hat)
                normalized_y = self.convert_to_ddmmyyyy(normalized_y)

            similarity = fuzz.partial_ratio(normalized_hat, normalized_y)
            results[key] = {
                'Y_hat': y_hat[key],
                'Y': y[key],
                'similarity': similarity,
                'match': similarity >= threshold
            }
        return results

    def save_results(self):
        """Save the results to a JSON file."""
        with open(self.output_metrics_filename, 'w') as f:
            json.dump(self.results, f, indent=4, sort_keys=True)

    def calculate_metrics(self, save_to_folder: bool = False, eval_results_folder: str | None = None):
        data = self.results

        # Aggregate correctness and similarity per attribute
        metrics = {}
        for file_attrs in data.values():
            for attribute, details in file_attrs.items():
                if attribute not in metrics:
                    metrics[attribute] = {"correct": 0, "total": 0, "similarity_sum": 0}
                metrics[attribute]["total"] += 1
                metrics[attribute]["similarity_sum"] += details["similarity"]
                if details["match"]:
                    metrics[attribute]["correct"] += 1

        # Calculate accuracy and average similarity for each attribute
        field_metrics = []
        for attribute, counts in metrics.items():
            accuracy = counts["correct"] / counts["total"] * 100
            avg_similarity = counts["similarity_sum"] / counts["total"]
            field_metrics.append({
                "attribute": attribute,
                "accuracy": accuracy,
                "avg_similarity": avg_similarity
            })
        field_metrics_df = pd.DataFrame(field_metrics)

        # Per-file view: similarity per attribute plus pass/fail status
        results = []
        for filename, attributes in data.items():
            row = {"filename": filename}
            passed = True
            failed_attributes = []
            for attribute, details in attributes.items():
                row[attribute + "_similarity"] = details["similarity"]
                if not details["match"]:
                    passed = False
                    failed_attributes.append(attribute)
            row["passed"] = passed
            row["failed_attributes"] = ", ".join(failed_attributes) if failed_attributes else "None"
            results.append(row)
        file_metrics_df = pd.DataFrame(results)

        # Save DataFrames to a folder if required
        if save_to_folder:
            base_name = os.path.splitext(os.path.basename(self.predicted_json_filepath))[0]
            directory = eval_results_folder if eval_results_folder else f"{base_name}_eval_results"
            os.makedirs(directory, exist_ok=True)
            field_metrics_df.to_csv(os.path.join(directory, f"{base_name}_field_metrics.csv"), index=False)
            file_metrics_df.to_csv(os.path.join(directory, f"{base_name}_file_metrics.csv"), index=False)
            with open(os.path.join(directory, f'{base_name}_YnY_hat.json'), 'w') as outfile:
                json.dump(self.results, outfile)
            print(f"Saved {base_name} evaluation results in {directory}")

        return field_metrics_df, file_metrics_df

    def evaluate_predictions(self, filenames_to_skip: list[str] | None = None):
        """Score every file, store the results, and return them with the metric DataFrames."""
        filenames_to_skip = filenames_to_skip or []
        for filename, data_dict in self.YandY_hat_dicts.items():
            if filename in filenames_to_skip:
                continue
            y_hat_dict = data_dict['Y_hat']
            y = data_dict['Y']
            self.results[filename] = self.get_similarity_results(y_hat_dict, y)
        # output_metrics_filename doubles as the results folder name when saving is enabled.
        field_df, file_df = self.calculate_metrics(self.save_metrics_in_folder, self.output_metrics_filename)
        return self.results, field_df, file_df


def evaluate_predictions_for_list(
        predictions_json_filenames: list[str] | str,
        filenames_to_skip: list[str] | None = None,
        dataset_json_filepath: str = 'data/ground_truth/sroie_ground_truth.json',
        save_metrics_in_folder: bool = True):
    """Run the Evaluator over one or more prediction files and collect their outputs."""
    outputs = {}
    if isinstance(predictions_json_filenames, str):
        predictions_json_filenames = [predictions_json_filenames]
    for predictions_json_filename in predictions_json_filenames:
        analyzer = Evaluator(
            predictions_json_filename,
            dataset_json_filepath,
            save_metrics_in_folder=save_metrics_in_folder,
        )
        outputs[os.path.basename(predictions_json_filename)] = analyzer.evaluate_predictions(
            filenames_to_skip=filenames_to_skip)
    return outputs


if __name__ == '__main__':
    # predictions_folder = r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions'
    # evaluate_predictions_for_list(os.path.join(predictions_folder, 'microsoft.json'))
    # evaluate_predictions_for_list(r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions\microsoft.json')
    analyzer = Evaluator(
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\Qwen2vl\qwen2_vl2b_results.json',
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\data\sroie_task2train.json',
        save_metrics_in_folder=True
    )
    # faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
    analyzer.evaluate_predictions()
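
# A minimal sketch of the expected input, assuming both JSON files map image
# filenames to flat attribute dictionaries. Only "total" and "date" receive
# special handling above; the other field names shown here ("company",
# "address") and the filename are illustrative assumptions, not requirements
# of this module:
#
#     predictions.json / ground_truth.json
#     {
#         "X00000000000.jpg": {
#             "company": "EXAMPLE TRADING SDN BHD",
#             "date": "2018-01-25",
#             "address": "12 JALAN EXAMPLE, 40000 SHAH ALAM",
#             "total": "9.00"
#         }
#     }
#
# With files shaped like that, a typical call would be:
#
#     evaluator = Evaluator('predictions.json', 'ground_truth.json',
#                           save_metrics_in_folder=True)
#     results, field_df, file_df = evaluator.evaluate_predictions()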