# WeCanopy / polygons_processing / postpprocess_detectree2.py
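"""Post-processing utilities for detectree2 tree-crown predictions.

The functions in this module load a GeoJSON FeatureCollection of predicted
crown polygons, remove polygons that are almost entirely contained in a
larger one, merge strongly overlapping polygons, and export the cleaned-up
result as GeoJSON again.
"""
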
import json
from shapely.geometry import Polygon, Point
from shapely.ops import unary_union
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon as MplPolygon
import numpy as np
import pandas as pd


def add_extreme_coordinates(polygon_data):
    """Store the bounding-box extremes of a GeoJSON feature on its geometry dict."""
    polygon_coords = np.array(polygon_data["geometry"]["coordinates"][0])
    polygon_data["geometry"]["max_lat"] = polygon_coords[:, 1].max()
    polygon_data["geometry"]["min_lat"] = polygon_coords[:, 1].min()
    polygon_data["geometry"]["max_lon"] = polygon_coords[:, 0].max()
    polygon_data["geometry"]["min_lon"] = polygon_coords[:, 0].min()


def turn_into_dataframe(data):
    """Flatten a GeoJSON FeatureCollection into a DataFrame with shapely polygons."""
    data_list = data["features"]
    for feature in data_list:
        add_extreme_coordinates(feature)
    df = pd.DataFrame(data_list).drop(columns="type")
    dict_cols = ["properties", "geometry"]
    for dict_col in dict_cols:
        dict_df = pd.json_normalize(df[dict_col])
        # Merge the flattened columns back into the original DataFrame
        df = df.drop(columns=[dict_col]).join(dict_df)
    # Keep only the exterior ring and build a shapely Polygon from it
    df["coordinates"] = df["coordinates"].apply(lambda x: x[0])
    df["polygon"] = df["coordinates"].apply(Polygon)
    df = df.drop(columns=["type"])
    return df
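
# The GeoJSON features handled above are expected to look roughly like the
# sketch below (structure inferred from how fields are used in this module;
# the values are purely illustrative):
#   {"type": "Feature",
#    "properties": {"Confidence_score": "0.87"},
#    "geometry": {"type": "Polygon", "coordinates": [[[lon, lat], ...]]}}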


# Plot a single polygon as a filled patch on the given axes
def plot_polygon(ax, polygon, color, label="label"):
    if not polygon.is_empty:
        x, y = polygon.exterior.xy
        ax.fill(x, y, color=color, alpha=0.5, label=label)


def plot_polygons(list_polygons, first_one_different=False, dpi=150):
    """Plot a list of shapely polygons; optionally highlight the first one in red."""
    fig, ax = plt.subplots(dpi=dpi)
    if first_one_different:
        plot_polygon(ax, list_polygons[0], "red", "polygon 0")
        for i, polygon in enumerate(list_polygons[1:], start=1):
            plot_polygon(ax, polygon, "blue", f"polygon {i}")
    else:
        for i, polygon in enumerate(list_polygons):
            plot_polygon(ax, polygon, "blue", f"polygon {i}")
    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")
    plt.show()


def plot_polygons_with_colors(list_polygons, list_colors, dpi=150):
    """Plot polygons with a per-polygon fill colour."""
    fig, ax = plt.subplots(dpi=dpi)
    for polygon, color in zip(list_polygons, list_colors):
        plot_polygon(ax, polygon, color)
    ax.set_aspect("equal")
    ax.set_title("Polygons and their Intersection")
    ax.set_ylabel("lat")
    ax.set_xlabel("lon")
    plt.show()


def plot_polygons_from_df(df, dpi=150):
    plot_polygons(list_polygons=df["polygon"].tolist(), dpi=dpi)


def map_color(id):
    # Placeholder colour mapping; currently every id is drawn in blue
    return "blue"


def plot_polygons_from_df_with_color(df, dpi=150):
    df["plot_colors"] = df["id"].apply(map_color)
    list_polygons = df["polygon"].tolist()
    list_colors = df["plot_colors"].tolist()
    plot_polygons_with_colors(list_polygons=list_polygons, list_colors=list_colors, dpi=dpi)


def intersection(polygon, polygon_comparison):
    return polygon.intersection(polygon_comparison)


def intersection_area(polygon, polygon_comparison):
    return intersection(polygon, polygon_comparison).area


def intersection_area_ratio(polygon, polygon_comparison):
    """Fraction of `polygon`'s area that is covered by `polygon_comparison`."""
    return intersection_area(polygon, polygon_comparison) / polygon.area


def containsPoint(polygonB, polygon):
    """Return True if any exterior vertex of polygonB lies inside polygon."""
    coordinatesB = get_coordinates(polygonB)
    for coord in coordinatesB:
        if polygon.contains(Point(coord)):
            return True
    return False


def get_coordinates(polygon):
    """Return the exterior coordinates of a polygon as a list of [lon, lat] pairs."""
    return [list(pair) for pair in polygon.exterior.coords]


def mark_id_to_be_dropped(df, id_string):
    df.loc[df["id"] == id_string, "to_drop"] = True


def mark_id_to_be_merged(df, id_string):
    df.loc[df["id"] == id_string, "to_merge"] = True


def calc_overlapping_subset(df_input, index):
    """Return the rows of df_input whose bounding box overlaps the box at `index`.
    Strict inequalities exclude the row itself and boxes that only touch."""
    max_lat = df_input.iloc[index]["max_lat"]
    min_lat = df_input.iloc[index]["min_lat"]
    max_lon = df_input.iloc[index]["max_lon"]
    min_lon = df_input.iloc[index]["min_lon"]
    lat_overlap = (
        ((max_lat < df_input["max_lat"]) & (max_lat > df_input["min_lat"]))
        | ((min_lat < df_input["max_lat"]) & (min_lat > df_input["min_lat"]))
        | ((df_input["max_lat"] < max_lat) & (df_input["max_lat"] > min_lat))
        | ((df_input["min_lat"] > min_lat) & (df_input["min_lat"] < max_lat))
    )
    lon_overlap = (
        ((max_lon < df_input["max_lon"]) & (max_lon > df_input["min_lon"]))
        | ((min_lon < df_input["max_lon"]) & (min_lon > df_input["min_lon"]))
        | ((df_input["max_lon"] < max_lon) & (df_input["max_lon"] > min_lon))
        | ((df_input["min_lon"] > min_lon) & (df_input["min_lon"] < max_lon))
    )
    return df_input.loc[lat_overlap & lon_overlap]


def remove_contained_poylgons(df_input):
    """Drop the smaller polygon of every pair that overlaps almost completely."""
    df_result = df_input.copy()
    for i in range(len(df_result)):
        polygonA = df_input.iloc[i]["polygon"]
        relevant_subset = calc_overlapping_subset(df_input=df_result, index=i)
        # Experiment with this parameter to find the best threshold.
        # It certainly has to be smaller than 0.9.
        threshold = 0.85
        for j in range(len(relevant_subset)):
            polygonB = relevant_subset.iloc[j]["polygon"]
            ratio_current_choice = intersection_area_ratio(polygon=polygonA, polygon_comparison=polygonB)
            ratio_alternative_choice = intersection_area_ratio(polygon=polygonB, polygon_comparison=polygonA)
            if (ratio_current_choice > threshold) or (ratio_alternative_choice > threshold):
                # Keep the larger polygon, mark the smaller one for removal
                if polygonA.area > polygonB.area:
                    mark_id_to_be_dropped(df=df_result, id_string=relevant_subset.iloc[j]["id"])
                else:
                    mark_id_to_be_dropped(df=df_result, id_string=df_input.iloc[i]["id"])
    # Remove all polygons whose id was marked
    df_result = df_result.loc[~df_result["to_drop"]]
    return df_result


def merge(df_input, polygon_index, merge_subset):
    """Merge the polygon at polygon_index with every polygon in merge_subset.

    The merged geometry, bounding box, coordinates and (averaged) confidence
    score replace the surviving row; the rows that were merged in are dropped.
    """
    merged_polygon_id = df_input.iloc[polygon_index]["id"]
    for j in range(len(merge_subset)):
        current_row = df_input.loc[df_input["id"] == merged_polygon_id].iloc[0]
        # Union of the two polygons; this assumes the result is a single Polygon
        # (the inputs overlap), not a MultiPolygon.
        tmp = current_row["polygon"].union(merge_subset.iloc[j]["polygon"])
        merged_coordinates = list(tmp.exterior.coords)
        merged_polygon = Polygon(merged_coordinates)
        coordinates = [list(tup) for tup in merged_coordinates]
        # Update the bounding box of the merged polygon
        min_lon = min(point[0] for point in coordinates)
        max_lon = max(point[0] for point in coordinates)
        min_lat = min(point[1] for point in coordinates)
        max_lat = max(point[1] for point in coordinates)
        polygon_score = merge_subset.iloc[j]["Confidence_score"]
        # Write the merged geometry and metadata back to the surviving row
        df_input.loc[df_input["id"] == merged_polygon_id, "polygon"] = merged_polygon
        df_input.loc[df_input["id"] == merged_polygon_id, "min_lon"] = min_lon
        df_input.loc[df_input["id"] == merged_polygon_id, "max_lon"] = max_lon
        df_input.loc[df_input["id"] == merged_polygon_id, "min_lat"] = min_lat
        df_input.loc[df_input["id"] == merged_polygon_id, "max_lat"] = max_lat
        df_input.loc[df_input["id"] == merged_polygon_id, "Confidence_score"] = (current_row["Confidence_score"] + polygon_score) / 2
        df_input.loc[df_input["id"] == merged_polygon_id, "coordinates"] = df_input.loc[df_input["id"] == merged_polygon_id, "polygon"].apply(get_coordinates)
        # Drop the polygon that has just been merged in
        df_input = df_input.loc[df_input["id"] != merge_subset.iloc[j]["id"]]
    return df_input


def merge_overlapping(df_input):
    """Merge the first polygon found that overlaps its neighbours above the threshold.

    Returns (True, updated_df) if a merge happened, otherwise (False, df_input).
    """
    # Experiment with this parameter to get the best results
    threshold = 0.40
    for i in range(len(df_input)):
        polygon = df_input.iloc[i]["polygon"]
        relevant_subset = calc_overlapping_subset(df_input=df_input, index=i).copy()
        toBeMerged = False
        for j in range(len(relevant_subset)):
            comparison = relevant_subset.iloc[j]["polygon"]
            ratio_current_choice = intersection_area_ratio(polygon=polygon, polygon_comparison=comparison)
            ratio_alternative_choice = intersection_area_ratio(polygon=comparison, polygon_comparison=polygon)
            if (ratio_current_choice > threshold) or (ratio_alternative_choice > threshold):
                toBeMerged = True
                mark_id_to_be_merged(df=relevant_subset, id_string=relevant_subset.iloc[j]["id"])
        if toBeMerged:
            # merge() also drops the rows that were merged in
            df_input = merge(df_input=df_input, polygon_index=i, merge_subset=relevant_subset[relevant_subset["to_merge"]])
            return True, df_input
    return False, df_input


def process(list_df):
    """Concatenate per-tile dataframes, drop contained polygons, then merge overlaps."""
    df_res = pd.concat(list_df)
    df_res = remove_contained_poylgons(df_input=df_res)
    i = 0
    merged, df_res = merge_overlapping(df_input=df_res)
    # Keep merging until no overlapping pair above the threshold is left
    while merged:
        i += 1
        if i % 100 == 0:
            print(i)
        merged, df_res = merge_overlapping(df_input=df_res)
    return df_res


def combine_different_tile_size(df_smaller, df_bigger):
    """Add polygons from the smaller-tile run that do not overlap any bigger-tile polygon."""
    df_result = df_bigger.copy()
    for i in range(len(df_smaller)):
        max_lat = df_smaller.iloc[i]["max_lat"]
        min_lat = df_smaller.iloc[i]["min_lat"]
        max_lon = df_smaller.iloc[i]["max_lon"]
        min_lon = df_smaller.iloc[i]["min_lon"]
        polygon = df_smaller.iloc[i]["polygon"]
        # Bigger-tile polygons whose bounding box overlaps this polygon's bounding box
        relevant_subset = df_bigger.loc[
            (
                ((max_lat < df_bigger["max_lat"]) & (max_lat > df_bigger["min_lat"]))
                | ((min_lat < df_bigger["max_lat"]) & (min_lat > df_bigger["min_lat"]))
            )
            & (
                ((max_lon < df_bigger["max_lon"]) & (max_lon > df_bigger["min_lon"]))
                | ((min_lon < df_bigger["max_lon"]) & (min_lon > df_bigger["min_lon"]))
            )
        ]
        add_polygon = True
        threshold = 0.15
        for _, row in relevant_subset.iterrows():
            ratio = intersection_area_ratio(polygon, row["polygon"])
            if ratio > threshold:
                add_polygon = False
        if add_polygon:
            df_result = pd.concat([df_result, df_smaller.iloc[[i]]], axis=0, join="outer")
    return df_result


def clean(df, score_threashold=0.5):
    """Keep only polygons whose confidence score exceeds the threshold."""
    df = df.loc[df["Confidence_score"] > score_threashold]
    return df


def row_to_feature(row):
    feature = {
        "id": row["id"],
        "type": "Feature",
        "properties": {"Confidence_score": row["Confidence_score"]},
        "geometry": {"type": "Polygon", "coordinates": [row["coordinates"]]},
    }
    return feature


def export_df_as_geojson(df, filename):
    """Write the dataframe back out as a GeoJSON FeatureCollection."""
    features = [row_to_feature(row) for idx, row in df.iterrows()]
    feature_collection = {
        "type": "FeatureCollection",
        "crs": {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::32720"}},
        "features": features,
    }
    output_geojson = json.dumps(feature_collection)
    with open(filename, "w") as f:
        f.write(output_geojson)
    print(f"GeoJSON data exported to '{filename}'.")


def convert_id_to_string(prefix, x):
    return prefix + str(x)


def postprocess(prediction_geojson_path, store_path):
    """Load detectree2 predictions, drop/merge overlapping polygons and export the result."""
    with open(prediction_geojson_path, "r") as file:
        prediction_data = json.load(file)
    df = turn_into_dataframe(prediction_data)
    df["id"] = df.index
    df["Confidence_score"] = df["Confidence_score"].astype(float)
    df["id"] = df["id"].apply(lambda x: convert_id_to_string("df_", x))
    df["to_drop"] = False
    df["to_merge"] = False
    print(f"Number of polygons before postprocessing: {len(df)}")
    df_res = process([df])
    print(f"Number of polygons after postprocessing: {len(df_res)}")
    export_df_as_geojson(df=df_res, filename=store_path)
    return df_res
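

if __name__ == "__main__":
    # Minimal usage sketch: the two paths below are hypothetical placeholders,
    # not files shipped with the repository; replace them with real prediction
    # and output locations before running.
    df_post = postprocess(
        prediction_geojson_path="predictions/crowns_out.geojson",
        store_path="predictions/crowns_postprocessed.geojson",
    )
    print(df_post.head())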