bpiyush's picture
Upload folder using huggingface_hub
eafbf97 verified
"""Helpers for metric functions"""
import numpy as np
import pandas as pd
def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two axis-aligned bounding boxes.

    Args:
        box1 (tuple): Coordinates of the first bounding box in the format (x1, y1, x2, y2).
        box2 (tuple): Coordinates of the second bounding box in the format (x1, y1, x2, y2).

    Returns:
        float: Intersection over Union (IoU) score. 0.0 is returned when the
            union has zero area (both boxes are degenerate), instead of
            dividing by zero.
    """
    x1, y1, x2, y2 = box1
    x1_, y1_, x2_, y2_ = box2

    # Overlap along each axis, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(x2, x2_) - max(x1, x1_))
    overlap_h = max(0, min(y2, y2_) - max(y1, y1_))
    intersection_area = overlap_w * overlap_h

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_ - x1_) * (y2_ - y1_)
    union_area = box1_area + box2_area - intersection_area

    # Two zero-area boxes have nothing to overlap on; avoid a 0/0 division.
    if union_area == 0:
        return 0.0
    return intersection_area / float(union_area)
def compute_intersection_1d(x, y):
    """Return the length of the overlap between two 1D boxes.

    Each box is a pair of endpoints; the endpoints may be given in either
    order because they are sorted first. Disjoint boxes yield 0.
    """
    x_lo, x_hi = sorted(x)
    y_lo, y_hi = sorted(y)
    # Negative when the boxes do not overlap, hence the clamp at zero.
    overlap = min(x_hi, y_hi) - max(x_lo, y_lo)
    return max(0, overlap)
def compute_union_1d(x, y):
    """Return the length of the span covering two 1D boxes.

    Each box is a pair of endpoints, accepted in either order. The value is
    the distance from the leftmost to the rightmost endpoint, so for disjoint
    boxes it also includes the gap between them.
    """
    x_lo, x_hi = sorted(x)
    y_lo, y_hi = sorted(y)
    leftmost = min(x_lo, y_lo)
    rightmost = max(x_hi, y_hi)
    return rightmost - leftmost
def compute_iou_1d(pred_box, true_box):
    """
    Compute IoU for a pair of 1D boxes.

    Args:
        pred_box (sequence of 2 numbers): Predicted box, [x1, x2]
        true_box (sequence of 2 numbers): Ground truth box, [x1, x2]

    Returns:
        float: IoU. 0.0 is returned when both boxes collapse onto the same
            single point (zero-length span) instead of dividing by zero.
    """
    intersection = compute_intersection_1d(pred_box, true_box)
    union = compute_union_1d(pred_box, true_box)
    # A zero-length span means there is no extent to overlap on; dividing
    # would raise ZeroDivisionError (or produce nan for NumPy scalars).
    if union == 0:
        return 0.0
    return intersection / union
def compute_iou_1d_single_candidate_multiple_targets(pred_box, true_boxes):
    """
    Compute the 1D IoU of one predicted box against every ground truth box.

    Args:
        pred_box (sequence of 2 numbers): Predicted box, [x1, x2]
        true_boxes (np.ndarray): Ground truth boxes, shape: (N, 2)

    Returns:
        np.ndarray: IoU of `pred_box` with each ground truth box, shape: (N,)
    """
    return np.array([compute_iou_1d(pred_box, target) for target in true_boxes])
def compute_iou_1d_multiple_candidates_multiple_targets(pred_boxes, true_boxes):
    """
    Compute the pairwise 1D IoU between predicted and ground truth boxes.

    Args:
        pred_boxes (np.ndarray): Predicted boxes, shape: (P, 2)
        true_boxes (np.ndarray): Ground truth boxes, shape: (T, 2)

    Returns:
        np.ndarray: IoU matrix of shape (P, T); entry [i, j] is the IoU of
            predicted box i with ground truth box j.
    """
    n_pred, n_true = len(pred_boxes), len(true_boxes)
    iou_matrix = np.zeros((n_pred, n_true))
    # Fill one row (one predicted box against all targets) at a time.
    for row, candidate in enumerate(pred_boxes):
        iou_matrix[row] = [compute_iou_1d(candidate, target) for target in true_boxes]
    return iou_matrix
def compute_mean_iou_1d(pred_boxes, gt_boxes, threshold=0.5):
    """
    Computes mean IOU for 1D bounding boxes.

    Each predicted box is scored by its best (maximum) IoU over all ground
    truth boxes, and those best scores are averaged.

    Args:
        pred_boxes (np.ndarray): Predicted boxes, shape: (P, 2)
        gt_boxes (np.ndarray): Ground truth boxes, shape: (G, 2)
        threshold (float): Currently unused; kept so existing callers that
            pass it keep working. (The previous implementation derived TP/FP
            masks from it but never used them.)

    Returns:
        float: Mean of the per-prediction best IoU; 0.0 when either input
            is empty.
    """
    # With no predictions or no ground truth there is nothing to match.
    if len(pred_boxes) == 0 or len(gt_boxes) == 0:
        return 0.0

    # Pairwise IoU, shape (P, G), via the shared helper.
    iou_matrix = compute_iou_1d_multiple_candidates_multiple_targets(
        pred_boxes, gt_boxes
    )
    # Best-matching ground truth for each predicted box.
    best_iou = iou_matrix.max(axis=1)
    return float(np.mean(best_iou))
def calculate_mAP_1d(pred_boxes, pred_scores, true_boxes, iou_thresh=0.5):
    """Calculate 11-point interpolated average precision for 1D boxes.

    Predictions are visited in order of decreasing score. Each prediction is
    compared with its best-overlapping ground truth box: it is a true
    positive if that IoU reaches `iou_thresh` and the ground truth box has
    not already been claimed by a higher-scoring prediction, otherwise it is
    a false positive.

    NOTE: earlier versions scored overlap with a relative-difference formula
    (not an IoU) and tracked matched ground truths in the per-prediction TP
    array; both are fixed here, which requires boxes given as [x1, x2] pairs.

    Args:
        pred_boxes (numpy array): Predicted boxes, shape (num_preds, 2)
        pred_scores (numpy array): Predicted scores, shape (num_preds,)
        true_boxes (numpy array): Ground truth boxes, shape (num_gt, 2)
        iou_thresh (float): IoU threshold to consider a prediction correct

    Returns:
        float: Average precision; 0.0 when there are no predictions or no
            ground truth boxes.

    Raises:
        ValueError: If non-empty boxes are not of shape (N, 2) or the number
            of scores does not match the number of predicted boxes.
    """
    pred_boxes = np.asarray(pred_boxes, dtype=float)
    pred_scores = np.asarray(pred_scores, dtype=float)
    true_boxes = np.asarray(true_boxes, dtype=float)

    # Nothing predicted or nothing to find: AP is zero (and this avoids 0/0).
    if pred_boxes.size == 0 or true_boxes.size == 0:
        return 0.0

    for name, boxes in (("pred_boxes", pred_boxes), ("true_boxes", true_boxes)):
        if boxes.ndim != 2 or boxes.shape[1] != 2:
            raise ValueError(
                "{} must have shape (N, 2), got {}".format(name, boxes.shape)
            )
    if pred_scores.shape != (len(pred_boxes),):
        raise ValueError("pred_scores must have one score per predicted box")

    # Accept endpoints in either order, like the other 1D IoU helpers.
    pred_boxes = np.sort(pred_boxes, axis=1)
    true_boxes = np.sort(true_boxes, axis=1)

    # Sort predicted boxes by score (descending); stable so ties keep input order.
    order = np.argsort(-pred_scores, kind="stable")
    pred_boxes = pred_boxes[order]

    gt_start, gt_end = true_boxes[:, 0], true_boxes[:, 1]
    gt_length = gt_end - gt_start
    # Each ground truth box may be matched by at most one prediction.
    gt_matched = np.zeros(len(true_boxes), dtype=bool)

    tp = np.zeros(len(pred_boxes))
    fp = np.zeros(len(pred_boxes))
    for i, (start, end) in enumerate(pred_boxes):
        intersection = np.clip(
            np.minimum(end, gt_end) - np.maximum(start, gt_start), 0, None
        )
        union = (end - start) + gt_length - intersection
        # Leave IoU at 0 where the union is empty (two zero-length boxes).
        ious = np.zeros_like(intersection)
        np.divide(intersection, union, out=ious, where=union > 0)

        best = int(np.argmax(ious))
        if ious[best] >= iou_thresh and not gt_matched[best]:
            tp[i] = 1
            gt_matched[best] = True
        else:
            fp[i] = 1

    # Precision and recall after each prediction in score order.
    tp_cumsum = np.cumsum(tp)
    fp_cumsum = np.cumsum(fp)
    recall = tp_cumsum / len(true_boxes)
    precision = tp_cumsum / (tp_cumsum + fp_cumsum)

    # 11-point interpolation. Recall levels are built as k / 10 so they are
    # not inflated by accumulated floating-point error (e.g. 3 * 0.1 > 0.3).
    ap = 0.0
    for t in np.arange(11) / 10.0:
        reached = recall >= t
        if reached.any():
            ap += precision[reached].max() / 11
    return float(ap)
def segment_iou(target_segment, candidate_segments):
    """Temporal IoU between one target segment and N candidate segments.

    Parameters
    ----------
    target_segment : 1d array
        Target segment given as [start, end].
    candidate_segments : 2d array
        Candidate segments, N x [start, end].

    Outputs
    -------
    tiou : 1d array
        Temporal intersection over union of the target with each of the
        N candidates.
    """
    target_start, target_end = target_segment[0], target_segment[1]
    cand_starts = candidate_segments[:, 0]
    cand_ends = candidate_segments[:, 1]

    # Overlap runs from the later start to the earlier end; disjoint pairs
    # give a negative length, which is clipped to zero.
    latest_start = np.maximum(target_start, cand_starts)
    earliest_end = np.minimum(target_end, cand_ends)
    overlap = (earliest_end - latest_start).clip(0)

    # Union = sum of both lengths minus the part counted twice.
    union = (cand_ends - cand_starts) + (target_end - target_start) - overlap

    return overlap.astype(float) / union
def interpolated_prec_rec(prec, rec):
    """Interpolated AP - VOCdevkit from VOC 2011.

    Pads the precision/recall curve with sentinel end points, makes the
    precision non-increasing from left to right, and sums the area of the
    rectangles between consecutive distinct recall values.
    """
    padded_prec = np.hstack([[0], prec, [0]])
    padded_rec = np.hstack([[0], rec, [1]])

    # Sweep right-to-left so every precision becomes the maximum of itself
    # and everything to its right (the precision envelope).
    for pos in reversed(range(len(padded_prec) - 1)):
        if padded_prec[pos + 1] > padded_prec[pos]:
            padded_prec[pos] = padded_prec[pos + 1]

    # Positions where recall changes; each contributes width * height.
    steps = np.flatnonzero(padded_rec[1:] != padded_rec[:-1]) + 1
    widths = padded_rec[steps] - padded_rec[steps - 1]
    return np.sum(widths * padded_prec[steps])
from tqdm import tqdm
def compute_average_precision_detection(
    ground_truth,
    prediction,
    tiou_thresholds=np.linspace(0.5, 0.95, 10),
):
    """Compute average precision (detection task) between ground truth and
    predictions data frames. If multiple predictions match the same
    ground truth segment, only the one with the highest score is counted as
    a true positive. This code is greatly inspired by Pascal VOC devkit.

    Ref: https://github.com/zhang-can/CoLA/blob/\
        d21f1b5a4c6c13f9715cfd4ac1ebcd065d179157/eval/eval_detection.py#L200

    Parameters
    ----------
    ground_truth : df
        Data frame containing the ground truth instances.
        Required fields: ['video-id', 't-start', 't-end']
    prediction : df
        Data frame containing the prediction instances.
        Required fields: ['video-id', 't-start', 't-end', 'score']
    tiou_thresholds : 1darray, optional
        Temporal intersection over union thresholds.

    Outputs
    -------
    ap : 1darray
        Average precision score for each entry of `tiou_thresholds`. All
        zeros when either data frame is empty.
    """
    ap = np.zeros(len(tiou_thresholds))
    # Without predictions or without positives, AP is zero at every
    # threshold (the latter would otherwise yield a 0/0 recall).
    if prediction.empty or ground_truth.empty:
        return ap

    # lock_gt is addressed by the ground-truth row label recovered through
    # reset_index() below, so make those labels positional (0..n-1) no
    # matter how the caller indexed the frame.
    ground_truth = ground_truth.reset_index(drop=True)
    npos = float(len(ground_truth))
    # lock_gt[t, g] holds the index of the prediction matched to ground
    # truth g at threshold t, or -1 while it is still unmatched.
    lock_gt = np.ones((len(tiou_thresholds), len(ground_truth))) * -1

    # Sort predictions by decreasing score order. argsort yields positions,
    # so they must be applied with .iloc, not the label-based .loc.
    sort_idx = prediction['score'].values.argsort()[::-1]
    prediction = prediction.iloc[sort_idx].reset_index(drop=True)

    # Initialize true positive and false positive vectors.
    tp = np.zeros((len(tiou_thresholds), len(prediction)))
    fp = np.zeros((len(tiou_thresholds), len(prediction)))

    # Adaptation to query faster
    ground_truth_gbvn = ground_truth.groupby('video-id')

    # Assign true positives to the ground truth instances they match.
    for idx, this_pred in prediction.iterrows():
        try:
            # Check if there is at least one ground truth in the video associated.
            ground_truth_videoid = ground_truth_gbvn.get_group(this_pred['video-id'])
        except KeyError:
            # No ground truth for this video: false positive at every threshold.
            fp[:, idx] = 1
            continue

        this_gt = ground_truth_videoid.reset_index()
        # Read the positions column-wise so they keep their integer dtype
        # (a row-wise lookup can upcast them to float).
        gt_positions = this_gt['index'].values
        tiou_arr = segment_iou(this_pred[['t-start', 't-end']].values,
                               this_gt[['t-start', 't-end']].values)
        # Visit ground truth segments from highest to lowest tIoU.
        tiou_sorted_idx = tiou_arr.argsort()[::-1]
        for tidx, tiou_thr in enumerate(tiou_thresholds):
            for jdx in tiou_sorted_idx:
                if tiou_arr[jdx] < tiou_thr:
                    # Sorted descending: nothing further can reach the threshold.
                    fp[tidx, idx] = 1
                    break
                if lock_gt[tidx, gt_positions[jdx]] >= 0:
                    # Already claimed by a higher-scoring prediction.
                    continue
                # Assign as true positive after the filters above.
                tp[tidx, idx] = 1
                lock_gt[tidx, gt_positions[jdx]] = idx
                break

            # Every sufficiently overlapping ground truth was already taken.
            if fp[tidx, idx] == 0 and tp[tidx, idx] == 0:
                fp[tidx, idx] = 1

    tp_cumsum = np.cumsum(tp, axis=1).astype(float)
    fp_cumsum = np.cumsum(fp, axis=1).astype(float)
    recall_cumsum = tp_cumsum / npos

    precision_cumsum = tp_cumsum / (tp_cumsum + fp_cumsum)

    for tidx in range(len(tiou_thresholds)):
        ap[tidx] = interpolated_prec_rec(precision_cumsum[tidx, :], recall_cumsum[tidx, :])

    return ap
def ap_wrapper(
    true_clips,
    pred_clips,
    pred_scores,
    tiou_thresholds=np.linspace(0.5, 0.95, 10),
):
    """Compute detection AP from plain arrays for a single video.

    Wraps the clips into the data frames expected by
    `compute_average_precision_detection`, assigning every clip to the same
    placeholder video id.

    Args:
        true_clips (np.ndarray): Ground truth clips, shape (N, 2).
        pred_clips (np.ndarray): Predicted clips, shape (M, 2).
        pred_scores (np.ndarray): Score of each predicted clip, shape (M,).
        tiou_thresholds (np.ndarray): Temporal IoU thresholds to evaluate.

    Returns:
        Whatever `compute_average_precision_detection` returns for the
        constructed data frames.
    """
    # Both clip arrays must be (num_clips, 2) NumPy arrays.
    for clips in (true_clips, pred_clips):
        assert isinstance(clips, np.ndarray)
        assert len(clips.shape) == 2 and clips.shape[1] == 2
    # One score per predicted clip.
    assert isinstance(pred_scores, np.ndarray)
    assert len(pred_scores.shape) == 1 and len(pred_scores) == pred_clips.shape[0]

    video_id = "video1"

    true_df = pd.DataFrame(
        {
            "video-id": [video_id] * len(true_clips),
            "t-start": true_clips[:, 0],
            "t-end": true_clips[:, 1],
        }
    )

    pred_df = pd.DataFrame(
        {
            "video-id": [video_id] * len(pred_clips),
            "t-start": pred_clips[:, 0],
            "t-end": pred_clips[:, 1],
        }
    )
    pred_df["score"] = pred_scores

    return compute_average_precision_detection(
        true_df, pred_df, tiou_thresholds=tiou_thresholds,
    )
def nms_1d(df: pd.DataFrame, score_col="score", iou_thresh=0.5):
    """Applies greedy NMS on 1D (start, end) box predictions, video by video.

    Within each video the highest-scoring remaining row is kept, and every
    other remaining row whose IoU with it is not below `iou_thresh` is
    discarded; this repeats until no rows remain.

    Args:
        df (pd.DataFrame): Predictions with at least the columns "start",
            "end", "video_id" and `score_col`.
        score_col (str): Name of the column holding the scores.
        iou_thresh (float): Rows must have IoU strictly below this value
            with a kept row to survive.

    Returns:
        list: Index labels (from `df.index`) of the rows that were kept.

    Side effects:
        Shows a tqdm progress bar and prints the mean, over videos, of the
        fraction of rows kept.
    """
    required = set(["start", "end", "video_id", score_col])
    assert required.issubset(set(df.columns))

    unique_videos = df["video_id"].unique()
    per_video = df.groupby("video_id")

    keep_indices = []
    kept_fractions = []

    # Clears tqdm's registry of live bars (private attribute) before creating
    # a new one -- presumably to avoid stale bars; TODO confirm intent.
    tqdm._instances.clear()
    progress = tqdm(
        unique_videos,
        desc="Applying NMS to each video",
        bar_format='{l_bar}{bar:10}{r_bar}{bar:-10b}',
    )
    for video_id in progress:
        # Rows of this video, best score first.
        remaining = per_video.get_group(video_id).sort_values(
            score_col, ascending=False
        )
        n_total = len(remaining)
        n_kept_before = len(keep_indices)

        while len(remaining) > 0:
            # The current best row is always kept.
            best = remaining.iloc[0].to_dict()
            keep_indices.append(remaining.index[0])
            anchor = np.array([best["start"], best["end"]])

            # Keep only the rows that do not overlap the kept one too much.
            remaining = remaining.iloc[1:]
            overlaps = segment_iou(anchor, remaining[["start", "end"]].values)
            remaining = remaining[overlaps < iou_thresh]

        kept_fractions.append((len(keep_indices) - n_kept_before) / n_total)

    mean_kept_fraction = np.array(kept_fractions).mean()
    print("> Net success fraction: {:.2f}".format(mean_kept_fraction))

    return keep_indices
if __name__ == "__main__":
    # Smoke test of the IoU and AP helpers on a small hand-made example.
    true_clips = np.array(
        [
            [0.1, 0.7],
            [3.4, 7.8],
            [3.9, 5.4],
        ]
    )
    pred_clips = np.array(
        [
            [0.2, 0.8],
            [3.5, 7.9],
            [3.9, 5.4],
            [5.6, 6.7],
            [6.0, 6.5],
        ],
    )
    pred_scores = np.array([0.9, 0.8, 0.7, 0.6, 0.5])

    # 1. Check IoU for a single pair of boxes
    iou = compute_iou_1d(pred_clips[0], true_clips[0])
    # Manually check that the result is correct
    # Clips are [0.1, 0.7] and [0.2, 0.8]
    # Intersection: [0.2, 0.7] - length = 0.5
    # Union: [0.1, 0.8] - length = 0.7
    # Ratio: 0.5 / 0.7 = 0.714
    # The tolerance must be passed as `atol`: the third positional argument
    # of np.isclose is `rtol`, so a bare 3 would accept almost any value.
    assert np.isclose(iou, 0.714, atol=1e-3), "Incorrect IoU"

    # 2. Check IoU for a single predicted box and multiple ground truth boxes
    ious = compute_iou_1d_single_candidate_multiple_targets(pred_clips[0], true_clips)
    assert np.allclose(ious, [0.714, 0.0, 0.0], atol=1e-3), "Incorrect IoU"

    # 3. Check the pairwise IoU matrix for multiple predicted and ground truth boxes
    ious = compute_iou_1d_multiple_candidates_multiple_targets(pred_clips, true_clips)
    assert ious.shape == (5, 3), "Incorrect shape"

    # 4. Compute AP at several tIoU thresholds
    ap = ap_wrapper(
        true_clips,
        pred_clips,
        pred_scores,
        tiou_thresholds=np.linspace(0.5, 0.95, 3),
    )
    # Take the mean of the APs across IoU thresholds
    final_ap = np.mean(ap)
    print("AP per tIoU threshold: {}".format(ap))
    print("Mean AP: {:.3f}".format(final_ap))