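"""Evaluate predicted JSON field extractions against SROIE ground truth using fuzzy string matching."""
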
import os
import re
import json
import pandas as pd
from dateutil import parser

from fuzzywuzzy import fuzz  # make sure you have fuzzywuzzy installed


class Evaluator:
    def __init__(self, predicted_json_filepath: str, dataset_json_filepath: str,
                 output_metrics_filename: str | None = None,
                 save_metrics_in_folder: bool = False):
        self.predicted_json_filepath = predicted_json_filepath
        self.dataset_json_filepath = dataset_json_filepath
        self.output_metrics_filename = output_metrics_filename
        self.save_metrics_in_folder = save_metrics_in_folder
        self.results = {}
        self.YandY_hat_dicts = self.load_and_combine_data()

    def load_and_combine_data(self) -> dict:
        """Load predictions and ground truth and pair them per filename."""
        with open(self.dataset_json_filepath, 'r') as f:
            ground_truth_dict = json.load(f)

        with open(self.predicted_json_filepath, 'r') as f:
            predicted_json = json.load(f)

        # Keep only the filenames that appear in both the predictions and the ground truth
        return {
            filename: {'Y_hat': predicted_json[filename], 'Y': ground_truth_dict[filename]}
            for filename in predicted_json if filename in ground_truth_dict
        }
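    # Assumed shape of both JSON files (illustration; field names other than "total" and "date"
    # are examples, not required by this script):
    # {"X51006414677.jpg": {"company": "...", "date": "25/03/2018", "address": "...", "total": "19.93"}, ...}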

    @staticmethod
    def _normalize_string(s: str) -> str:
        """Convert to lowercase and remove extra spaces."""
        if not isinstance(s, str):
            s = str(s)
        return ''.join(e for e in s.lower().strip())
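    # Illustration (value assumed): "  BOOKS KINOKUNIYA  " -> "books kinokuniya"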

    @staticmethod
    def _clean_numeric_value(s: str) -> str:
        """
        Extract the numeric part of the total value and format it to two decimal places.
        The total for some engines comes from calculations and not extraction from text so,
            it can be instead of 19.93, 19.9299867. That's why we do rounding in 3rd and keep the 2 decimals
        """
        pattern = r'\d+\.?\d*'
        match = re.search(pattern, str(s))
        if not match:
            return s  # Return the original string if no numeric part is found

        num = float(match.group(0))
        rounded_num = round(num, 3)
        formatted_num = f"{rounded_num:.2f}"
        return formatted_num

    @staticmethod
    def convert_to_ddmmyyyy(date_str: str) -> str:
        """Normalize a date string to DD/MM/YYYY; return it unchanged if it cannot be parsed."""
        try:
            date_obj = parser.parse(date_str, dayfirst=False)  # dateutil infers the format, month first on ambiguity
            return date_obj.strftime('%d/%m/%Y')
        except ValueError:
            return date_str
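    # Illustration (dates assumed): "2018-03-25" and "March 25, 2018" both become "25/03/2018";
    # strings that cannot be parsed are returned unchanged.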

    def get_similarity_results(self, y_hat: dict, y: dict, threshold: int = 70):
        """Compare two dictionaries and calculate similarity scores."""
        results = {}
        for key in y_hat.keys():
            normalized_hat = self._normalize_string(y_hat[key])
            normalized_y = self._normalize_string(y[key])

            # Apply numeric cleaning for the "total" attribute
            if key == "total":
                normalized_hat = self._clean_numeric_value(normalized_hat)
                normalized_y = self._clean_numeric_value(normalized_y)

            if key == "date":
                normalized_hat = self.convert_to_ddmmyyyy(normalized_hat)
                normalized_y = self.convert_to_ddmmyyyy(normalized_y)

            similarity = fuzz.partial_ratio(normalized_hat, normalized_y)
            results[key] = {
                'Y_hat': y_hat[key],
                'Y': y[key],
                'similarity': similarity,
                'match': similarity >= threshold
            }
        return results
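    # One returned entry might look like (values assumed):
    # {"total": {"Y_hat": "RM 19.93", "Y": "19.93", "similarity": 100, "match": True}}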

    def save_results(self):
        """Save the results to a JSON file."""
        with open(self.output_metrics_filename, 'w') as f:
            json.dump(self.results, f, indent=4, sort_keys=True)

    def calculate_metrics(self, save_to_folder: bool = False, eval_results_folder: str | None = None):
        """Build per-field and per-file metric DataFrames from self.results and optionally save them to a folder."""
        data = self.results

        # Initialize a dynamic metrics dictionary
        metrics = {}

        # Calculate metrics for each attribute
        for file_attrs in data.values():
            for attribute, details in file_attrs.items():
                if attribute not in metrics:
                    metrics[attribute] = {"correct": 0, "total": 0, "similarity_sum": 0}

                metrics[attribute]["total"] += 1
                metrics[attribute]["similarity_sum"] += details["similarity"]
                if details["match"]:
                    metrics[attribute]["correct"] += 1

        # Calculate accuracy and average similarity for each attribute
        field_metrics = []
        for attribute, counts in metrics.items():
            accuracy = counts["correct"] / counts["total"] * 100
            avg_similarity = counts["similarity_sum"] / counts["total"]

            field_metrics.append({
                "attribute": attribute,
                "accuracy": accuracy,
                "avg_similarity": avg_similarity
            })

        # Create the field metrics DataFrame
        field_metrics_df = pd.DataFrame(field_metrics)

        # Create a DataFrame to include the similarity values and correctness check
        results = []
        for filename, attributes in data.items():
            row = {"filename": filename}
            passed = True
            failed_attributes = []

            for attribute, details in attributes.items():
                similarity_score = details["similarity"]
                match = details["match"]
                row[attribute + "_similarity"] = similarity_score
                if not match:
                    passed = False
                    failed_attributes.append(attribute)

            row["passed"] = passed
            row["failed_attributes"] = ", ".join(failed_attributes) if failed_attributes else "None"

            results.append(row)

        # Create the results DataFrame
        file_metrics_df = pd.DataFrame(results)

        # Save DataFrames to a folder if required
        if save_to_folder:
            base_name = os.path.splitext(os.path.basename(self.predicted_json_filepath))[0]
            directory = eval_results_folder if eval_results_folder else f"{base_name}_eval_results"

            os.makedirs(directory, exist_ok=True)

            field_metrics_df.to_csv(os.path.join(directory, f"{base_name}_field_metrics.csv"), index=False)
            file_metrics_df.to_csv(os.path.join(directory, f"{base_name}_file_metrics.csv"), index=False)
            with open(os.path.join(directory, f'{base_name}_YnY_hat.json'), 'w') as outfile:
                json.dump(self.results, outfile)

            print(f"Saved {base_name} evaluation results in {directory}")

        return field_metrics_df, file_metrics_df
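    # field_metrics_df: one row per attribute (attribute, accuracy in %, avg_similarity).
    # file_metrics_df: one row per file with "<attribute>_similarity" columns plus "passed" and "failed_attributes".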

    def evaluate_predictions(self, filenames_to_skip: list[str] | None = None):
        """Compare every prediction with its ground truth, store the per-file results, and compute metrics."""
        filenames_to_skip = filenames_to_skip or []
        for filename, data_dict in self.YandY_hat_dicts.items():
            if filename in filenames_to_skip:
                continue
            y_hat_dict = data_dict['Y_hat']
            y = data_dict['Y']
            self.results[filename] = self.get_similarity_results(y_hat_dict, y)
        # output_metrics_filename doubles as the evaluation results folder when saving is enabled
        field_df, file_df = self.calculate_metrics(self.save_metrics_in_folder, self.output_metrics_filename)
        return self.results, field_df, file_df


def evaluate_predictions_for_list(
        predictions_json_filenames: list[str] | str, filenames_to_skip: list[str] | None = None,
        dataset_json_filepath: str = 'data/ground_truth/sroie_ground_truth.json',
        save_metrics_in_folder: bool = True):
    outputs = {}

    if isinstance(predictions_json_filenames, str):
        predictions_json_filenames = [predictions_json_filenames]
    for predictions_json_filename in predictions_json_filenames:
        analyzer = Evaluator(
            predictions_json_filename,
            dataset_json_filepath,
            save_metrics_in_folder=save_metrics_in_folder,
        )
        outputs[os.path.basename(predictions_json_filename)] = analyzer.evaluate_predictions(filenames_to_skip=filenames_to_skip)
    return outputs


if __name__ == '__main__':
    # predictions_folder = r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions'
    # evaluate_predictions_for_list(os.path.join(predictions_folder, 'microsoft.json'))
    # faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']
    # evaluate_predictions_for_list(r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\data\The4EnginesSroieMetrics\Predictions\microsoft.json')

    analyzer = Evaluator(
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\Products\Qwen2vl\qwen2_vl2b_results.json',
        r'C:\Users\kosti\OneDrive - Image Access Corp\ml-rnd\data\sroie_task2train.json',
        save_metrics_in_folder=True
    )
    # faulty_filenames = ['X51006414677.jpg', 'X51006414719.jpg', 'X51006913073.jpg', 'X51005763940.jpg']

    analyzer.evaluate_predictions()
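
    # Alternative: evaluate several prediction files in one call (the paths below are placeholders):
    # evaluate_predictions_for_list(
    #     ['predictions/engine_a.json', 'predictions/engine_b.json'],
    #     dataset_json_filepath='data/ground_truth/sroie_ground_truth.json',
    #     save_metrics_in_folder=True,
    # )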