import json

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from shapely.geometry import Point, Polygon


def add_extreme_coordinates(polygon_data):
    """Attach bounding-box extremes (min/max lat and lon) to a GeoJSON feature."""
    polygon_coords = np.array(polygon_data["geometry"]["coordinates"][0])

    polygon_data["geometry"]["max_lat"] = polygon_coords[:, 1].max()
    polygon_data["geometry"]["min_lat"] = polygon_coords[:, 1].min()
    polygon_data["geometry"]["max_lon"] = polygon_coords[:, 0].max()
    polygon_data["geometry"]["min_lon"] = polygon_coords[:, 0].min()


def turn_into_dataframe(data):
    """Flatten a GeoJSON FeatureCollection into a DataFrame with shapely polygons."""
    data_list = data["features"]

    for feature in data_list:
        add_extreme_coordinates(feature)

    df = pd.DataFrame(data_list).drop(columns="type")

    dict_cols = ["properties", "geometry"]
    for dict_col in dict_cols:
        dict_df = pd.json_normalize(df[dict_col])
        # Merge the new columns back into the original DataFrame
        df = df.drop(columns=[dict_col]).join(dict_df)

    # Keep only the exterior ring and build a shapely polygon from it
    df["coordinates"] = df["coordinates"].apply(lambda x: x[0])
    df["polygon"] = df["coordinates"].apply(lambda x: Polygon(x))
    df = df.drop(columns=["type"])
    return df


# Function to plot a single polygon onto an existing axis
def plot_polygon(ax, polygon, color, label="label"):
    if not polygon.is_empty:
        x, y = polygon.exterior.xy
        ax.fill(x, y, color=color, alpha=0.5, label=label)


def plot_polygons(list_polygons, first_one_different=False, dpi=150):
    # Plot the polygons and their intersection
    fig, ax = plt.subplots(dpi=dpi)

    if first_one_different:
        plot_polygon(ax, list_polygons[0], "red", "polygon 0")
        for i, polygon in enumerate(list_polygons[1:], start=1):
            plot_polygon(ax, polygon, "blue", f"polygon {i}")
    else:
        for i, polygon in enumerate(list_polygons):
            plot_polygon(ax, polygon, "blue", f"polygon {i}")

    # Plot the intersection
    # plot_polygon(ax, intersection, 'red', 'Intersection')

    # Add legend
    # ax.legend()

    # Set axis limits
    ax.set_aspect("equal")

    # Set title
    ax.set_title("Polygons and their Intersection")
    plt.ylabel("lat")
    plt.xlabel("lon")

    plt.show()


def plot_polygons_with_colors(list_polygons, list_colors, dpi=150):
    # Plot the polygons and their intersection
    fig, ax = plt.subplots(dpi=dpi)

    for polygon, color in zip(list_polygons, list_colors):
        plot_polygon(ax, polygon, color)

    # Set axis limits
    ax.set_aspect("equal")

    # Set title
    ax.set_title("Polygons and their Intersection")
    plt.ylabel("lat")
    plt.xlabel("lon")

    plt.show()


def plot_polygons_from_df(df, dpi=150):
    list_polygons = []
    for index, row in df.iterrows():
        list_polygons.append(row["polygon"])
    plot_polygons(list_polygons=list_polygons, dpi=dpi)


def map_color(id):
    return "blue"


def plot_polygons_from_df_with_color(df, dpi=150):
    df["plot_colors"] = df["id"].apply(map_color)
    list_polygons = []
    list_colors = []
    for index, row in df.iterrows():
        list_polygons.append(row["polygon"])
        list_colors.append(row["plot_colors"])
    plot_polygons_with_colors(
        list_polygons=list_polygons, list_colors=list_colors, dpi=dpi
    )


def intersection(polygon, polygon_comparison):
    return polygon.intersection(polygon_comparison)


def intersection_area(polygon, polygon_comparison):
    return intersection(polygon, polygon_comparison).area


def intersection_area_ratio(polygon, polygon_comparison):
    return intersection_area(polygon, polygon_comparison) / polygon.area


def containsPoint(polygonB, polygon):
    """Return True if any exterior vertex of polygonB lies inside polygon."""
    coordinatesB = get_coordinates(polygonB)
    for coord in coordinatesB:
        if polygon.contains(Point(coord)):
            return True
    # Only report False once every vertex has been checked
    return False


def get_coordinates(polygon):
    coordinates = polygon.exterior.coords
    coordinates = [list(pair) for pair in coordinates]
    return coordinates

def mark_id_to_be_dropped(df, id_string):
    df.loc[df["id"] == id_string, "to_drop"] = True


def mark_id_to_be_merged(df, id_string):
    df.loc[df["id"] == id_string, "to_merge"] = True


def calc_overlapping_subset(df_input, index):
    """Return all rows whose bounding box overlaps the bounding box of row `index`."""
    max_lat = df_input.iloc[index]["max_lat"]
    min_lat = df_input.iloc[index]["min_lat"]
    max_lon = df_input.iloc[index]["max_lon"]
    min_lon = df_input.iloc[index]["min_lon"]

    lat_overlap = (
        ((max_lat < df_input["max_lat"]) & (max_lat > df_input["min_lat"]))
        | ((min_lat < df_input["max_lat"]) & (min_lat > df_input["min_lat"]))
        | ((df_input["max_lat"] < max_lat) & (df_input["max_lat"] > min_lat))
        | ((df_input["min_lat"] > min_lat) & (df_input["min_lat"] < max_lat))
    )
    lon_overlap = (
        ((max_lon < df_input["max_lon"]) & (max_lon > df_input["min_lon"]))
        | ((min_lon < df_input["max_lon"]) & (min_lon > df_input["min_lon"]))
        | ((df_input["max_lon"] < max_lon) & (df_input["max_lon"] > min_lon))
        | ((df_input["min_lon"] > min_lon) & (df_input["min_lon"] < max_lon))
    )

    relevant_subset = df_input.loc[lat_overlap & lon_overlap]
    return relevant_subset


def remove_contained_polygons(df_input):
    df_result = df_input.copy()

    for i in range(len(df_result)):
        polygonA = df_input.iloc[i]["polygon"]
        # relevant_subset = df_result[df_result['polygon'].apply(lambda polygonB: containsPoint(polygonA, polygonB))]
        # relevant_subset = relevant_subset[relevant_subset['id'] != df_input.iloc[i]['id']]
        relevant_subset = calc_overlapping_subset(df_input=df_result, index=i)

        # Experiment with this parameter to find the best threshold.
        # It certainly has to be smaller than 0.9.
        threshold = 0.85

        for j in range(len(relevant_subset)):
            ratio_current_choice = intersection_area_ratio(
                polygon=polygonA, polygon_comparison=relevant_subset.iloc[j]["polygon"]
            )
            ratio_alternative_choice = intersection_area_ratio(
                polygon=relevant_subset.iloc[j]["polygon"], polygon_comparison=polygonA
            )

            if (ratio_current_choice > threshold) or (ratio_alternative_choice > threshold):
                # Keep the larger polygon and drop the one it (almost) contains
                if polygonA.area > relevant_subset.iloc[j]["polygon"].area:
                    mark_id_to_be_dropped(df=df_result, id_string=relevant_subset.iloc[j]["id"])
                else:
                    mark_id_to_be_dropped(df=df_result, id_string=df_input.iloc[i]["id"])

    # Remove all polygons whose id was marked
    df_result = df_result.loc[df_result["to_drop"] == False]
    return df_result


def merge(df_input, polygon_index, merge_subset):
    """Union the polygon at `polygon_index` with every polygon in `merge_subset`.

    The merged-in rows are removed from df_input; the merged polygon keeps the id
    of the row at `polygon_index`.
    """
    # Look the merged polygon up by id rather than by position: positional indices
    # shift once merged-in rows are dropped from df_input.
    merged_polygon_id = df_input.iloc[polygon_index]["id"]

    for j in range(len(merge_subset)):
        # Merge the accumulated polygon with the j-th polygon in merge_subset,
        # then delete the j-th polygon from df_input.
        merged_row = df_input.loc[df_input["id"] == merged_polygon_id].iloc[0]

        # The merge changes: polygon, coordinates, min/max lon/lat and the
        # Confidence_score (updated as a running average).
        tmp = merged_row["polygon"].union(merge_subset.iloc[j]["polygon"])
        merged_coordinates = list(tmp.exterior.coords)
        merged_polygon = Polygon(merged_coordinates)  # new polygon
        coordinates = [list(tup) for tup in merged_coordinates]  # new coordinates

        # Updating min/max lon/lat
        min_lon = min(point[0] for point in coordinates)
        max_lon = max(point[0] for point in coordinates)
        min_lat = min(point[1] for point in coordinates)
        max_lat = max(point[1] for point in coordinates)

        polygon_score = merge_subset.iloc[j]["Confidence_score"]

        # Updating the merged polygon
        df_input.loc[df_input["id"] == merged_polygon_id, "polygon"] = merged_polygon
        df_input.loc[df_input["id"] == merged_polygon_id, "min_lon"] = min_lon
        df_input.loc[df_input["id"] == merged_polygon_id, "max_lon"] = max_lon
        df_input.loc[df_input["id"] == merged_polygon_id, "min_lat"] = min_lat
        df_input.loc[df_input["id"] == merged_polygon_id, "max_lat"] = max_lat
        df_input.loc[df_input["id"] == merged_polygon_id, "Confidence_score"] = (
            merged_row["Confidence_score"] + polygon_score
        ) / 2
        df_input.loc[df_input["id"] == merged_polygon_id, "coordinates"] = df_input.loc[
            df_input["id"] == merged_polygon_id, "polygon"
        ].apply(get_coordinates)

        df_input = df_input.loc[df_input["id"] != merge_subset.iloc[j]["id"]]

    return df_input

def merge_overlapping(df_input):
    """Merge the first polygon found that overlaps another one beyond the threshold.

    Returns (merged, df): `merged` is True if a merge happened, in which case the
    caller should run this function again on the returned DataFrame.
    """
    # Experiment with this parameter to get the best results
    threshold = 0.40

    for i in range(len(df_input)):
        polygon = df_input.iloc[i]["polygon"]
        relevant_subset = calc_overlapping_subset(df_input=df_input, index=i)

        toBeMerged = False
        for j in range(len(relevant_subset)):
            ratio_current_choice = intersection_area_ratio(
                polygon=polygon, polygon_comparison=relevant_subset.iloc[j]["polygon"]
            )
            ratio_alternative_choice = intersection_area_ratio(
                polygon=relevant_subset.iloc[j]["polygon"], polygon_comparison=polygon
            )

            if (ratio_current_choice > threshold) or (ratio_alternative_choice > threshold):
                toBeMerged = True
                mark_id_to_be_merged(df=relevant_subset, id_string=relevant_subset.iloc[j]["id"])

        if toBeMerged:
            # Deleting the merged-in rows is handled in this function as well
            df_input = merge(
                df_input=df_input,
                polygon_index=i,
                merge_subset=relevant_subset[relevant_subset["to_merge"] == True],
            )
            return True, df_input

    return False, df_input


def process(list_df):
    df_res = pd.concat(list_df)
    df_res = remove_contained_polygons(df_input=df_res)

    i = 0
    merged, df_res = merge_overlapping(df_input=df_res)
    while merged:
        i += 1
        if i % 100 == 0:
            print(i)
        merged, df_res = merge_overlapping(df_input=df_res)

    return df_res


def combine_different_tile_size(df_smaller, df_bigger):
    """Add polygons from df_smaller that are not already covered by polygons in df_bigger."""
    df_result = df_bigger.copy()

    for i in range(len(df_smaller)):
        max_lat = df_smaller.iloc[i]["max_lat"]
        min_lat = df_smaller.iloc[i]["min_lat"]
        max_lon = df_smaller.iloc[i]["max_lon"]
        min_lon = df_smaller.iloc[i]["min_lon"]
        polygon = df_smaller.iloc[i]["polygon"]

        relevant_subset = df_bigger.loc[
            (
                ((max_lat < df_bigger["max_lat"]) & (max_lat > df_bigger["min_lat"]))
                | ((min_lat < df_bigger["max_lat"]) & (min_lat > df_bigger["min_lat"]))
            )
            & (
                ((max_lon < df_bigger["max_lon"]) & (max_lon > df_bigger["min_lon"]))
                | ((min_lon < df_bigger["max_lon"]) & (min_lon > df_bigger["min_lon"]))
            )
        ]

        list_polygons = [polygon]
        for index, row in relevant_subset.iterrows():
            list_polygons.append(row["polygon"])

        add_polygon = True
        threshold = 0.15
        for comparison_polygon in list_polygons[1:]:
            ratio = intersection_area_ratio(polygon, comparison_polygon)
            if ratio > threshold:
                add_polygon = False

        if add_polygon:
            df_result = pd.concat(
                [df_result, df_smaller.iloc[[i]]], axis=0, join="outer"
            )

    return df_result


def clean(df, score_threshold=0.5):
    df = df.loc[df["score"] > score_threshold]
    return df


def row_to_feature(row):
    feature = {
        "id": row["id"],
        "type": "Feature",
        "properties": {"Confidence_score": row["Confidence_score"]},
        "geometry": {"type": "Polygon", "coordinates": [row["coordinates"]]},
    }
    return feature


def export_df_as_geojson(df, filename):
    features = [row_to_feature(row) for idx, row in df.iterrows()]
    feature_collection = {
        "type": "FeatureCollection",
        "crs": {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::32720"}},
        "features": features,
    }
    output_geojson = json.dumps(feature_collection)

    with open(f"{filename}", "w") as f:
        f.write(output_geojson)

    print(f"GeoJSON data exported to '{filename}' file.")

"urn:ogc:def:crs:EPSG::32720"}}, "features": features, } output_geojson = json.dumps(feature_collection) with open(f"{filename}", "w") as f: f.write(output_geojson) print(f"GeoJSON data exported to '{filename}' file.") def convert_id_to_string(prefix, x): return prefix + str(x) def postprocess(prediction_geojson_path, store_path): with open(prediction_geojson_path,"r",) as file: prediction_data = json.load(file) df = turn_into_dataframe(prediction_data) df["id"] = df.index df['Confidence_score'] = df['Confidence_score'].astype(float) df["id"] = df["id"].apply(lambda x: convert_id_to_string("df_", x)) df["to_drop"] = False df["to_merge"] = False print(f"Number of polygons before postprocessing: {len(df)}") df_res = process([df]) print(f"Number of polygons after postprocessing: {len(df_res)}") export_df_as_geojson(df=df_res, filename=store_path) return df_res