|
import numpy as np |
|
import gzip |
|
from pathlib import Path |
|
|
|
import cv2 |
|
|
|
import skimage.morphology |
|
import skimage.filters.rank |
|
import skimage.util |
|
|
|
from tensorflow.keras import backend as K |
|
import tensorflow as tf |
|
import keras |
|
|
|
from aix import logger |
|
import aix.constants as C |
|
|
|
|
|
|
|
def dice_coef(y_true, y_pred, smooth=.0001):
    """
    Smoothed Dice coefficient, averaged over axis 0.

    For every entry along axis 0 the element-wise product of the two
    tensors and each tensor on its own are summed over axes 1, 2 and 3.
    The per-entry score is ``(2 * overlap + smooth) / (total + smooth)``.

    :param y_true: reference tensor (indexed over axes 0..3)
    :param y_pred: predicted tensor, same layout as ``y_true``
    :param smooth: constant added to numerator and denominator so the
        ratio stays defined when both sums are zero
    :return: mean of the per-entry scores along axis 0
    """
    reduce_axes = [1, 2, 3]

    overlap = K.sum(y_true * y_pred, axis=reduce_axes)
    # Sum of both "sizes" (not a set union): this is the Dice denominator.
    total = K.sum(y_true, axis=reduce_axes) + K.sum(y_pred, axis=reduce_axes)

    per_entry = (2. * overlap + smooth) / (total + smooth)
    return K.mean(per_entry, axis=0)
|
|
|
def harden(y, threshold=0.5):
    """
    Binarise ``y`` against ``threshold``.

    :param y: tensor to binarise
    :param threshold: cut-off; the comparison is strict (``y > threshold``)
    :return: tensor holding 1.0 where ``y`` exceeds the threshold and 0.0 elsewhere
    """
    above_threshold = y > threshold
    return tf.where(above_threshold, 1., 0.)
|
|
|
@keras.saving.register_keras_serializable(package="aix.utils")
def hardened_dice_coef(y_true, y_pred, smooth=.0001):
    """
    Dice coefficient computed on binarised inputs.

    Both tensors are first passed through ``harden`` (default threshold)
    and the result is scored with ``dice_coef``.

    :param y_true: reference tensor
    :param y_pred: predicted tensor
    :param smooth: smoothing constant forwarded to ``dice_coef``
    :return: Dice coefficient of the hardened tensors
    """
    # Forward ``smooth``: it was previously accepted but silently ignored,
    # so callers could not actually change the smoothing constant.
    return dice_coef(harden(y_true), harden(y_pred), smooth=smooth)
|
|
|
def dice_coef_loss(y_true, y_pred):
    """
    Loss derived from the Dice coefficient.

    :param y_true: reference tensor
    :param y_pred: predicted tensor
    :return: the negated value of ``dice_coef(y_true, y_pred)``
    """
    return -dice_coef(y_true, y_pred)
|
|
|
def local_entropy(im, kernel_size=5, normalize=True):
    """
    Compute the local entropy of an image over a disk-shaped neighbourhood.

    The image is converted with ``skimage.util.img_as_ubyte`` and filtered
    with ``skimage.filters.rank.entropy``.

    :param im: input image
    :param kernel_size: value passed to ``skimage.morphology.disk`` to build
        the neighbourhood
    :param normalize: when true, rescale the entropy map so its maximum
        becomes 255 and return it as ``uint8``; otherwise return the raw
        entropy map
    :return: entropy image
    """
    footprint = skimage.morphology.disk(kernel_size)
    entr_img = skimage.filters.rank.entropy(skimage.util.img_as_ubyte(im), footprint)

    if normalize:
        max_img = np.max(entr_img)
        # A constant image has zero entropy everywhere; dividing by that
        # maximum would produce NaNs, so only rescale when it is positive.
        if max_img > 0:
            entr_img = entr_img * 255 / max_img
        entr_img = entr_img.astype(np.uint8)

    return entr_img
|
|
|
def calc_dim(contour):
    """
    Compute the axis-aligned extent of a contour.

    :param contour: sequence of points, each wrapped in one extra level
        (``point[0]`` holds the two coordinates)
    :return: tuple ``(min0, max0, min1, max1)`` with the minimum and maximum
        of the first coordinate followed by those of the second
    """
    first_coords = []
    second_coords = []
    for point in contour:
        inner = point[0]
        first_coords.append(inner[0])
        second_coords.append(inner[1])

    return (
        min(first_coords),
        max(first_coords),
        min(second_coords),
        max(second_coords),
    )
|
|
|
def calc_size(dim):
    """
    Area of the extent returned by ``calc_dim``.

    :param dim: indexable ``(min0, max0, min1, max1)``
    :return: product of the two side lengths (``max - min`` on each axis)
    """
    side_a = dim[1] - dim[0]
    side_b = dim[3] - dim[2]
    return side_a * side_b
|
|
|
def calc_dist(dim1, dim2):
    """
    Placeholder: performs no computation and always returns ``None``.

    :param dim1: unused
    :param dim2: unused
    :return: ``None``
    """
    # TODO: not implemented yet; both arguments are currently ignored.
    return None
|
|
|
def extract_roi(img, threshold=135, kernel_size=5, min_fratio=.3, max_sratio=5, filled=True, border=.01):
    """
    Find a rectangular region of interest from the local-entropy map of ``img``.

    The normalised entropy map is thresholded, external contours are
    extracted, and contours are visited from the largest bounding box to the
    smallest. The first contour whose fill ratio is above ``min_fratio`` and
    whose bounding-box aspect ratio is below ``max_sratio`` is selected. Its
    bounding box, enlarged by ``border`` and clipped to the image, is drawn
    on a blank mask.

    :param img: input image; its ``shape`` is used for the returned mask
    :param threshold: binary threshold applied to the entropy map
    :param kernel_size: neighbourhood size forwarded to ``local_entropy``
    :param min_fratio: minimum (exclusive) ratio between the filled contour
        area and its bounding-box area
    :param max_sratio: maximum (exclusive) long-side / short-side ratio of
        the bounding box
    :param filled: draw a filled rectangle when true, otherwise an outline
        of thickness 2
    :param border: margin added on each side, as a fraction of the image size
    :return: ``(mask, origin, to)`` where ``mask`` is a ``uint8`` image with
        the rectangle drawn in 255 and ``origin`` / ``to`` are its two
        corner points as passed to ``cv2.rectangle``
    :raises IndexError: if no contour satisfies both ratio constraints
        (including the case where no contour is found at all)
    """
    entr_img = local_entropy(img, kernel_size=kernel_size)
    _, mask = cv2.threshold(entr_img, threshold, 255, cv2.THRESH_BINARY)

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    contours_d = [calc_dim(c) for c in contours]
    contours_sizes = [calc_size(d) for d in contours_d]

    cdim = None
    # Visit candidates from the largest bounding box to the smallest.
    for candidate in np.argsort(contours_sizes)[::-1]:
        lo_0, hi_0, lo_1, hi_1 = contours_d[candidate]
        span_0 = hi_0 - lo_0
        span_1 = hi_1 - lo_1

        # A degenerate box (a line or a single point) has zero area; both
        # ratios below would divide by zero, so it can never qualify.
        if span_0 == 0 or span_1 == 0:
            continue

        probe = np.zeros(img.shape, dtype=np.uint8)
        probe = cv2.fillPoly(probe, [contours[candidate]], 255)
        # Fraction of the bounding box actually covered by the contour.
        fratio = probe.sum() / 255 / contours_sizes[candidate]

        # Aspect ratio expressed as long side over short side (always >= 1).
        sratio = span_1 / span_0
        if sratio < 1:
            sratio = 1 / sratio

        if fratio > min_fratio and sratio < max_sratio:
            cdim = contours_d[candidate]
            break

    if cdim is None:
        # IndexError is kept so callers that handled the original
        # out-of-range failure keep working, now with a clear message.
        raise IndexError(
            "extract_roi: no contour satisfies the fill-ratio and aspect-ratio constraints"
        )

    filled_mask = np.zeros(img.shape, dtype=np.uint8)

    # Margin in pixels: extra[0] relates to axis 0 of the image, extra[1] to axis 1.
    extra = (int(img.shape[0] * border), int(img.shape[1] * border))
    origin = (max(0, cdim[0] - extra[1]), max(0, cdim[2] - extra[0]))
    to = (
        min(img.shape[1] - 1, cdim[1] + extra[1]),
        min(img.shape[0] - 1, cdim[3] + extra[0]),
    )

    # A negative thickness asks OpenCV for a filled rectangle.
    thickness = -1 if filled else 2
    filled_mask = cv2.rectangle(filled_mask, origin, to, 255, thickness)

    return filled_mask, origin, to
|
|
|
def preprocessor(input_img, img_rows, img_cols):
    """
    Resize a batch of images to a constant size.

    The batch is rearranged so that its last axis comes second, every 2-D
    plane is resized with ``cv2.INTER_AREA``, and the original axis order is
    restored before returning.

    :param input_img: 4-D numpy array of images; the last axis is treated as
        the channel axis
    :param img_rows: number of rows of each resized image
    :param img_cols: number of columns of each resized image
    :return: ``uint8`` numpy array of shape
        ``(input_img.shape[0], img_rows, img_cols, input_img.shape[3])``
    """
    logger.debug("Preprocessing...")

    # Bring the last axis to position 1 so each [i, ch] slice is a 2-D plane.
    input_img = np.transpose(input_img, (0, 3, 1, 2))

    logger.debug("Input: " + str(input_img.shape))

    n_images, n_channels = input_img.shape[0], input_img.shape[1]
    output_img = np.empty((n_images, n_channels, img_rows, img_cols), dtype=np.uint8)

    for i in range(n_images):
        # Resize every channel: filling only channel 0 would leave the other
        # planes of the uninitialised buffer holding arbitrary memory.
        for ch in range(n_channels):
            plane = np.ascontiguousarray(input_img[i, ch])
            # cv2.resize takes the target size as (width, height).
            output_img[i, ch] = cv2.resize(plane, (img_cols, img_rows), interpolation=cv2.INTER_AREA)

    # Restore the original axis order (second axis back to last).
    output_img = np.transpose(output_img, (0, 2, 3, 1))

    logger.debug("Output: " + str(output_img.shape))

    return output_img
|
|
|
def load_train_data(imgs_path, masks_path):
    """
    Load training images and masks from gzip-compressed numpy files.

    Both arrays are resized with ``preprocessor``. Images are scaled by
    1/255 and then standardised with their global mean and standard
    deviation; masks are only cast to ``float32``.

    :param imgs_path: path of the gzip-compressed numpy file with the images
    :param masks_path: path of the gzip-compressed numpy file with the masks
    :return: [X_train, y_train] numpy arrays containing the training data and their respective masks.
    """
    logger.debug("\nLoading train data ...\n")

    # Context managers make sure the gzip handles are closed after loading.
    with gzip.open(imgs_path) as imgs_file:
        X_train = np.load(imgs_file)
    with gzip.open(masks_path) as masks_file:
        y_train = np.load(masks_file)

    logger.debug(X_train.shape)
    logger.debug(y_train.shape)

    # NOTE(review): preprocessor expects (rows, cols) but receives
    # (IMG_WIDTH, IMG_HEIGHT) - harmless only if both are equal; confirm.
    X_train = preprocessor(X_train, C.IMG_WIDTH, C.IMG_HEIGHT)
    y_train = preprocessor(y_train, C.IMG_WIDTH, C.IMG_HEIGHT)

    X_train = X_train.astype('float32') / 255

    mean = np.mean(X_train)
    std = np.std(X_train)

    X_train -= mean
    X_train /= std

    y_train = y_train.astype('float32')

    return X_train, y_train
|
|
|
def process_data(X, y):
    """
    Resize already-loaded images and masks and cast them to ``float32``.

    No scaling or standardisation is applied here.

    :param X: numpy array of images
    :param y: numpy array of masks
    :return: ``(X, y)`` resized with ``preprocessor`` and cast to ``float32``
    """
    logger.debug("\nLoading train data ...\n")

    for batch in (X, y):
        logger.debug(batch.shape)

    target_size = (C.IMG_WIDTH, C.IMG_HEIGHT)
    X_resized = preprocessor(X, *target_size)
    y_resized = preprocessor(y, *target_size)

    return X_resized.astype('float32'), y_resized.astype('float32')
|
|
|
def load_skin_train_data(imgs_path, masks_path, img_width, img_height):
    """
    Load training images and masks from gzip-compressed numpy files.

    Both arrays are resized with ``preprocessor`` using the requested size.
    Images are standardised with their global mean and standard deviation;
    masks are cast to ``float32`` and divided by 255.

    :param imgs_path: path of the gzip-compressed numpy file with the images
    :param masks_path: path of the gzip-compressed numpy file with the masks
    :param img_width: first size argument forwarded to ``preprocessor``
    :param img_height: second size argument forwarded to ``preprocessor``
    :return: [X_train, y_train] numpy arrays containing the training data and their respective masks.
    """
    logger.debug("\nLoading train data ...\n")

    # Context managers make sure the gzip handles are closed after loading.
    with gzip.open(imgs_path) as imgs_file:
        X_train = np.load(imgs_file)
    with gzip.open(masks_path) as masks_file:
        y_train = np.load(masks_file)

    logger.debug(X_train.shape)
    logger.debug(y_train.shape)

    # Honour the caller-supplied size; the module constants were used before
    # and the two parameters were silently ignored. The argument order
    # matches the previous (IMG_WIDTH, IMG_HEIGHT) call.
    # NOTE(review): preprocessor expects (rows, cols) - confirm the order.
    X_train = preprocessor(X_train, img_width, img_height)
    y_train = preprocessor(y_train, img_width, img_height)

    X_train = X_train.astype('float32')

    mean = np.mean(X_train)
    std = np.std(X_train)

    X_train -= mean
    X_train /= std

    y_train = y_train.astype('float32')
    y_train /= 255.

    return X_train, y_train
|
|