|
import os |
|
os.chdir("/data/public/GenNet") |
|
import sys |
|
import glob |
|
import numpy as np |
|
import pandas as pd |
|
|
|
import matplotlib |
|
matplotlib.use('agg') |
|
import tensorflow as tf |
|
import tensorflow.keras as K |
|
import scipy |
|
import tables |
|
|
|
tf.keras.backend.set_epsilon(0.0000001) |
|
tf_version = tf.__version__ |
|
if tf_version <= '1.13.1': |
|
from GenNet_utils.LocallyDirectedConnected import LocallyDirected1D |
|
print('= or less then 1.13.1: tensorflow version is', tf_version) |
|
elif tf_version >= '2.0': |
|
from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D |
|
print('= or more then 2.0: tensorflow version is', tf_version) |
|
else: |
|
print("unexpected tensorflow version") |
|
from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D |
|
|
|
studyname = 'test_GN_ref' |
|
|
|
def layer_block(model, mask, i, regression): |
|
|
|
if regression: |
|
activation_type="relu" |
|
else: |
|
activation_type="tanh" |
|
|
|
model = LocallyDirected1D(mask=mask, filters=1, input_shape=(mask.shape[0], 1), |
|
name="LocallyDirected_" + str(i))(model) |
|
model = K.layers.Flatten()(model) |
|
|
|
model = K.layers.Activation(activation_type)(model) |
|
model = K.layers.BatchNormalization(center=False, scale=False)(model) |
|
return model |
|
|
|
|
|
def add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain): |
|
if num_covariates > 0: |
|
model = activation_layer(model, regression, negative_values_ytrain) |
|
model = K.layers.concatenate([model, input_cov], axis=1) |
|
model = K.layers.BatchNormalization(center=False, scale=False)(model) |
|
model = K.layers.Dense(units=1, bias_initializer= tf.keras.initializers.Constant(mean_ytrain))(model) |
|
return model |
|
|
|
|
|
def activation_layer(model, regression, negative_values_ytrain): |
|
|
|
if regression: |
|
if negative_values_ytrain: |
|
model = K.layers.Activation("linear")(model) |
|
print('using a linear activation function') |
|
else: |
|
model = K.layers.Activation("relu")(model) |
|
print('using a relu activation function') |
|
else: |
|
model = K.layers.Activation("sigmoid")(model) |
|
|
|
return model |
|
|
|
def create_network_from_npz(datapath, |
|
inputsize, |
|
genotype_path, |
|
l1_value=0.01, |
|
regression=False, |
|
num_covariates=0, |
|
mask_order = []): |
|
print("Creating networks from npz masks") |
|
print("regression", regression) |
|
if regression: |
|
mean_ytrain, negative_values_ytrain = regression_properties(datapath) |
|
else: |
|
mean_ytrain = 0 |
|
negative_values_ytrain = False |
|
|
|
masks = [] |
|
mask_shapes_x = [] |
|
mask_shapes_y = [] |
|
|
|
print(mask_order) |
|
|
|
if len(mask_order) > 0: |
|
for mask in mask_order: |
|
mask = scipy.sparse.load_npz(datapath + '/'+str(mask)+'.npz') |
|
masks.append(mask) |
|
mask_shapes_x.append(mask.shape[0]) |
|
mask_shapes_y.append(mask.shape[1]) |
|
|
|
for x in range(len(masks) - 1): |
|
assert mask_shapes_y[x] == mask_shapes_x[x + 1] |
|
else: |
|
|
|
for npz_path in glob.glob(datapath + '/*.npz'): |
|
mask = scipy.sparse.load_npz(npz_path) |
|
masks.append(mask) |
|
mask_shapes_x.append(mask.shape[0]) |
|
mask_shapes_y.append(mask.shape[1]) |
|
|
|
for i in range(len(masks)): |
|
argsort_x = np.argsort(mask_shapes_x)[::-1] |
|
argsort_y = np.argsort(mask_shapes_y)[::-1] |
|
|
|
mask_shapes_x = np.array(mask_shapes_x) |
|
mask_shapes_y = np.array(mask_shapes_y) |
|
assert all(argsort_x == argsort_y) |
|
|
|
masks = [masks[i] for i in argsort_y] |
|
mask_shapes_x = mask_shapes_x[argsort_x] |
|
mask_shapes_y = mask_shapes_y[argsort_y] |
|
|
|
for x in range(len(masks) - 1): |
|
assert mask_shapes_y[x] == mask_shapes_x[x + 1] |
|
print('mask_shapes_x[0]', mask_shapes_x[0]) |
|
assert mask_shapes_x[0] == inputsize |
|
print('mask_shapes_y[-1]', mask_shapes_y[-1]) |
|
if mask_shapes_y[-1] == 1: |
|
all_masks_available = True |
|
else: |
|
all_masks_available = False |
|
|
|
input_layer = K.Input((inputsize,), name='input_layer') |
|
input_cov = K.Input((num_covariates,), name='inputs_cov') |
|
|
|
model = K.layers.Reshape(input_shape=(inputsize,), target_shape=(inputsize, 1))(input_layer) |
|
|
|
for i in range(len(masks)): |
|
mask = masks[i] |
|
model = layer_block(model, mask, i, regression) |
|
|
|
model = K.layers.Flatten()(model) |
|
|
|
if all_masks_available: |
|
model = LocallyDirected1D(mask=masks[-1], filters=1, input_shape=(mask.shape[0], 1), |
|
name="output_layer")(model) |
|
else: |
|
model = K.layers.Dense(units=1, name="output_layer", |
|
kernel_regularizer=tf.keras.regularizers.l1(l=l1_value) |
|
)(model) |
|
|
|
model = add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain) |
|
|
|
output_layer = activation_layer(model, regression, negative_values_ytrain) |
|
model = K.Model(inputs=[input_layer, input_cov], outputs=output_layer) |
|
|
|
print(model.summary()) |
|
|
|
return model, masks |
|
|
|
def get_testdata(datapath): |
|
|
|
h5file = tables.open_file(datapath + studyname + '_genotype_processed.h5', "r") |
|
|
|
|
|
xbatch = h5file.root.data[:] |
|
|
|
h5file.close() |
|
return xbatch |
|
|
|
def predict(): |
|
xtest = get_testdata(datapath) |
|
pred = model.predict(xtest) |
|
print('model prediction: ', pred) |
|
|
|
datapath = '/data/public/GenNet/processed_data/' |
|
inputsize = 6986636 |
|
num_covariates = 0 |
|
genotype_path = datapath |
|
l1_value = 0.001 |
|
model, masks = create_network_from_npz(datapath=datapath, inputsize=inputsize, genotype_path=genotype_path,mask_order=['UKBB_sparse_connection_mask_refseq_alligned'], |
|
l1_value=l1_value, regression=False, num_covariates=num_covariates,) |
|
model.load_weights(datapath + 'bestweight_job_diabetes.h5') |
|
print('weights have been loaded') |
|
predict() |