GenNet / gennet_launch_Gene_networks_RefGene.py
lnalinaf's picture
Upload code for launch networks
75c583d
import os
os.chdir("/data/public/GenNet")
import sys
import glob
import numpy as np
import pandas as pd
#sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import matplotlib
matplotlib.use('agg')
import tensorflow as tf
import tensorflow.keras as K
import scipy
import tables
tf.keras.backend.set_epsilon(0.0000001)
tf_version = tf.__version__ # ToDo use packaging.version
if tf_version <= '1.13.1':
from GenNet_utils.LocallyDirectedConnected import LocallyDirected1D
print('= or less then 1.13.1: tensorflow version is', tf_version)
elif tf_version >= '2.0':
from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D
print('= or more then 2.0: tensorflow version is', tf_version)
else:
print("unexpected tensorflow version")
from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D
studyname = 'test_GN_ref'
def layer_block(model, mask, i, regression):
if regression:
activation_type="relu"
else:
activation_type="tanh"
model = LocallyDirected1D(mask=mask, filters=1, input_shape=(mask.shape[0], 1),
name="LocallyDirected_" + str(i))(model)
model = K.layers.Flatten()(model)
model = K.layers.Activation(activation_type)(model)
model = K.layers.BatchNormalization(center=False, scale=False)(model)
return model
def add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain):
if num_covariates > 0:
model = activation_layer(model, regression, negative_values_ytrain)
model = K.layers.concatenate([model, input_cov], axis=1)
model = K.layers.BatchNormalization(center=False, scale=False)(model)
model = K.layers.Dense(units=1, bias_initializer= tf.keras.initializers.Constant(mean_ytrain))(model)
return model
def activation_layer(model, regression, negative_values_ytrain):
if regression:
if negative_values_ytrain:
model = K.layers.Activation("linear")(model)
print('using a linear activation function')
else:
model = K.layers.Activation("relu")(model)
print('using a relu activation function')
else:
model = K.layers.Activation("sigmoid")(model)
return model
def create_network_from_npz(datapath,
inputsize,
genotype_path,
l1_value=0.01,
regression=False,
num_covariates=0,
mask_order = []):
print("Creating networks from npz masks")
print("regression", regression)
if regression:
mean_ytrain, negative_values_ytrain = regression_properties(datapath)
else:
mean_ytrain = 0
negative_values_ytrain = False
masks = []
mask_shapes_x = []
mask_shapes_y = []
print(mask_order)
if len(mask_order) > 0: # if mask_order is defined we use this order
for mask in mask_order:
mask = scipy.sparse.load_npz(datapath + '/'+str(mask)+'.npz')
masks.append(mask)
mask_shapes_x.append(mask.shape[0])
mask_shapes_y.append(mask.shape[1])
for x in range(len(masks) - 1): # check that the masks fit eachother
assert mask_shapes_y[x] == mask_shapes_x[x + 1]
else:
# if mask order is not defined we can sort the mask by the size
for npz_path in glob.glob(datapath + '/*.npz'):
mask = scipy.sparse.load_npz(npz_path)
masks.append(mask)
mask_shapes_x.append(mask.shape[0])
mask_shapes_y.append(mask.shape[1])
for i in range(len(masks)): # sort all the masks in the correct order
argsort_x = np.argsort(mask_shapes_x)[::-1]
argsort_y = np.argsort(mask_shapes_y)[::-1]
mask_shapes_x = np.array(mask_shapes_x)
mask_shapes_y = np.array(mask_shapes_y)
assert all(argsort_x == argsort_y) # check that both dimensions have the same order
masks = [masks[i] for i in argsort_y] # sort masks
mask_shapes_x = mask_shapes_x[argsort_x]
mask_shapes_y = mask_shapes_y[argsort_y]
for x in range(len(masks) - 1): # check that the masks fit eachother
assert mask_shapes_y[x] == mask_shapes_x[x + 1]
print('mask_shapes_x[0]', mask_shapes_x[0])
assert mask_shapes_x[0] == inputsize
print('mask_shapes_y[-1]', mask_shapes_y[-1])
if mask_shapes_y[-1] == 1: # should we end with a dense layer?
all_masks_available = True
else:
all_masks_available = False
input_layer = K.Input((inputsize,), name='input_layer')
input_cov = K.Input((num_covariates,), name='inputs_cov')
model = K.layers.Reshape(input_shape=(inputsize,), target_shape=(inputsize, 1))(input_layer)
for i in range(len(masks)):
mask = masks[i]
model = layer_block(model, mask, i, regression)
model = K.layers.Flatten()(model)
if all_masks_available:
model = LocallyDirected1D(mask=masks[-1], filters=1, input_shape=(mask.shape[0], 1),
name="output_layer")(model)
else:
model = K.layers.Dense(units=1, name="output_layer",
kernel_regularizer=tf.keras.regularizers.l1(l=l1_value)
)(model)
model = add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain)
output_layer = activation_layer(model, regression, negative_values_ytrain)
model = K.Model(inputs=[input_layer, input_cov], outputs=output_layer)
print(model.summary())
return model, masks
def get_testdata(datapath):
# ytest = pd.read_csv(datapath + "ytest_"+studyname+".csv")
h5file = tables.open_file(datapath + studyname + '_genotype_processed.h5', "r")
# ybatch = ytest["labels"]
# xbatchid = np.array(ytest["tot_index"].values, dtype=np.int64)
xbatch = h5file.root.data[:]
# ybatch = np.reshape(np.array(ybatch), (-1, 1))
h5file.close()
return xbatch
def predict():
xtest = get_testdata(datapath)
pred = model.predict(xtest)
print('model prediction: ', pred)
datapath = '/data/public/GenNet/processed_data/'
inputsize = 6986636
num_covariates = 0
genotype_path = datapath
l1_value = 0.001
model, masks = create_network_from_npz(datapath=datapath, inputsize=inputsize, genotype_path=genotype_path,mask_order=['UKBB_sparse_connection_mask_refseq_alligned'],
l1_value=l1_value, regression=False, num_covariates=num_covariates,)
model.load_weights(datapath + 'bestweight_job_diabetes.h5')
print('weights have been loaded')
predict()