haotongl
initial version
37cca23
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import argparse
from functools import partial
import json
import logging
import os
import sys
from typing import List, Optional
import torch
from torch.nn.functional import one_hot, softmax
import dinov2.distributed as distributed
from dinov2.data import SamplerType, make_data_loader, make_dataset
from dinov2.data.transforms import make_classification_eval_transform
from dinov2.eval.metrics import AccuracyAveraging, build_topk_accuracy_metric
from dinov2.eval.setup import get_args_parser as get_setup_args_parser
from dinov2.eval.setup import setup_and_build_model
from dinov2.eval.utils import ModelWithNormalize, evaluate, extract_features
# Module-level handle on the "dinov2" logger; the functions below use it for progress and result messages.
logger = logging.getLogger("dinov2")
def get_args_parser(
    description: Optional[str] = None,
    parents: Optional[List[argparse.ArgumentParser]] = None,
    add_help: bool = True,
):
    """Build the command-line parser for the k-NN evaluation script.

    The returned parser has the evaluation-setup parser (obtained from
    ``get_setup_args_parser``) as its single parent and adds the k-NN specific
    options on top of it.

    Args:
        description: Text shown at the top of the ``--help`` output.
        parents: Extra parent parsers; they are forwarded to the setup parser.
        add_help: Whether the returned parser registers ``-h/--help``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parents = parents or []
    # The setup parser must not add its own -h/--help, otherwise it would
    # clash with the one (optionally) added by the parser built below.
    setup_args_parser = get_setup_args_parser(parents=parents, add_help=False)
    parents = [setup_args_parser]
    parser = argparse.ArgumentParser(
        description=description,
        parents=parents,
        add_help=add_help,
    )
    parser.add_argument(
        "--train-dataset",
        dest="train_dataset_str",
        type=str,
        help="Training dataset",
    )
    parser.add_argument(
        "--val-dataset",
        dest="val_dataset_str",
        type=str,
        help="Validation dataset",
    )
    parser.add_argument(
        "--nb_knn",
        nargs="+",
        type=int,
        help="Number of NN to use. 20 is usually working the best.",
    )
    parser.add_argument(
        "--temperature",
        type=float,
        help="Temperature used in the voting coefficient",
    )
    parser.add_argument(
        "--gather-on-cpu",
        action="store_true",
        # Adjacent literals are concatenated verbatim, so the separating
        # space has to live inside one of them.
        help="Whether to gather the train features on cpu, slower "
        "but useful to avoid OOM for large datasets (e.g. ImageNet22k).",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        help="Batch size.",
    )
    parser.add_argument(
        "--n-per-class-list",
        nargs="+",
        type=int,
        help="Number to take per class",
    )
    parser.add_argument(
        "--n-tries",
        type=int,
        help="Number of tries",
    )
    # Defaults are set in one place instead of on each add_argument() call.
    parser.set_defaults(
        train_dataset_str="ImageNet:split=TRAIN",
        val_dataset_str="ImageNet:split=VAL",
        nb_knn=[10, 20, 100, 200],
        temperature=0.07,
        batch_size=256,
        n_per_class_list=[-1],
        n_tries=1,
    )
    return parser
class KnnModule(torch.nn.Module):
    """
    Distributed k-NN lookup over a train set that is sharded across ranks.

    Every rank keeps one chunk of the train features (stored transposed) and the
    matching chunk of train labels. In `compute_neighbors`, ranks take turns being
    the "source": the source's test features are broadcast to all ranks, each rank
    computes a partial top-k against its own train chunk, and the partial results
    are gathered back on the source rank where a final top-k is taken.
    """

    def __init__(self, train_features, train_labels, nb_knn, T, device, num_classes=1000):
        super().__init__()

        rank = distributed.get_global_rank()
        world_size = distributed.get_global_size()
        self.global_rank = rank
        self.global_size = world_size
        self.device = device

        # Keep only this rank's shard; features are transposed once so that
        # similarities can be computed with a plain matrix product.
        local_features = train_features.chunk(world_size)[rank]
        local_labels = train_labels.chunk(world_size)[rank]
        self.train_features_rank_T = local_features.T.to(device)
        self.candidates = local_labels.view(1, -1).to(device)

        self.nb_knn = nb_knn
        self.max_k = max(nb_knn)
        self.T = T
        self.num_classes = num_classes

    def _get_knn_sims_and_labels(self, similarity, train_labels):
        """Return the `max_k` largest similarities per row and the labels found at those positions."""
        best = similarity.topk(self.max_k, largest=True, sorted=True)
        return best.values, torch.gather(train_labels, 1, best.indices)

    def _similarity_for_rank(self, features_rank, source_rank):
        """Broadcast `source_rank`'s features and compute the local partial top-k against this rank's train chunk."""
        # The shape goes first so that receivers can allocate a matching buffer.
        shape = torch.tensor(features_rank.shape).to(self.device)
        torch.distributed.broadcast(shape, source_rank)

        if self.global_rank == source_rank:
            payload = features_rank
        else:
            payload = torch.zeros(*shape, dtype=features_rank.dtype, device=self.device)
        torch.distributed.broadcast(payload, source_rank)

        # Similarity of the source's features with this rank's train features.
        sims = torch.mm(payload, self.train_features_rank_T)
        labels = self.candidates.expand(len(sims), -1)
        return self._get_knn_sims_and_labels(sims, labels)

    def _gather_all_knn_for_rank(self, topk_sims, neighbors_labels, target_rank):
        """Gather every rank's partial top-k on `target_rank`; returns the merged top-k there and None elsewhere."""
        is_target = self.global_rank == target_rank

        # Only the destination rank provides receive buffers.
        sims_parts = labels_parts = None
        if is_target:
            sims_parts = [torch.zeros_like(topk_sims) for _ in range(self.global_size)]
            labels_parts = [torch.zeros_like(neighbors_labels) for _ in range(self.global_size)]

        torch.distributed.gather(topk_sims, sims_parts, dst=target_rank)
        torch.distributed.gather(neighbors_labels, labels_parts, dst=target_rank)

        if not is_target:
            return None

        # Second top-k over the k * global_size candidates that were collected.
        merged_sims = torch.cat(sims_parts, dim=1)
        merged_labels = torch.cat(labels_parts, dim=1)
        return self._get_knn_sims_and_labels(merged_sims, merged_labels)

    def compute_neighbors(self, features_rank):
        """Run one broadcast/gather round per rank and return the merged top-k belonging to this rank."""
        # Every rank takes part in every round; only the round in which this
        # rank is the target yields a non-None result.
        for source in range(self.global_size):
            partial_sims, partial_labels = self._similarity_for_rank(features_rank, source)
            gathered = self._gather_all_knn_for_rank(partial_sims, partial_labels, source)
            if gathered is not None:
                own_result = gathered
        return own_result

    def forward(self, features_rank):
        """
        Compute the results on all values of `self.nb_knn` neighbors from the full `self.max_k`
        """
        assert not any(k > self.max_k for k in self.nb_knn)

        topk_sims, neighbors_labels = self.compute_neighbors(features_rank)
        n_rows = neighbors_labels.shape[0]

        # Temperature-scaled softmax over the retrieved neighbors gives the vote weights.
        weights = softmax(topk_sims / self.T, 1)
        votes = one_hot(neighbors_labels, num_classes=self.num_classes)
        weighted_votes = torch.mul(votes, weights.view(n_rows, -1, 1))

        # Neighbors are sorted by similarity, so the first k rows are the k nearest.
        probas_for_k = {}
        for k in self.nb_knn:
            probas_for_k[k] = torch.sum(weighted_votes[:, :k, :], 1)
        return probas_for_k
class DictKeysModule(torch.nn.Module):
    """Pick one entry out of a nested dictionary by following a fixed key path."""

    def __init__(self, keys):
        super().__init__()
        # Sequence of keys, applied from the outermost level inwards.
        self.keys = keys

    def forward(self, features_dict, targets):
        """Return ``{"preds": <entry at the key path>, "target": targets}``."""
        node = features_dict
        for key in self.keys:
            node = node[key]
        return {"preds": node, "target": targets}
def create_module_dict(*, module, n_per_class_list, n_tries, nb_knn, train_features, train_labels):
    """Instantiate one k-NN module per (samples-per-class, try) configuration.

    Args:
        module: Callable accepting ``train_features``, ``train_labels`` and
            ``nb_knn`` keyword arguments and returning a ``torch.nn.Module``.
        n_per_class_list: Iterable of samples-per-class values. A negative
            value means "use the full train set".
        n_tries: Number of random subsets drawn for each non-negative value.
        nb_knn: Sequence (list or tuple) of k values to evaluate.
        train_features: Train features, indexable along the first dimension.
        train_labels: Train labels aligned with ``train_features``.

    Returns:
        A ``ModuleDictWithForward`` keyed by ``"full"`` or ``"<n> per class"``;
        each value is a ``ModuleDictWithForward`` keyed by the try identifier
        (``"1"`` for the full set, ``"0"``, ``"1"``, ... for subsets).
    """
    modules = {}
    # Built lazily: the per-class index mapping is only needed for subsampling.
    mapping = None
    for npc in n_per_class_list:
        if npc < 0:  # Only one try needed when using the full data
            full_module = module(
                train_features=train_features,
                train_labels=train_labels,
                nb_knn=nb_knn,
            )
            modules["full"] = ModuleDictWithForward({"1": full_module})
            continue

        if mapping is None:
            mapping = create_class_indices_mapping(train_labels)

        # Evaluate k == npc as well and drop every k larger than npc. A set
        # union is used instead of `nb_knn + [npc]` so that `nb_knn` may be a
        # tuple as well as a list.
        k_list = sorted(k for k in {*nb_knn, npc} if k <= npc)

        all_tries = {}
        for t in range(n_tries):
            # The try index doubles as the seed, so every try draws a different
            # but reproducible subset.
            final_indices = filter_train(mapping, npc, seed=t)
            all_tries[str(t)] = module(
                train_features=train_features[final_indices],
                train_labels=train_labels[final_indices],
                nb_knn=list(k_list),  # each module gets its own list object
            )
        modules[f"{npc} per class"] = ModuleDictWithForward(all_tries)
    return ModuleDictWithForward(modules)
def filter_train(mapping, n_per_class, seed):
    """Randomly pick at most ``n_per_class`` sample indices for every class.

    Args:
        mapping: Dict whose values are index tensors, one per class. The values
            produced by ``torch.nonzero`` on 1-D labels have shape ``(n, 1)``.
        n_per_class: Maximum number of indices kept per class; classes with
            fewer samples contribute all of them.
        seed: Seed for the random selection. NOTE: this reseeds torch's global
            RNG via ``torch.manual_seed``.

    Returns:
        The selected indices of all classes concatenated in mapping order. For
        ``(n, 1)`` inputs the result is always 1-D, even when only a single
        index is selected.
    """
    torch.manual_seed(seed)
    final_indices = []
    for class_indices in mapping.values():
        index = torch.randperm(len(class_indices))[:n_per_class]
        final_indices.append(class_indices[index])
    selected = torch.cat(final_indices)
    # Drop only the trailing singleton column. A bare `.squeeze()` would also
    # remove the leading dimension when exactly one index was selected and
    # return a 0-dim tensor, which then drops the sample dimension when used
    # to index the train features.
    if selected.dim() > 1:
        selected = selected.squeeze(1)
    return selected
def create_class_indices_mapping(labels):
    """Group sample positions by label.

    Returns a dict with one entry per distinct label value. Keys are the 0-dim
    tensors taken from ``torch.unique(labels)``; each value is the
    ``nonzero()`` result listing the positions where that label occurs.
    """
    unique_labels, inverse = torch.unique(labels, return_inverse=True)
    mapping = {}
    # `inverse` holds, for every sample, the position of its label in `unique_labels`.
    for position, label in enumerate(unique_labels):
        mapping[label] = (inverse == position).nonzero()
    return mapping
class ModuleDictWithForward(torch.nn.ModuleDict):
    """A ``ModuleDict`` that can be called: every contained module receives the same arguments."""

    def forward(self, *args, **kwargs):
        """Call each sub-module with ``*args, **kwargs`` and return the outputs keyed by module name."""
        outputs = {}
        for name, submodule in self._modules.items():
            outputs[name] = submodule(*args, **kwargs)
        return outputs
def eval_knn(
    model,
    train_dataset,
    val_dataset,
    accuracy_averaging,
    nb_knn,
    temperature,
    batch_size,
    num_workers,
    gather_on_cpu,
    n_per_class_list=(-1,),
    n_tries=1,
):
    """Run k-NN classification of the validation set against train-set features.

    Args:
        model: Feature extractor; it is wrapped in ``ModelWithNormalize``.
        train_dataset: Dataset whose features form the k-NN index.
        val_dataset: Dataset to classify.
        accuracy_averaging: Averaging mode forwarded to ``build_topk_accuracy_metric``.
        nb_knn: Sequence of k values to evaluate.
        temperature: Temperature of the softmax used to weight neighbor votes.
        batch_size: Batch size for feature extraction and evaluation.
        num_workers: Number of data-loading workers.
        gather_on_cpu: Forwarded to ``extract_features``.
        n_per_class_list: Samples-per-class settings; a negative value means
            the full train set.
        n_tries: Number of random subsets per non-negative samples-per-class value.

    Returns:
        Dict keyed by ``(n_per_class, k)`` whose values map metric names
        (e.g. ``top-1``, ``top-5``) to the value averaged over all tries.
    """
    model = ModelWithNormalize(model)

    logger.info("Extracting features for train set...")
    train_features, train_labels = extract_features(
        model, train_dataset, batch_size, num_workers, gather_on_cpu=gather_on_cpu
    )
    logger.info(f"Train features created, shape {train_features.shape}.")

    val_dataloader = make_data_loader(
        dataset=val_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        sampler_type=SamplerType.DISTRIBUTED,
        drop_last=False,
        shuffle=False,
        persistent_workers=True,
    )

    # Convert once to a plain Python int: the value is consumed as an integer
    # class count (metric construction, `one_hot`), so there is no reason to
    # carry a 0-dim tensor around.
    num_classes = int(train_labels.max()) + 1
    metric_collection = build_topk_accuracy_metric(accuracy_averaging, num_classes=num_classes)

    device = torch.cuda.current_device()
    partial_module = partial(KnnModule, T=temperature, device=device, num_classes=num_classes)
    knn_module_dict = create_module_dict(
        module=partial_module,
        n_per_class_list=n_per_class_list,
        n_tries=n_tries,
        nb_knn=nb_knn,
        train_features=train_features,
        train_labels=train_labels,
    )

    # One postprocessor and one independent metric copy per (n_per_class, try, k).
    postprocessors, metrics = {}, {}
    for n_per_class, knn_module in knn_module_dict.items():
        for t, knn_try in knn_module.items():
            for k in knn_try.nb_knn:
                key = (n_per_class, t, k)
                postprocessors[key] = DictKeysModule([n_per_class, t, k])
                metrics[key] = metric_collection.clone()

    # The k-NN modules consume the output of the (normalizing) feature extractor.
    model_with_knn = torch.nn.Sequential(model, knn_module_dict)

    # ============ evaluation ... ============
    logger.info("Start the k-NN classification.")
    _, results_dict = evaluate(model_with_knn, val_dataloader, postprocessors, metrics, device)

    # Averaging the results over the n tries for each value of n_per_class
    for n_per_class, knn_module in knn_module_dict.items():
        tries = list(knn_module.keys())
        first_try = tries[0]
        k_list = knn_module[first_try].nb_knn
        for k in k_list:
            keys = results_dict[(n_per_class, first_try, k)].keys()  # keys are e.g. `top-1` and `top-5`
            results_dict[(n_per_class, k)] = {
                key: torch.mean(torch.stack([results_dict[(n_per_class, t, k)][key] for t in tries]))
                for key in keys
            }
            # Per-try entries are replaced by the averaged entry.
            for t in tries:
                del results_dict[(n_per_class, t, k)]

    return results_dict
def eval_knn_with_model(
    model,
    output_dir,
    train_dataset_str="ImageNet:split=TRAIN",
    val_dataset_str="ImageNet:split=VAL",
    nb_knn=(10, 20, 100, 200),
    temperature=0.07,
    autocast_dtype=torch.float,
    accuracy_averaging=AccuracyAveraging.MEAN_ACCURACY,
    transform=None,
    gather_on_cpu=False,
    batch_size=256,
    num_workers=5,
    n_per_class_list=(-1,),
    n_tries=1,
):
    """Build the datasets, run the k-NN evaluation and record the accuracies.

    Args:
        model: Feature extractor to evaluate.
        output_dir: Directory in which ``results_eval_knn.json`` is appended to.
        train_dataset_str: Dataset descriptor passed to ``make_dataset`` for the train set.
        val_dataset_str: Dataset descriptor passed to ``make_dataset`` for the validation set.
        nb_knn: Sequence of k values to evaluate.
        temperature: Softmax temperature for the neighbor votes.
        autocast_dtype: dtype used for CUDA autocast during evaluation.
        accuracy_averaging: Averaging mode for the accuracy metrics.
        transform: Image transform; defaults to ``make_classification_eval_transform()``.
        gather_on_cpu: Whether train features are gathered on CPU.
        batch_size: Batch size.
        num_workers: Number of data-loading workers.
        n_per_class_list: Samples-per-class settings; a negative value means
            the full train set.
        n_tries: Number of random subsets per samples-per-class value.

    Returns:
        Dict mapping ``"<key> Top 1"`` / ``"<key> Top 5"`` to accuracies in
        percent. It is only populated on the main process and is empty on all
        other processes.
    """
    transform = transform or make_classification_eval_transform()

    train_dataset = make_dataset(
        dataset_str=train_dataset_str,
        transform=transform,
    )
    val_dataset = make_dataset(
        dataset_str=val_dataset_str,
        transform=transform,
    )

    with torch.cuda.amp.autocast(dtype=autocast_dtype):
        results_dict_knn = eval_knn(
            model=model,
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            accuracy_averaging=accuracy_averaging,
            nb_knn=nb_knn,
            temperature=temperature,
            batch_size=batch_size,
            num_workers=num_workers,
            gather_on_cpu=gather_on_cpu,
            n_per_class_list=n_per_class_list,
            n_tries=n_tries,
        )

    results_dict = {}
    if distributed.is_main_process():
        for knn_, knn_metrics in results_dict_knn.items():
            # Metric values are fractions; report them as percentages.
            top1 = knn_metrics["top-1"].item() * 100.0
            top5 = knn_metrics["top-5"].item() * 100.0
            results_dict[f"{knn_} Top 1"] = top1
            results_dict[f"{knn_} Top 5"] = top5
            logger.info(f"{knn_} classifier result: Top1: {top1:.2f} Top5: {top5:.2f}")

    # Append one JSON object per line. `results_dict` is empty on non-main
    # processes, so they write nothing.
    metrics_file_path = os.path.join(output_dir, "results_eval_knn.json")
    with open(metrics_file_path, "a") as f:
        for k, v in results_dict.items():
            f.write(json.dumps({k: v}) + "\n")

    # Keep all processes in step until the results have been written.
    if distributed.is_enabled():
        torch.distributed.barrier()
    return results_dict
def main(args):
    """Build the model from the parsed arguments, run the k-NN evaluation and return exit code 0."""
    model, autocast_dtype = setup_and_build_model(args)

    # Everything except the averaging mode, the transform and the worker count
    # comes from the command line.
    knn_kwargs = dict(
        model=model,
        output_dir=args.output_dir,
        train_dataset_str=args.train_dataset_str,
        val_dataset_str=args.val_dataset_str,
        nb_knn=args.nb_knn,
        temperature=args.temperature,
        autocast_dtype=autocast_dtype,
        accuracy_averaging=AccuracyAveraging.MEAN_ACCURACY,
        transform=None,
        gather_on_cpu=args.gather_on_cpu,
        batch_size=args.batch_size,
        num_workers=5,
        n_per_class_list=args.n_per_class_list,
        n_tries=args.n_tries,
    )
    eval_knn_with_model(**knn_kwargs)
    return 0
# Script entry point: parse the command line, run the evaluation and use
# main()'s return value as the process exit status.
if __name__ == "__main__":
    description = "DINOv2 k-NN evaluation"
    args_parser = get_args_parser(description=description)
    args = args_parser.parse_args()
    sys.exit(main(args))