import json |
from shapely.geometry import Polygon, Point |
from shapely.ops import unary_union |
import matplotlib.pyplot as plt |
from matplotlib.patches import Polygon as MplPolygon |
import numpy as np |
import pandas as pd |
def add_extreme_coordinates(polygon_data):
    """Annotate a feature's geometry with the extremes of its first ring.

    Reads ``polygon_data["geometry"]["coordinates"][0]``, takes column 0 of
    every coordinate as "lon" and column 1 as "lat", and stores ``max_lat``,
    ``min_lat``, ``max_lon`` and ``min_lon`` in the same ``geometry`` dict.

    The dict is modified in place and nothing is returned.
    """
    geometry = polygon_data["geometry"]
    ring = np.array(geometry["coordinates"][0])
    lon_values = ring[:, 0]
    lat_values = ring[:, 1]

    # Keys are inserted in this order on purpose: later flattening of the
    # geometry dict derives its column order from it.
    geometry["max_lat"] = max(lat_values)
    geometry["min_lat"] = min(lat_values)
    geometry["max_lon"] = max(lon_values)
    geometry["min_lon"] = min(lon_values)
def turn_into_dataframe(data):
    """Flatten a GeoJSON-like feature collection into a DataFrame.

    Each entry of ``data["features"]`` is first annotated in place with its
    bounding extremes via ``add_extreme_coordinates``. The nested
    ``properties`` and ``geometry`` dicts are then expanded into columns,
    ``coordinates`` is reduced to its first ring, and a ``polygon`` column
    holding a shapely ``Polygon`` built from that ring is added.

    Both ``type`` columns (the feature's and the geometry's) are dropped.
    """
    features = data["features"]
    for feature in features:
        add_extreme_coordinates(feature)

    df = pd.DataFrame(features).drop(columns="type")
    for nested_column in ("properties", "geometry"):
        flattened = pd.json_normalize(df[nested_column])
        df = df.drop(columns=[nested_column]).join(flattened)

    # Only the first ring of every geometry is kept.
    df["coordinates"] = df["coordinates"].apply(lambda rings: rings[0])
    df["polygon"] = df["coordinates"].apply(lambda ring: Polygon(ring))

    # This "type" column is the one that came out of the geometry dict.
    return df.drop(columns=["type"])
def plot_polygon(ax, polygon, color, label="label"):
    """Fill the exterior ring of ``polygon`` on ``ax``.

    Nothing is drawn when ``polygon.is_empty`` is true. The fill uses the
    given ``color`` and ``label`` with an alpha of 0.5.
    """
    if polygon.is_empty:
        return
    xs, ys = polygon.exterior.xy
    ax.fill(xs, ys, color=color, alpha=0.5, label=label)
def plot_polygons(list_polygons, first_one_different=False, dpi=150):
    """Plot a list of polygons on one set of axes and show the figure.

    Args:
        list_polygons: Polygons to draw, each passed to ``plot_polygon``.
        first_one_different: When true, the first polygon is drawn in red
            and the rest in blue; otherwise all are blue.
        dpi: Resolution of the figure the polygons are drawn on.

    Every polygon is labelled ``"polygon <position>"`` using its position
    in ``list_polygons``.
    """
    # Create the figure and the axes in one call so ``dpi`` applies to the
    # figure that is actually drawn on; a separate ``plt.figure(dpi=...)``
    # would only open an additional empty figure.
    fig, ax = plt.subplots(dpi=dpi)

    for position, polygon in enumerate(list_polygons):
        highlight = first_one_different and position == 0
        color = "red" if highlight else "blue"
        plot_polygon(ax, polygon, color, f"polygon {position}")

    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")
    plt.show()
def plot_polygons_with_colors(list_polygons, list_colors, dpi=150):
    """Plot polygons with one colour each and show the figure.

    Args:
        list_polygons: Polygons to draw, each passed to ``plot_polygon``.
        list_colors: Colours paired with the polygons position by position;
            pairing stops at the shorter of the two sequences.
        dpi: Resolution of the figure the polygons are drawn on.
    """
    # Create the figure and the axes in one call so ``dpi`` applies to the
    # figure that is actually drawn on; a separate ``plt.figure(dpi=...)``
    # would only open an additional empty figure.
    fig, ax = plt.subplots(dpi=dpi)

    for polygon, color in zip(list_polygons, list_colors):
        plot_polygon(ax, polygon, color)

    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")
    plt.show()
def plot_polygons_from_df(df, dpi=150):
    """Plot every entry of ``df["polygon"]`` via ``plot_polygons``."""
    polygons = df["polygon"].tolist()
    plot_polygons(list_polygons=polygons, dpi=dpi)
def map_color(id):
    """Return the plot colour for a polygon id.

    The id is currently ignored: every polygon maps to ``"blue"``.
    """
    color = "blue"
    return color
def plot_polygons_from_df_with_color(df, dpi=150):
    """Plot the polygons of ``df`` coloured by ``map_color`` of their id.

    Note that a ``plot_colors`` column is written into ``df`` itself as a
    side effect.
    """
    df["plot_colors"] = df["id"].apply(map_color)
    polygons = df["polygon"].tolist()
    colors = df["plot_colors"].tolist()
    plot_polygons_with_colors(list_polygons=polygons, list_colors=colors, dpi=dpi)
def intersection(polygon, polygon_comparison):
    """Return ``polygon.intersection(polygon_comparison)``."""
    shared_region = polygon.intersection(polygon_comparison)
    return shared_region
def intersection_area(polygon, polygon_comparison):
    """Return the ``area`` of the intersection of the two polygons."""
    shared_region = intersection(polygon, polygon_comparison)
    return shared_region.area
def intersection_area_ratio(polygon, polygon_comparison):
    """Return the fraction of ``polygon``'s area shared with the other polygon.

    Args:
        polygon: Geometry whose area is the denominator.
        polygon_comparison: Geometry intersected with ``polygon``.

    Returns:
        ``area(polygon ∩ polygon_comparison) / area(polygon)``, or ``0.0``
        when ``polygon`` has zero area (a degenerate polygon cannot be
        covered to any meaningful degree, and dividing would raise).
    """
    own_area = polygon.area
    if own_area == 0:
        # Degenerate geometry: avoid ZeroDivisionError and report no overlap.
        return 0.0
    return polygon.intersection(polygon_comparison).area / own_area
def containsPoint(polygonB, polygon):
    """Return whether any exterior vertex of ``polygonB`` lies in ``polygon``.

    Args:
        polygonB: Polygon whose exterior coordinates are tested.
        polygon: Polygon the vertices are tested against with
            ``polygon.contains``.

    Returns:
        ``True`` as soon as one vertex is contained, ``False`` when none is
        (including when ``polygonB`` has no exterior coordinates).
    """
    # Every vertex is examined; the decision must not be taken on the first
    # vertex alone.
    return any(
        polygon.contains(Point(vertex)) for vertex in get_coordinates(polygonB)
    )
def get_coordinates(polygon):
    """Return the exterior coordinates of ``polygon`` as a list of lists."""
    return list(map(list, polygon.exterior.coords))
def mark_id_to_be_dropped(df, id_string):
    """Set ``to_drop`` to True, in place, on every row whose ``id`` matches."""
    matches_id = df["id"] == id_string
    df.loc[matches_id, "to_drop"] = True
def mark_id_to_be_merged(df, id_string):
    """Set ``to_merge`` to True, in place, on every row whose ``id`` matches."""
    matches_id = df["id"] == id_string
    df.loc[matches_id, "to_merge"] = True
def _bbox_ranges_interleave(low, high, other_low, other_high):
    """Return a mask of rows whose 1-D extent interleaves with ``[low, high]``.

    A row matches when one of the four interval ends lies strictly inside
    the other interval. All comparisons are strict, so an extent identical
    to ``[low, high]`` does not match.
    """
    own_high_inside = (high < other_high) & (high > other_low)
    own_low_inside = (low < other_high) & (low > other_low)
    other_high_inside = (other_high < high) & (other_high > low)
    other_low_inside = (other_low > low) & (other_low < high)
    return own_high_inside | own_low_inside | other_high_inside | other_low_inside


def calc_overlapping_subset(df_input, index):
    """Select the rows whose bounding box interleaves with that of one row.

    Args:
        df_input: Frame with ``min_lat``, ``max_lat``, ``min_lon`` and
            ``max_lon`` columns.
        index: Position (as used by ``iloc``) of the reference row.

    Returns:
        The rows of ``df_input`` for which both the latitude extent and the
        longitude extent interleave with the reference row's extent.

    Because every comparison is strict, rows whose extents are identical to
    the reference row's are not selected. This is also what keeps the
    reference row itself out of the result.
    """
    reference = df_input.iloc[index]
    lat_match = _bbox_ranges_interleave(
        reference["min_lat"],
        reference["max_lat"],
        df_input["min_lat"],
        df_input["max_lat"],
    )
    lon_match = _bbox_ranges_interleave(
        reference["min_lon"],
        reference["max_lon"],
        df_input["min_lon"],
        df_input["max_lon"],
    )
    return df_input.loc[lat_match & lon_match]
def remove_contained_poylgons(df_input):
    """Drop the smaller polygon of every strongly overlapping pair.

    Works on a copy of ``df_input``. For each row, the bounding-box
    candidates from ``calc_overlapping_subset`` are compared with it. When
    the intersection covers more than 85% of either polygon, the polygon
    with the smaller area is flagged in ``to_drop``. If the current polygon
    is not strictly larger, it is the one flagged.

    Returns the rows of the copy whose ``to_drop`` flag is still False.
    """
    overlap_limit = 0.85
    df_result = df_input.copy()

    for position in range(len(df_result)):
        current = df_input.iloc[position]
        current_polygon = current["polygon"]
        # Candidates are taken from the working copy, which still contains
        # rows that have already been flagged.
        candidates = calc_overlapping_subset(df_input=df_result, index=position)

        for _, candidate in candidates.iterrows():
            candidate_polygon = candidate["polygon"]
            share_of_current = intersection_area_ratio(
                polygon=current_polygon, polygon_comparison=candidate_polygon
            )
            share_of_candidate = intersection_area_ratio(
                polygon=candidate_polygon, polygon_comparison=current_polygon
            )
            if not (share_of_current > overlap_limit or share_of_candidate > overlap_limit):
                continue

            if current_polygon.area > candidate_polygon.area:
                loser_id = candidate["id"]
            else:
                loser_id = current["id"]
            mark_id_to_be_dropped(df=df_result, id_string=loser_id)

    survivors = df_result["to_drop"] == False  # noqa: E712
    return df_result.loc[survivors]
def merge(df_input, polygon_index, merge_subset):
    """Merge every polygon of ``merge_subset`` into one target polygon.

    Args:
        df_input: Frame holding all polygons. It is copied first, so the
            caller's frame is left untouched.
        polygon_index: Position (as used by ``iloc``) of the target row in
            ``df_input`` at call time.
        merge_subset: Rows to be merged into the target one after another.

    Returns:
        A new frame in which the target row carries the merged geometry and
        the merged rows have been removed.

    For every merged row, the target's ``polygon`` becomes the exterior ring
    of the union (interior rings of the union are discarded). ``min_lon``,
    ``max_lon``, ``min_lat``, ``max_lat`` and ``coordinates`` are recomputed
    from that ring. ``Confidence_score`` becomes the mean of the target's
    current score and the merged row's score.
    """
    # Work on a private copy: assignments below must not leak into the
    # caller's frame or hit a filtered slice.
    df_input = df_input.copy()

    # The target is tracked by id, not by position: rows are removed while
    # merging, which can shift positions.
    target_id = df_input.iloc[polygon_index]["id"]

    for _, other in merge_subset.iterrows():
        other_id = other["id"]
        if other_id == target_id:
            # Merging the target with itself would end up deleting it.
            continue

        is_target = df_input["id"] == target_id
        target = df_input.loc[is_target].iloc[0]

        union = target["polygon"].union(other["polygon"])
        outline = list(union.exterior.coords)
        merged_polygon = Polygon(outline)
        lon_values = [point[0] for point in outline]
        lat_values = [point[1] for point in outline]

        df_input.loc[is_target, "polygon"] = merged_polygon
        df_input.loc[is_target, "min_lon"] = min(lon_values)
        df_input.loc[is_target, "max_lon"] = max(lon_values)
        df_input.loc[is_target, "min_lat"] = min(lat_values)
        df_input.loc[is_target, "max_lat"] = max(lat_values)
        df_input.loc[is_target, "Confidence_score"] = (
            target["Confidence_score"] + other["Confidence_score"]
        ) / 2
        # Assigned through ``apply`` so each cell receives one whole
        # coordinate list instead of the list being spread over cells.
        df_input.loc[is_target, "coordinates"] = df_input.loc[
            is_target, "polygon"
        ].apply(get_coordinates)

        df_input = df_input.loc[df_input["id"] != other_id].copy()

    return df_input
def merge_overlapping(df_input):
    """Perform at most one merge round on ``df_input``.

    Rows are visited in order. For each row, its bounding-box candidates
    from ``calc_overlapping_subset`` are flagged ``to_merge`` when the
    intersection covers more than 40% of either polygon. The flags are
    written on the candidate subset, not on ``df_input``. The first row
    with at least one flagged candidate is merged with them via ``merge``
    and the function returns immediately.

    Returns:
        ``(True, merged_frame)`` if a merge happened, otherwise
        ``(False, df_input)``.
    """
    overlap_limit = 0.40

    for position in range(len(df_input)):
        polygon = df_input.iloc[position]["polygon"]
        candidates = calc_overlapping_subset(df_input=df_input, index=position)

        # Snapshot ids and geometries first; the subset is modified below.
        candidate_pairs = list(zip(candidates["id"], candidates["polygon"]))
        found_partner = False
        for candidate_id, candidate_polygon in candidate_pairs:
            share_of_current = intersection_area_ratio(
                polygon=polygon, polygon_comparison=candidate_polygon
            )
            share_of_candidate = intersection_area_ratio(
                polygon=candidate_polygon, polygon_comparison=polygon
            )
            if share_of_current > overlap_limit or share_of_candidate > overlap_limit:
                found_partner = True
                mark_id_to_be_merged(df=candidates, id_string=candidate_id)

        if not found_partner:
            continue

        partners = candidates[candidates["to_merge"] == True]  # noqa: E712
        merged_frame = merge(
            df_input=df_input, polygon_index=position, merge_subset=partners
        )
        return True, merged_frame

    return False, df_input
def process(list_df):
    """Concatenate the frames, drop contained polygons, then merge overlaps.

    ``merge_overlapping`` is called repeatedly until it reports that nothing
    was merged. The number of completed merge rounds is printed every 100
    rounds as a progress indicator.

    Returns the resulting frame.
    """
    df_res = remove_contained_poylgons(df_input=pd.concat(list_df))

    completed_rounds = 0
    while True:
        merged, df_res = merge_overlapping(df_input=df_res)
        if not merged:
            return df_res
        completed_rounds += 1
        if completed_rounds % 100 == 0:
            print(completed_rounds)
def combine_different_tile_size(df_smaller, df_bigger):
    """Add the polygons of ``df_smaller`` that ``df_bigger`` does not cover.

    Args:
        df_smaller: Frame whose rows are candidates for being added.
        df_bigger: Frame whose rows are always kept.

    Both frames need ``polygon``, ``min_lat``, ``max_lat``, ``min_lon`` and
    ``max_lon`` columns.

    Returns:
        A new frame with all rows of ``df_bigger`` followed by every row of
        ``df_smaller`` for which no polygon of ``df_bigger`` covers more
        than 15% of its area. Neither input is modified.
    """
    overlap_limit = 0.15
    kept_positions = []

    for position in range(len(df_smaller)):
        row = df_smaller.iloc[position]
        polygon = row["polygon"]

        # Plain bounding-box overlap test. It also catches rows of
        # ``df_bigger`` whose extent lies inside this row's extent or equals
        # it; the exact area ratio below makes the actual decision.
        bbox_overlaps = (
            (df_bigger["min_lat"] < row["max_lat"])
            & (df_bigger["max_lat"] > row["min_lat"])
            & (df_bigger["min_lon"] < row["max_lon"])
            & (df_bigger["max_lon"] > row["min_lon"])
        )
        candidates = df_bigger.loc[bbox_overlaps, "polygon"]

        already_covered = any(
            intersection_area_ratio(polygon, other) > overlap_limit
            for other in candidates
        )
        if not already_covered:
            kept_positions.append(position)

    if not kept_positions:
        return df_bigger.copy()

    # Concatenate once instead of growing the frame row by row.
    return pd.concat(
        [df_bigger, df_smaller.iloc[kept_positions]], axis=0, join="outer"
    )
def clean(df, score_threashold=0.5):
    """Return the rows of ``df`` whose ``score`` exceeds ``score_threashold``."""
    above_threshold = df["score"] > score_threashold
    return df.loc[above_threshold]
def row_to_feature(row):
    """Build a GeoJSON-style ``Feature`` dict from one row.

    Uses the row's ``id``, ``Confidence_score`` and ``coordinates`` entries.
    The coordinates are wrapped in a list as the single ring of a
    ``Polygon`` geometry.
    """
    feature = {}
    feature["id"] = row["id"]
    feature["type"] = "Feature"
    feature["properties"] = {"Confidence_score": row["Confidence_score"]}
    feature["geometry"] = {"type": "Polygon", "coordinates": [row["coordinates"]]}
    return feature
def export_df_as_geojson(df, filename):
    """Write the rows of ``df`` to ``filename`` as a GeoJSON FeatureCollection.

    Every row is converted with ``row_to_feature``. The collection carries a
    fixed ``crs`` entry naming ``urn:ogc:def:crs:EPSG::32720``. A
    confirmation line is printed after the file has been written.
    """
    crs = {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::32720"}}

    features = []
    for _, row in df.iterrows():
        features.append(row_to_feature(row))

    # Serialise before opening the file so a serialisation error cannot
    # leave a truncated file behind.
    payload = json.dumps(
        {"type": "FeatureCollection", "crs": crs, "features": features}
    )
    with open(f"{filename}", "w") as handle:
        handle.write(payload)

    print(f"GeoJSON data exported to '{filename}' file.")
def convert_id_to_string(prefix, x):
    """Return ``prefix`` followed by the string form of ``x``."""
    suffix = str(x)
    return prefix + suffix
def postprocess(prediction_geojson_path, store_path):
    """Run the full post-processing pipeline on one prediction file.

    Loads the JSON at ``prediction_geojson_path``, converts it with
    ``turn_into_dataframe``, assigns ids of the form ``df_<index label>``,
    casts ``Confidence_score`` to float and initialises the ``to_drop`` and
    ``to_merge`` flags to False. The frame is then passed through
    ``process`` and the result is exported to ``store_path`` with
    ``export_df_as_geojson``.

    The polygon counts before and after processing are printed.

    Returns the processed frame.
    """
    with open(prediction_geojson_path, "r") as source:
        prediction_data = json.load(source)

    df = turn_into_dataframe(prediction_data)
    df["id"] = df.index
    df["Confidence_score"] = df["Confidence_score"].astype(float)
    df["id"] = df["id"].apply(lambda label: convert_id_to_string("df_", label))
    for flag_column in ("to_drop", "to_merge"):
        df[flag_column] = False

    print(f"Number of polygons before postprocessing: {len(df)}")
    df_res = process([df])
    print(f"Number of polygons after postprocessing: {len(df_res)}")

    export_df_as_geojson(df=df_res, filename=store_path)
    return df_res