import os
import re
import json

import pandas as pd
from dateutil import parser
from fuzzywuzzy import fuzz  # make sure you have fuzzywuzzy installed


class Evaluator:
    def __init__(self, predicted_json_filepath: str, dataset_json_filepath: str,
                 output_metrics_filename: str | None = None,
                 save_metrics_in_folder: bool = False):
        self.predicted_json_filepath = predicted_json_filepath
        self.dataset_json_filepath = dataset_json_filepath
        self.output_metrics_filename = output_metrics_filename
        self.save_metrics_in_folder = save_metrics_in_folder
        self.results = {}
        self.YandY_hat_dicts = self.load_and_combine_data()

    def load_and_combine_data(self) -> dict:
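        """Load the prediction and ground-truth JSONs and keep only filenames present in both."""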
        with open(self.dataset_json_filepath, 'r') as f:
            ground_truth_dict = json.load(f)
        with open(self.predicted_json_filepath, 'r') as f:
            predicted_json = json.load(f)
        return {filename: {'Y_hat': predicted_json[filename], 'Y': ground_truth_dict[filename]}
                for filename in predicted_json.keys() if filename in ground_truth_dict.keys()}

    @staticmethod
    def _normalize_string(s: str) -> str:
        """Convert to lowercase and collapse extra whitespace."""
        if not isinstance(s, str):
            s = str(s)
        return ' '.join(s.lower().split())

    @staticmethod
    def _clean_numeric_value(s: str) -> str:
        """
        Extract the numeric part of the total value and format it to two decimal places.

        For some engines the total comes from a calculation rather than from text extraction,
        so it can be e.g. 19.9299867 instead of 19.93. We therefore round to three decimals
        first and then keep two.
        """
        pattern = r'\d+\.?\d*'
        match = re.search(pattern, str(s))
        if not match:
            return s  # Return the original string if no numeric part is found
        num = float(match.group(0))
        rounded_num = round(num, 3)
        formatted_num = f"{rounded_num:.2f}"
        return formatted_num

    @staticmethod
    def convert_to_ddmmyyyy(date_str):
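        """Normalize a date string to DD/MM/YYYY; unparseable strings are returned unchanged.

        Illustrative example (assumed input): "2018-03-25" -> "25/03/2018".
        """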
        try:
            date_obj = parser.parse(date_str, dayfirst=False)  # This will try to infer the format
            return date_obj.strftime('%d/%m/%Y')
        except ValueError:
            return date_str

    def get_similarity_results(self, y_hat: dict, y: dict, threshold: int = 70):
        """Compare two dictionaries and calculate similarity scores."""
        results = {}
        for key in y_hat.keys():
            normalized_hat = self._normalize_string(y_hat[key])
            normalized_y = self._normalize_string(y[key])
            # Apply numeric cleaning for the "total" attribute
            if key == "total":
                normalized_hat = self._clean_numeric_value(normalized_hat)
                normalized_y = self._clean_numeric_value(normalized_y)
            if key == "date":
                normalized_hat = self.convert_to_ddmmyyyy(normalized_hat)
                normalized_y = self.convert_to_ddmmyyyy(normalized_y)
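            # fuzz.partial_ratio returns an integer score in [0, 100]; a field counts as a
            # match when the score is at or above `threshold` (70 by default).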
            similarity = fuzz.partial_ratio(normalized_hat, normalized_y)
            results[key] = {
                'Y_hat': y_hat[key],
                'Y': y[key],
                'similarity': similarity,
                'match': similarity >= threshold
            }
        return results

    def save_results(self):
        """Save the results to a JSON file."""
        with open(self.output_metrics_filename, 'w') as f:
            json.dump(self.results, f, indent=4, sort_keys=True)

    def calculate_metrics(self, save_to_folder: bool = False, eval_results_folder: str | None = None):
        data = self.results
        # Initialize a dynamic metrics dictionary
        metrics = {}
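        # `metrics` maps attribute name -> running counters, e.g. (illustrative values only)
        # {"total": {"correct": 12, "total": 15, "similarity_sum": 1310}}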
        # Calculate metrics for each attribute
        for file_attrs in data.values():
            for attribute, details in file_attrs.items():
                if attribute not in metrics:
                    metrics[attribute] = {"correct": 0, "total": 0, "similarity_sum": 0}
                metrics[attribute]["total"] += 1
                metrics[attribute]["similarity_sum"] += details["similarity"]
                if details["match"]:
                    metrics[attribute]["correct"] += 1
        # Calculate accuracy and average similarity for each attribute
        field_metrics = []
        for attribute, counts in metrics.items():
            accuracy = counts["correct"] / counts["total"] * 100
            avg_similarity = counts["similarity_sum"] / counts["total"]
            field_metrics.append({
                "attribute": attribute,
                "accuracy": accuracy,
                "avg_similarity": avg_similarity
            })
        # Create the field metrics DataFrame
        field_metrics_df = pd.DataFrame(field_metrics)
        # Create a DataFrame to include the similarity values and correctness check
        results = []
        for filename, attributes in data.items():
            row = {"filename": filename}
            passed = True
            failed_attributes = []
            for attribute, details in attributes.items():
                similarity_score = details["similarity"]
                match = details["match"]
                row[attribute + "_similarity"] = similarity_score
                if not match:
                    passed = False
                    failed_attributes.append(attribute)
            row["passed"] = passed
            row["failed_attributes"] = ", ".join(failed_attributes) if failed_attributes else "None"
            results.append(row)
        # Create the results DataFrame
        file_metrics_df = pd.DataFrame(results)
        # Save DataFrames to a folder if required
        if save_to_folder:
            base_name = os.path.splitext(os.path.basename(self.predicted_json_filepath))[0]
            directory = eval_results_folder if eval_results_folder else f"{base_name}_eval_results"
            if not os.path.exists(directory):
                os.makedirs(directory)
            field_metrics_df.to_csv(os.path.join(directory, f"{base_name}_field_metrics.csv"), index=False)
            file_metrics_df.to_csv(os.path.join(directory, f"{base_name}_file_metrics.csv"), index=False)
            with open(os.path.join(directory, f'{base_name}_YnY_hat.json'), 'w') as outfile:
                outfile.write(json.dumps(self.results))
            print(f"Saved {base_name} evaluation results in {directory}")
        return field_metrics_df, file_metrics_df

    def evaluate_predictions(self, filenames_to_skip: list[str] = []):
        """Process the data and store in results."""
        for filename, data_dict in self.YandY_hat_dicts.items():
            if filename in filenames_to_skip:
                continue
            y_hat_dict = data_dict['Y_hat']
            y = data_dict['Y']
            self.results[filename] = self.get_similarity_results(y_hat_dict, y)
        field_df, file_df = self.calculate_metrics(self.save_metrics_in_folder, self.output_metrics_filename)
        return self.results, field_df, file_df


def evaluate_predictions_for_list(
        predictions_json_filenames: list[str] | str, filenames_to_skip: list[str] = [],
        dataset_json_filepath: str = 'data/ground_truth/sroie_ground_truth.json',
        save_metrics_in_folder: bool = True):
    outputs = {}
    if isinstance(predictions_json_filenames, str):
        predictions_json_filenames = [predictions_json_filenames]
    for predictions_json_filename in predictions_json_filenames:
        analyzer = Evaluator(
            predictions_json_filename,
            dataset_json_filepath,
            save_metrics_in_folder=save_metrics_in_folder,
        )
        outputs[os.path.basename(predictions_json_filename)] = analyzer.evaluate_predictions(filenames_to_skip=filenames_to_skip)
    return outputs


if __name__ == '__main__':
    # predictions_folder = r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions'
    # evaluate_predictions_for_list(os.path.join(predictions_folder, 'microsoft.json'))
    # faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
    # evaluate_predictions_for_list(r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions\microsoft.json')
    analyzer = Evaluator(
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\Qwen2vl\qwen2_vl2b_results.json',
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\data\sroie_task2train.json',
        save_metrics_in_folder=True
    )
    # faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
    analyzer.evaluate_predictions()
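    # Alternative usage sketch for batch evaluation (paths below are illustrative placeholders):
    # evaluate_predictions_for_list(
    #     ['predictions/microsoft.json', 'predictions/qwen2_vl2b_results.json'],
    #     filenames_to_skip=['X51006414677.jpg'],
    #     dataset_json_filepath='data/ground_truth/sroie_ground_truth.json',
    # )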