|
import json |
|
from shapely.geometry import Polygon, Point |
|
from shapely.ops import unary_union |
|
import matplotlib.pyplot as plt |
|
from matplotlib.patches import Polygon as MplPolygon |
|
import numpy as np |
|
import pandas as pd |
|
|
|
|
|
def add_extreme_coordinates(polygon_data):
    """Store the bounding extremes of a feature's first ring on its geometry.

    The first entry of ``polygon_data["geometry"]["coordinates"]`` is read as
    a sequence of points; column 0 is treated as "lon" and column 1 as "lat".
    The keys ``max_lat``, ``min_lat``, ``max_lon`` and ``min_lon`` are written
    into ``polygon_data["geometry"]`` in place.

    Args:
        polygon_data: Feature-like mapping with a ``"geometry"`` mapping that
            holds ``"coordinates"``.

    Returns:
        None. The geometry mapping is modified in place.
    """
    geometry = polygon_data["geometry"]
    ring = np.array(geometry["coordinates"][0])
    lons, lats = ring[:, 0], ring[:, 1]

    # Keyword order fixes the insertion order of the new keys.
    geometry.update(
        max_lat=max(lats),
        min_lat=min(lats),
        max_lon=max(lons),
        min_lon=min(lons),
    )
|
|
|
|
|
def turn_into_dataframe(data):
    """Flatten a feature collection into one DataFrame row per feature.

    Each feature in ``data["features"]`` is first passed to
    ``add_extreme_coordinates`` (which edits the feature's geometry in place).
    The nested ``properties`` and ``geometry`` mappings are then expanded into
    flat columns, ``coordinates`` is reduced to its first ring, and a
    ``polygon`` column holding ``Polygon`` objects built from that ring is
    added.

    Args:
        data: Mapping with a ``"features"`` list of feature mappings.

    Returns:
        DataFrame without the feature-level and geometry-level ``type``
        columns.
    """
    features = data["features"]
    for feature in features:
        add_extreme_coordinates(feature)

    # Drop the feature-level "type" before the geometry-level one is joined in.
    frame = pd.DataFrame(features).drop(columns="type")

    for nested_column in ("properties", "geometry"):
        expanded = pd.json_normalize(frame[nested_column])
        frame = frame.drop(columns=[nested_column]).join(expanded)

    frame["coordinates"] = frame["coordinates"].map(lambda rings: rings[0])
    frame["polygon"] = frame["coordinates"].map(lambda ring: Polygon(ring))

    # What is left under "type" now is the geometry type.
    return frame.drop(columns=["type"])
|
|
|
|
|
def plot_polygon(ax, polygon, color, label="label"):
    """Fill the exterior ring of ``polygon`` on ``ax``; skip empty polygons.

    Args:
        ax: Object providing a ``fill`` method (a matplotlib axes in this
            file).
        polygon: Object exposing ``is_empty`` and ``exterior.xy``.
        color: Fill colour forwarded to ``ax.fill``.
        label: Label forwarded to ``ax.fill``.
    """
    if polygon.is_empty:
        return

    xs, ys = polygon.exterior.xy
    ax.fill(xs, ys, color=color, alpha=0.5, label=label)
|
|
|
|
|
def plot_polygons(list_polygons, first_one_different=False, dpi=150):
    """Draw all polygons on one equal-aspect axes and show the figure.

    Args:
        list_polygons: Sequence of polygon objects accepted by
            ``plot_polygon``. May be empty.
        first_one_different: When true, the first polygon is filled red and
            all others blue; otherwise every polygon is blue.
        dpi: Resolution of the figure the polygons are drawn on.
    """
    # dpi goes straight to subplots(): a separate plt.figure(dpi=...) call
    # opens an extra, empty figure and leaves the plotted one at default dpi.
    _, ax = plt.subplots(dpi=dpi)

    # One loop for both modes keeps the labels unique ("polygon 0",
    # "polygon 1", ...) and copes with an empty list.
    for position, polygon in enumerate(list_polygons):
        highlighted = first_one_different and position == 0
        color = "red" if highlighted else "blue"
        plot_polygon(ax, polygon, color, f"polygon {position}")

    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")

    plt.show()
|
|
|
|
|
def plot_polygons_with_colors(list_polygons, list_colors, dpi=150):
    """Draw each polygon in its paired colour and show the figure.

    Polygons and colours are paired positionally with ``zip``, so surplus
    entries in the longer sequence are ignored.

    Args:
        list_polygons: Sequence of polygon objects accepted by
            ``plot_polygon``.
        list_colors: Sequence of fill colours, one per polygon.
        dpi: Resolution of the figure the polygons are drawn on.
    """
    # dpi goes straight to subplots(): a separate plt.figure(dpi=...) call
    # opens an extra, empty figure and leaves the plotted one at default dpi.
    _, ax = plt.subplots(dpi=dpi)

    for polygon, color in zip(list_polygons, list_colors):
        plot_polygon(ax, polygon, color)

    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")

    plt.show()
|
|
|
|
|
def plot_polygons_from_df(df, dpi=150):
    """Plot every entry of the ``polygon`` column of ``df``.

    Args:
        df: DataFrame with a ``polygon`` column.
        dpi: Figure resolution forwarded to ``plot_polygons``.
    """
    polygons = list(df["polygon"])
    plot_polygons(list_polygons=polygons, dpi=dpi)
|
|
|
|
|
def map_color(id):
    """Return the plot colour for a polygon id.

    Every id currently maps to the same colour; the argument is not used.
    """
    color = "blue"
    return color
|
|
|
|
|
def plot_polygons_from_df_with_color(df, dpi=150):
    """Plot the polygons of ``df`` using a colour derived from each ``id``.

    A ``plot_colors`` column is written to ``df`` in place (via
    ``map_color``) before plotting.

    Args:
        df: DataFrame with ``id`` and ``polygon`` columns.
        dpi: Figure resolution forwarded to ``plot_polygons_with_colors``.
    """
    df["plot_colors"] = df["id"].apply(map_color)

    polygons = list(df["polygon"])
    colors = list(df["plot_colors"])

    plot_polygons_with_colors(list_polygons=polygons, list_colors=colors, dpi=dpi)
|
|
|
def intersection(polygon, polygon_comparison):
    """Return ``polygon.intersection(polygon_comparison)``.

    Args:
        polygon: Object providing an ``intersection`` method.
        polygon_comparison: Object to intersect ``polygon`` with.
    """
    shared_region = polygon.intersection(polygon_comparison)
    return shared_region
|
|
|
|
|
def intersection_area(polygon, polygon_comparison):
    """Return the ``area`` of the intersection of the two polygons.

    Args:
        polygon: First polygon.
        polygon_comparison: Second polygon.
    """
    shared_region = intersection(polygon, polygon_comparison)
    return shared_region.area
|
|
|
|
|
def intersection_area_ratio(polygon, polygon_comparison):
    """Return the share of ``polygon``'s area covered by ``polygon_comparison``.

    Args:
        polygon: Polygon whose area is the denominator.
        polygon_comparison: Polygon it is intersected with.

    Returns:
        Intersection area divided by ``polygon.area``, or ``0.0`` when
        ``polygon`` has zero area.
    """
    own_area = polygon.area
    if own_area == 0:
        # A zero-area polygon cannot be covered by anything; returning 0.0
        # avoids a ZeroDivisionError on degenerate input.
        return 0.0

    return polygon.intersection(polygon_comparison).area / own_area
|
|
|
def containsPoint(polygonB, polygon):
    """Return whether any exterior vertex of ``polygonB`` lies in ``polygon``.

    Args:
        polygonB: Polygon whose exterior vertices are tested.
        polygon: Polygon providing the ``contains`` test.

    Returns:
        ``True`` as soon as one vertex is contained, ``False`` when none is
        (including when ``polygonB`` has no vertices).
    """
    # any() walks all vertices and stops at the first hit, so a first vertex
    # lying outside can no longer end the search early.
    return any(
        polygon.contains(Point(vertex)) for vertex in get_coordinates(polygonB)
    )
|
|
|
def get_coordinates(polygon):
    """Return the exterior coordinates of ``polygon`` as a list of lists.

    Args:
        polygon: Object exposing ``exterior.coords`` as an iterable of points.

    Returns:
        A new list with one ``list`` per point.
    """
    return list(map(list, polygon.exterior.coords))
|
|
|
def mark_id_to_be_dropped(df, id_string):
    """Set ``to_drop`` to True, in place, on every row whose ``id`` matches.

    Args:
        df: DataFrame with an ``id`` column; modified in place.
        id_string: Id value to look for.
    """
    matches = df["id"].eq(id_string)
    df.loc[matches, "to_drop"] = True
|
|
|
def mark_id_to_be_merged(df, id_string):
    """Set ``to_merge`` to True, in place, on every row whose ``id`` matches.

    Args:
        df: DataFrame with an ``id`` column; modified in place.
        id_string: Id value to look for.
    """
    matches = df["id"].eq(id_string)
    df.loc[matches, "to_merge"] = True
|
|
|
def calc_overlapping_subset(df_input, index):
    """Select rows whose bounding box interleaves with that of one row.

    The reference row is taken by position. A row is selected when, on the
    latitude axis and on the longitude axis alike, at least one bound of
    either box lies strictly between the two bounds of the other box. All
    comparisons are strict, so the reference row itself and rows with an
    identical extent on an axis are not selected.

    Args:
        df_input: DataFrame with ``min_lat``, ``max_lat``, ``min_lon`` and
            ``max_lon`` columns.
        index: Position of the reference row.

    Returns:
        The selected rows of ``df_input``.
    """
    reference = df_input.iloc[index]

    def interleaves(low, high, column_low, column_high):
        # Same four strict tests as a flat expression would contain: either
        # reference bound inside the row's range, or either row bound inside
        # the reference range.
        reference_high_inside = (high < column_high) & (high > column_low)
        reference_low_inside = (low < column_high) & (low > column_low)
        row_high_inside = (column_high < high) & (column_high > low)
        row_low_inside = (column_low > low) & (column_low < high)
        return (
            (reference_high_inside | reference_low_inside)
            | (row_high_inside | row_low_inside)
        )

    lat_mask = interleaves(
        reference["min_lat"],
        reference["max_lat"],
        df_input["min_lat"],
        df_input["max_lat"],
    )
    lon_mask = interleaves(
        reference["min_lon"],
        reference["max_lon"],
        df_input["min_lon"],
        df_input["max_lon"],
    )

    return df_input.loc[lat_mask & lon_mask]
|
|
|
def remove_contained_poylgons(df_input, threshold=0.85):
    """Drop the smaller polygon of every strongly overlapping pair.

    For each row, candidates are pre-selected with
    ``calc_overlapping_subset``. A pair qualifies when the intersection
    covers more than ``threshold`` of either polygon's area; the candidate is
    marked when the current polygon is strictly larger, otherwise the current
    polygon is marked. Marked rows are removed at the end.

    Args:
        df_input: DataFrame with ``id``, ``polygon``, ``to_drop`` and the
            bounding-box columns. It is not modified.
        threshold: Minimum covered share (exclusive) for a pair to qualify.
            Defaults to the previously hard-coded 0.85.

    Returns:
        The rows of a copy of ``df_input`` whose ``to_drop`` is still False.
    """
    df_result = df_input.copy()

    for position in range(len(df_result)):
        current = df_input.iloc[position]
        current_id = current["id"]
        polygon_a = current["polygon"]

        candidates = calc_overlapping_subset(df_input=df_result, index=position)
        candidate_pairs = zip(
            candidates["id"].tolist(), candidates["polygon"].tolist()
        )

        for candidate_id, polygon_b in candidate_pairs:
            share_of_a = intersection_area_ratio(
                polygon=polygon_a, polygon_comparison=polygon_b
            )
            share_of_b = intersection_area_ratio(
                polygon=polygon_b, polygon_comparison=polygon_a
            )
            if not (share_of_a > threshold or share_of_b > threshold):
                continue

            # NOTE(review): with exactly equal areas the current polygon is
            # the one marked, so a pair seen from both sides loses both.
            if polygon_a.area > polygon_b.area:
                loser_id = candidate_id
            else:
                loser_id = current_id
            mark_id_to_be_dropped(df=df_result, id_string=loser_id)

    return df_result.loc[df_result["to_drop"].eq(False)]
|
|
|
def merge(df_input, polygon_index, merge_subset):
    """Merge every polygon of ``merge_subset`` into one target polygon.

    The target is the row at position ``polygon_index`` of ``df_input``. For
    each candidate, the target's polygon becomes the exterior of the union
    with the candidate, its bounding-box columns and ``coordinates`` are
    recomputed, its ``Confidence_score`` becomes the mean of its current
    score and the candidate's, and the candidate's rows are removed.

    Args:
        df_input: DataFrame with ``id``, ``polygon``, ``coordinates``,
            ``Confidence_score`` and bounding-box columns. Not modified.
        polygon_index: Position of the target row in ``df_input``.
        merge_subset: DataFrame of candidate rows to fold into the target.

    Returns:
        A new DataFrame with the updated target and without the candidates.
    """
    # Work on a copy so the caller's frame is not left half-updated.
    df_input = df_input.copy()

    # Resolve the target by id once. Rows are deleted while merging, so the
    # position ``polygon_index`` may point at another row later on.
    target_id = df_input.iloc[polygon_index]["id"]

    candidate_fields = zip(
        merge_subset["id"].tolist(),
        merge_subset["polygon"].tolist(),
        merge_subset["Confidence_score"].tolist(),
    )
    for candidate_id, candidate_polygon, candidate_score in candidate_fields:
        if candidate_id == target_id:
            # Merging the target into itself would end with deleting it.
            continue

        is_target = df_input["id"] == target_id
        target = df_input.loc[is_target].iloc[0]

        union = target["polygon"].union(candidate_polygon)
        outline = list(union.exterior.coords)
        merged_polygon = Polygon(outline)

        lons = [point[0] for point in outline]
        lats = [point[1] for point in outline]

        df_input.loc[is_target, "polygon"] = merged_polygon
        df_input.loc[is_target, "min_lon"] = min(lons)
        df_input.loc[is_target, "max_lon"] = max(lons)
        df_input.loc[is_target, "min_lat"] = min(lats)
        df_input.loc[is_target, "max_lat"] = max(lats)
        df_input.loc[is_target, "Confidence_score"] = (
            target["Confidence_score"] + candidate_score
        ) / 2
        df_input.loc[is_target, "coordinates"] = df_input.loc[
            is_target, "polygon"
        ].apply(get_coordinates)

        # copy(): the filtered frame is written to in the next iteration.
        df_input = df_input.loc[df_input["id"] != candidate_id].copy()

    return df_input
|
|
|
|
|
def merge_overlapping(df_input, threshold=0.40):
    """Perform the first possible merge of overlapping polygons, if any.

    Rows are scanned in order. For each row, candidates come from
    ``calc_overlapping_subset``; a candidate is flagged when the intersection
    covers more than ``threshold`` of either polygon's area. At the first row
    with at least one flagged candidate, the candidates whose ``to_merge`` is
    True are handed to ``merge`` and the scan stops.

    Args:
        df_input: DataFrame with ``id``, ``polygon``, ``to_merge`` and the
            bounding-box columns.
        threshold: Minimum covered share (exclusive) for a merge. Defaults to
            the previously hard-coded 0.40.

    Returns:
        ``(True, merged_frame)`` when a merge happened, otherwise
        ``(False, df_input)``.
    """
    for position in range(len(df_input)):
        polygon = df_input.iloc[position]["polygon"]

        # copy(): flags are written to the candidates below, and they must
        # not be written through a slice of ``df_input``.
        candidates = calc_overlapping_subset(
            df_input=df_input, index=position
        ).copy()
        candidate_pairs = zip(
            candidates["id"].tolist(), candidates["polygon"].tolist()
        )

        found_overlap = False
        for candidate_id, candidate_polygon in candidate_pairs:
            share_of_polygon = intersection_area_ratio(
                polygon=polygon, polygon_comparison=candidate_polygon
            )
            share_of_candidate = intersection_area_ratio(
                polygon=candidate_polygon, polygon_comparison=polygon
            )
            if share_of_polygon > threshold or share_of_candidate > threshold:
                found_overlap = True
                mark_id_to_be_merged(df=candidates, id_string=candidate_id)

        if found_overlap:
            flagged = candidates[candidates["to_merge"].eq(True)]
            merged_frame = merge(
                df_input=df_input, polygon_index=position, merge_subset=flagged
            )
            return True, merged_frame

    return False, df_input
|
|
|
|
|
def process(list_df):
    """Concatenate the frames, drop contained polygons, then merge overlaps.

    ``merge_overlapping`` is called repeatedly until it reports that nothing
    was merged. The running number of merges is printed each time it reaches
    a multiple of 100.

    Args:
        list_df: List of DataFrames to concatenate and process.

    Returns:
        The processed DataFrame.
    """
    df_res = remove_contained_poylgons(df_input=pd.concat(list_df))

    merge_count = 0
    while True:
        merged, df_res = merge_overlapping(df_input=df_res)
        if not merged:
            return df_res

        merge_count += 1
        if merge_count % 100 == 0:
            print(merge_count)
|
|
|
|
|
def combine_different_tile_size(df_smaller, df_bigger, threshold=0.15):
    """Append polygons of ``df_smaller`` that barely overlap ``df_bigger``.

    A row of ``df_smaller`` is appended to a copy of ``df_bigger`` unless
    some polygon of ``df_bigger`` covers more than ``threshold`` of its area.
    Only ``df_bigger`` rows whose bounding box overlaps that of the small
    polygon are compared.

    Args:
        df_smaller: DataFrame with ``polygon`` and bounding-box columns.
        df_bigger: DataFrame with ``polygon`` and bounding-box columns.
        threshold: Maximum covered share for a small polygon to be kept.
            Defaults to the previously hard-coded 0.15.

    Returns:
        A new DataFrame: ``df_bigger`` followed by the kept ``df_smaller``
        rows, in their original order.
    """
    df_result = df_bigger.copy()
    kept_positions = []

    for position in range(len(df_smaller)):
        small = df_smaller.iloc[position]
        polygon = small["polygon"]

        # Two-sided interval test per axis. Checking only whether the small
        # polygon's bounds fall inside a bigger one misses bigger polygons
        # whose extent lies within the small polygon's extent on that axis.
        lat_overlap = (small["min_lat"] < df_bigger["max_lat"]) & (
            small["max_lat"] > df_bigger["min_lat"]
        )
        lon_overlap = (small["min_lon"] < df_bigger["max_lon"]) & (
            small["max_lon"] > df_bigger["min_lon"]
        )
        neighbours = df_bigger.loc[lat_overlap & lon_overlap, "polygon"]

        covered = any(
            intersection_area_ratio(polygon, neighbour) > threshold
            for neighbour in neighbours
        )
        if not covered:
            kept_positions.append(position)

    if not kept_positions:
        return df_result

    # One concat at the end instead of one per kept row.
    return pd.concat(
        [df_result, df_smaller.iloc[kept_positions]], axis=0, join="outer"
    )
|
|
|
|
|
def clean(df, score_threashold=0.5):
    """Keep only the rows whose ``score`` exceeds the threshold.

    Args:
        df: DataFrame with a ``score`` column.
        score_threashold: Exclusive lower bound for ``score``.

    Returns:
        The rows of ``df`` with ``score`` strictly above the threshold.
    """
    above_threshold = df["score"].gt(score_threashold)
    return df.loc[above_threshold]
|
|
|
def row_to_feature(row):
    """Build a GeoJSON-style Polygon feature mapping from one row.

    Args:
        row: Mapping-like object with ``id``, ``Confidence_score`` and
            ``coordinates`` entries.

    Returns:
        Dict with the keys ``id``, ``type``, ``properties`` and ``geometry``,
        in that order; ``coordinates`` is wrapped in a one-element list.
    """
    feature = {}
    feature["id"] = row["id"]
    feature["type"] = "Feature"
    feature["properties"] = {"Confidence_score": row["Confidence_score"]}
    feature["geometry"] = {"type": "Polygon", "coordinates": [row["coordinates"]]}
    return feature
|
|
|
|
|
def export_df_as_geojson(df, filename, crs_name="urn:ogc:def:crs:EPSG::32720"):
    """Write the rows of ``df`` to ``filename`` as a GeoJSON FeatureCollection.

    Each row is converted with ``row_to_feature``. A confirmation line is
    printed after the file has been written.

    Args:
        df: DataFrame whose rows ``row_to_feature`` can convert.
        filename: Path of the output file; it is overwritten.
        crs_name: Name stored in the collection's ``crs`` member. Defaults to
            the previously hard-coded value.
    """
    features = [row_to_feature(row) for _, row in df.iterrows()]

    feature_collection = {
        "type": "FeatureCollection",
        "crs": {"type": "name", "properties": {"name": crs_name}},
        "features": features,
    }

    # Serialize before opening the file so that a serialization error does
    # not leave an empty or truncated file behind.
    output_geojson = json.dumps(feature_collection)

    with open(str(filename), "w", encoding="utf-8") as f:
        f.write(output_geojson)

    print(f"GeoJSON data exported to '{filename}' file.")
|
|
|
def convert_id_to_string(prefix, x):
    """Return ``prefix`` followed by the string form of ``x``.

    Args:
        prefix: String placed in front.
        x: Value converted with ``str``.
    """
    suffix = str(x)
    return prefix + suffix
|
|
|
def postprocess(prediction_geojson_path, store_path):
    """Load predictions, post-process them and export the result.

    The file is read as JSON and flattened with ``turn_into_dataframe``.
    Rows get an ``id`` of the form ``"df_<index>"``, a float
    ``Confidence_score`` and False ``to_drop`` / ``to_merge`` flags. The
    frame is run through ``process`` and written with
    ``export_df_as_geojson``. The polygon count is printed before and after.

    Args:
        prediction_geojson_path: Path of the JSON file to read.
        store_path: Path the processed result is exported to.

    Returns:
        The processed DataFrame.
    """
    with open(prediction_geojson_path, "r") as file:
        prediction_data = json.load(file)

    df = turn_into_dataframe(prediction_data)

    df["id"] = [convert_id_to_string("df_", label) for label in df.index]
    df["Confidence_score"] = df["Confidence_score"].astype(float)

    for flag_column in ("to_drop", "to_merge"):
        df[flag_column] = False

    print(f"Number of polygons before postprocessing: {len(df)}")
    df_res = process([df])
    print(f"Number of polygons after postprocessing: {len(df_res)}")

    export_df_as_geojson(df=df_res, filename=store_path)

    return df_res