Spaces:
Sleeping
Sleeping
import numpy as np | |
import pandas as pd | |
import os | |
import glob | |
import random | |
import matplotlib.pyplot as plt | |
import cv2 | |
import plotly.express as px | |
from annoy import AnnoyIndex | |
from PIL import Image | |
from tqdm import tqdm | |
# https://github.com/erikbern/ann-presentation/blob/master/cifar.py | |
# https://www.slideshare.net/erikbern/approximate-nearest-neighbor-methods-and-vector-models-nyc-ml-meetup | |
# https://erikbern.com/2015/10/01/nearest-neighbors-and-vector-models-part-2-how-to-search-in-high-dimensional-spaces.html | |
# t-SNE space | |
def get_top_n_dissimilar_samples_path(embeddings, embeddings_id_list, test_size_ratio=0.1, annoy_path=None):
    """Pick the most isolated samples of an embedding set as a test split.

    A sample's isolation is its distance to the closest other sample, as
    computed by ``get_isolated_elements_from_distance_map``. The
    ``test_size_ratio`` fraction of samples with the largest isolation is
    returned.

    Args:
        embeddings: 2-D array of vectors; ``embeddings.shape[1]`` is used as
            the index dimensionality.
        embeddings_id_list: One identifier per embedding row (the usage
            comments in this module pass image paths).
        test_size_ratio: Fraction of ``embeddings_id_list`` to return.
        annoy_path: Optional file name of a saved Annoy index. If the file
            exists it is loaded; if it does not exist the index is built and
            saved there; if ``None`` the index is built in memory only.

    Returns:
        List of the selected entries of ``embeddings_id_list``, most isolated
        first.
    """
    if annoy_path is None:
        # There is no file to save to or load from, so keep the index in
        # memory. (Previously this path tried to save to / load from ``None``.)
        annoy_tree = AnnoyIndex(embeddings.shape[1], 'euclidean')
        for item_idx, vector in zip(range(len(embeddings_id_list)), embeddings):
            annoy_tree.add_item(item_idx, vector)
        annoy_tree.build(50)
    else:
        if not os.path.exists(annoy_path):
            # First use of this path: create the index file before loading it.
            build_annoy_tree(embeddings, embeddings_id_list, annoy_path, n_trees=50)
        annoy_tree = load_annoy_tree(embeddings.shape[1], annoy_path)

    dist_map = create_distance_map(annoy_tree, embeddings_id_list)
    _, sorted_indices = get_isolated_elements_from_distance_map(dist_map, embeddings_id_list)
    _, test_paths = splitListByIsolationValues(embeddings_id_list, sorted_indices, test_size_ratio)
    return test_paths
def build_annoy_tree(embeddings, embeddings_id_list, save_filename, n_trees=50):
    """Build a euclidean Annoy index over ``embeddings`` and save it to disk.

    Items are keyed by their position (0, 1, 2, ...); the number of items
    added is bounded by both ``len(embeddings_id_list)`` and the number of
    rows in ``embeddings``.

    Args:
        embeddings: 2-D array of vectors; ``embeddings.shape[1]`` sets the
            index dimensionality.
        embeddings_id_list: Identifiers of the embeddings; only its length is
            used here.
        save_filename: File the built index is written to.
        n_trees: Number of trees passed to ``AnnoyIndex.build``.
    """
    index = AnnoyIndex(embeddings.shape[1], 'euclidean')

    # Register every vector under its positional id.
    positions = range(len(embeddings_id_list))
    for item_id, vector in zip(positions, embeddings):
        index.add_item(item_id, vector)

    index.build(n_trees)
    index.save(save_filename)
def load_annoy_tree(embeddings_dim, annoy_fn):
    """Load a saved euclidean Annoy index.

    Args:
        embeddings_dim: Dimensionality the index is constructed with.
        annoy_fn: File name of the saved index.

    Returns:
        The ``AnnoyIndex`` with the file loaded into it.
    """
    index = AnnoyIndex(embeddings_dim, 'euclidean')
    index.load(annoy_fn)
    return index
def create_distance_map(annoy_tree, embeddings_id_list):
    """Compute the full pairwise distance matrix between indexed items.

    Args:
        annoy_tree: Object exposing ``get_distance(i, j)`` for item positions.
        embeddings_id_list: Identifiers of the items; only its length ``n``
            is used, and items ``0 .. n-1`` are queried.

    Returns:
        ``(n, n)`` float32 array whose ``[i, j]`` entry is
        ``annoy_tree.get_distance(i, j)``.
    """
    n_items = len(embeddings_id_list)
    pairwise = np.zeros((n_items, n_items), np.float32)
    # One progress-bar tick per row; each row is filled in a single assignment.
    for row in tqdm(range(n_items)):
        pairwise[row, :] = [annoy_tree.get_distance(row, col) for col in range(n_items)]
    return pairwise
def get_isolated_elements_from_distance_map(distance_map, embeddings_id_list):
    """Rank items by isolation, i.e. distance to their closest neighbour.

    Zero entries of ``distance_map`` (which include each item's distance to
    itself) are ignored when looking for the closest neighbour.

    Args:
        distance_map: Square array of pairwise distances.
        embeddings_id_list: Identifiers of the items; only its length is used
            to generate the positional indices.

    Returns:
        ``(sorted_isolation_values, sorted_indices)``: two tuples of equal
        length, ordered from most to least isolated. Ties are ordered by
        descending index. Both tuples are empty for empty input. A row with
        no non-zero entry gets an isolation value of ``inf``.
    """
    distance_map = np.asarray(distance_map)
    if distance_map.size == 0 or len(embeddings_id_list) == 0:
        return (), ()

    # Mask zeros with +inf rather than a finite constant: a finite sentinel
    # (previously 500) silently caps the isolation of any item whose nearest
    # neighbour is farther away than the sentinel and breaks the ranking.
    masked = np.where(distance_map == 0, np.inf, distance_map)
    isolation_values = masked.min(axis=1)

    # Sort (value, index) pairs in descending order.
    ranked = sorted(zip(isolation_values, range(len(embeddings_id_list))), reverse=True)
    sorted_isolation_values, sorted_indices = zip(*ranked)
    return sorted_isolation_values, sorted_indices
def splitListByIsolationValues(lst, sorted_indices, test_part=0.1):
    """Split ``lst`` into train and test parts following a ranked index order.

    The first ``int(len(lst) * test_part)`` entries of ``sorted_indices``
    select the test items; the remaining entries select the train items.

    Example:
        TEST_SIZE = 0.05  # Percentage of test data from all
        train_paths, test_paths = splitListByIsolationValues(train_id_list, sorted_indices, TEST_SIZE)

    Args:
        lst: Sequence of items to split.
        sorted_indices: Positions into ``lst`` in ranked order.
        test_part: Fraction of ``len(lst)`` assigned to the test part.

    Returns:
        ``(lst_train, lst_test)`` as two lists.
    """
    split_at = int(len(lst) * test_part)
    test_items = [lst[pos] for pos in sorted_indices[:split_at]]
    train_items = [lst[pos] for pos in sorted_indices[split_at:]]
    return train_items, test_items
# tree = AnnoyIndex(train_tsne_2d.shape[1], 'euclidean') | |
# ntree = 50 | |
# # add all items | |
# for path, vector in zip(list(range(len(train_id_list))),train_tsne_2d): | |
# tree.add_item(path, vector) | |
# # build tree | |
# _ = tree.build(ntree) | |
# # generate distance map | |
# distance_map = np.zeros((len(train_id_list),len(train_id_list)),np.float32) | |
# for i in tqdm(range(len(train_id_list))): | |
# for j in range(len(train_id_list)): | |
# distance_map[i,j] = tree.get_distance(i,j) | |
# # Now, sample n percent of the ones with maximum distances to closest neighbors. Isolated ones. | |
# test_samples = np.where(distance_map == 0, 500, distance_map) | |
# isolation_values = np.min(test_samples,1) | |
# # get results in descending order | |
# sorted_isolation_values, sorted_indices = zip(*sorted(zip(isolation_values, list(range(len(train_id_list)))),reverse=True)) | |
# print(sorted_isolation_values[:5],sorted_indices[:5]) | |
# #Plot some of the images and compare them to rest of the set to see if there are any similar samples. | |
# for isolated_id in sorted_indices[:10]: | |
# plot_n_similar(isolated_id,4) | |
# plt.show() | |
# TEST_SIZE = 0.05 # Percentage of test data from all | |
# train_paths, test_paths = splitListByIsolationValues(train_id_list, sorted_indices, TEST_SIZE) | |
# print(len(train_paths)) | |
# print(len(test_paths)) | |
# def build(fn, f, fun): # lol @ parameters :) | |
# a = annoy.AnnoyIndex(f, 'euclidean') | |
# i = 0 | |
# for pixels, label in read_cifar(): | |
# a.add_item(i, fun(pixels)) | |
# i += 1 | |
# if i % 1000 == 0: | |
# print i, '...' | |
# a.build(100) | |
# a.save(fn) | |
# def build_annoy_tree(): | |
# annoy_fn = 'mnist.annoy' | |
# data_fn = 'mnist.pkl.gz' | |
# if not os.path.exists(annoy_fn): | |
# if not os.path.exists(data_fn): | |
# print 'downloading' | |
# urlretrieve('http://deeplearning.net/data/mnist/mnist.pkl.gz', data_fn) | |
# a = annoy.AnnoyIndex(784, 'euclidean') | |
# for i, pic in util.get_vectors(data_fn): | |
# a.add_item(i, pic) | |
# print 'building' | |
# a.build(10) | |
# a.save(annoy_fn) | |
def scatter_thumbnails_train_test(data, image_paths, train_labels, test_paths, zoom=0.3,
                                  colors=None, xlabel='PCA dimension 1',
                                  ylabel='PCA dimension 2'):
    """Scatter-plot 2-D points per class and overlay thumbnails of test images.

    Points are drawn once per class in the hard-coded category list
    ``['CNV', 'DRUSEN', 'DME', 'NORMAL']``. For every entry of
    ``image_paths`` that also appears in ``test_paths``, a red-framed
    thumbnail of the image is placed at the corresponding point.

    Args:
        data: 2-D array indexed as ``data[mask, 0]`` / ``data[mask, 1]`` and
            ``data[i]``; row ``i`` belongs to ``image_paths[i]``.
        image_paths: Image path for each row of ``data``.
        train_labels: Class label for each row of ``data``.
        test_paths: Paths to highlight with thumbnails.
        zoom: Zoom factor applied to each thumbnail.
        colors: Optional sequence of matplotlib colors, one per category in
            the order listed above. Defaults to ``['y', 'g', 'b', 'c']``.
        xlabel: Label of the x axis.
        ylabel: Label of the y axis.

    Returns:
        The matplotlib axes the plot was drawn on.
    """
    from matplotlib.offsetbox import OffsetImage, AnnotationBbox

    cls_categories = ['CNV', 'DRUSEN', 'DME', 'NORMAL']
    palette = ['y', 'g', 'b', 'c'] if colors is None else colors

    plt.figure(figsize=(22, 15))
    ax = plt.subplot(aspect='equal')

    # One scatter call per class so that each class gets its own legend entry.
    labels = np.array(train_labels)
    for cls, clr in zip(cls_categories, palette):
        mask = labels == cls
        ax.scatter(data[mask, 0], data[mask, 1], c=clr, label=cls, alpha=0.5, s=4)
    ax.axis('tight')
    ax.set_xlabel(xlabel, fontsize=14)
    ax.set_ylabel(ylabel, fontsize=14)
    ax.legend(fontsize='large', markerscale=2)

    # Build the lookup once; testing membership against the raw list would
    # rescan it for every image.
    test_lookup = set(test_paths)
    for i, path in enumerate(image_paths):
        if path not in test_lookup:
            continue
        # NOTE(review): the drive-letter rewrite is machine specific; kept
        # as-is because stored paths appear to reference "F:/".
        image = get_img(path.replace("F:/", "E:/"), thumbnail=True)
        if image.ndim == 1:
            # A 1-D array is not skipped silently by OffsetImage; ignore it.
            continue
        thumb = OffsetImage(image, cmap='gray', zoom=zoom)
        box = AnnotationBbox(thumb, data[i], xycoords='data',
                             frameon=True,
                             pad=0.0,
                             bboxprops=dict(edgecolor='red'))
        ax.add_artist(box)
    return ax
# _ = scatter_thumbnails_train_test(train_tsne_2d, train_id_list, test_paths, | |
# zoom=0.2, xlabel="Dimension 1", ylabel="Dimension 2") | |
# plt.title('2D t-SNE Visualization of Sampled Data (OCT2017 Train) - RGB=Picked') | |
# plt.show() | |
def splitListByIsolationValues(lst, sorted_indices, test_part=0.1):
    """Partition ``lst`` into (train, test) lists using ranked positions.

    NOTE: a function with this name is also defined earlier in this module;
    being the later definition, this is the one in effect after import.

    Args:
        lst: Sequence of items to partition.
        sorted_indices: Positions into ``lst`` in ranked order; the leading
            ``int(len(lst) * test_part)`` positions form the test part.
        test_part: Fraction of ``len(lst)`` that goes to the test part.

    Returns:
        ``(lst_train, lst_test)`` as two lists, each in ranked order.
    """
    n_held_out = int(len(lst) * test_part)

    def gather(positions):
        return [lst[pos] for pos in positions]

    return gather(sorted_indices[n_held_out:]), gather(sorted_indices[:n_held_out])
def plot_random_samples(paths, n=5):
    """Show ``n`` images picked at random (with replacement) in a single row.

    Args:
        paths: Non-empty sequence of image file paths readable by
            ``plt.imread``.
        n: Number of images to show; nothing is drawn when ``n <= 0``.

    Raises:
        ValueError: If ``paths`` is empty.
    """
    if n <= 0:
        return
    if len(paths) == 0:
        raise ValueError("paths must contain at least one image path")

    # Create exactly n axes (previously hard-coded to 5, which overflowed for
    # n > 5). squeeze=False keeps a 2-D array of axes even when n == 1.
    _, axes = plt.subplots(1, n, figsize=(4 * n, 5), squeeze=False)
    for axis in axes[0]:
        rand_index = random.randrange(len(paths))
        axis.imshow(plt.imread(paths[rand_index]))
def get_img(fn, thumbnail=False):
    """Read an image file into a NumPy array.

    Args:
        fn: Path (or file object) accepted by ``PIL.Image.open``.
        thumbnail: When true, shrink the image in place to fit within
            100x100 pixels before converting it.

    Returns:
        The image data as a NumPy array.
    """
    # Use the image as a context manager so the underlying file is closed;
    # the pixel data must be read (np.array) before leaving the block.
    with Image.open(fn) as img:
        if thumbnail:
            img.thumbnail((100, 100))
        # Full slice over the first two axes; requires at least a 2-D array.
        return np.array(img)[:, :]
def plot_n_similar(annoy_tree, train_id_list, train_labels, seed_id, n, scale=5):
    """Plot a seed image next to its nearest neighbours from the index.

    The index is queried for ``n + 1`` neighbours of ``seed_id``. The first
    panel always shows the seed itself; each remaining panel shows a returned
    neighbour with its distance and label in the title.

    Args:
        annoy_tree: Index exposing ``get_nns_by_item``.
        train_id_list: Image path for each item position.
        train_labels: Label for each item position.
        seed_id: Position of the query item.
        n: Number of neighbours to show besides the seed.
        scale: Size in inches of each (square) panel.
    """
    ids, dists = annoy_tree.get_nns_by_item(seed_id, n + 1, search_k=-1, include_distances=True)

    # squeeze=False keeps an indexable array of axes even for a single panel
    # (n == 0), where plt.subplots would otherwise return a bare Axes.
    f, ax_grid = plt.subplots(1, n + 1, figsize=((n + 1) * scale, scale), squeeze=False)
    ax = ax_grid[0]

    for i, _id in enumerate(ids):
        if i == 0:
            # The first slot is reserved for the seed regardless of what the
            # index returned first.
            img_id = seed_id
            title = "SEED ID:{0}\nLabel:{1}".format(img_id, train_labels[img_id])
        else:
            img_id = _id
            title = "ID:{0}\nDistance: {1:.3f}\nLabel:{2}".format(img_id, dists[i], train_labels[img_id])
        ax[i].imshow(get_img(train_id_list[img_id]), cmap='gray')
        ax[i].set_title(title, fontsize=12)

    f.suptitle("Images similar to seed_id {0}".format(seed_id), fontsize=18)
    plt.subplots_adjust(top=0.97)
# plot_n_similar(5) | |
# def match_gallery_2_query(save_dir): | |
# gallery_emb = np.load(os.path.join(save_dir, 'gallery_embedding.npy')) | |
# query_emb = np.load(os.path.join(save_dir, 'query_embedding.npy')) | |
# gallery_ids = np.load(os.path.join(save_dir, 'gallery_ids.npy')) | |
# query_ids = np.load(os.path.join(save_dir, 'query_ids.npy')) | |
# query_results = [] | |
# get_closest = None | |
# if matching_method == 'annoy': | |
# annoy_metric = 'hamming' if gallery_emb.dtype == np.bool else 'angular' | |
# annoy_f = AnnoyIndex(gallery_emb.shape[1], annoy_metric) | |
# for i in range(gallery_emb.shape[0]): | |
# annoy_f.add_item(i, gallery_emb[i]) | |
# annoy_f.build(10) # number of trees | |
# def annoy_matching(query_item, query_index, n=10): | |
# return annoy_f.get_nns_by_vector(query_item, n) | |
# get_closest = annoy_matching | |
# elif matching_method == 'knn': | |
# #distances = distance.cdist(query_emb, gallery_emb, 'cosine') | |
# #sorted_dist = np.argsort(distances, axis=1) | |
# def knn_matching(query_item, query_index, n=10): | |
# distances = distance.cdist((query_emb[query_index]).reshape(1,-1), gallery_emb, 'cosine') | |
# sorted_dist = np.argsort(distances, axis=1) | |
# return sorted_dist[0,:n] | |
# get_closest = knn_matching | |
# else: | |
# raise Exception(f'{FLAGS.matching_method} not implemented in matching') | |
# for i, query_item in tqdm(enumerate(query_emb),'Finding matches...'): | |
# closest_idxs = get_closest(query_item, i, 10) | |
# closest_fns = [gallery_ids[close_i] for close_i in closest_idxs] | |
# beginning = f'{query_ids[i]},' + '{' | |
# line = ','.join(closest_fns) | |
# end = '}' | |
# query_results.append(beginning + line + end) | |
# sub_fn = os.path.join(save_dir, 'submission.csv') | |
# with open(sub_fn, 'w') as f: | |
# f.writelines("%s\n" % l for l in query_results) | |
# plot_submission(sub_fn, FLAGS.testdata_dir, save_dir) |