innat
upload
5637560
"""
Mask R-CNN
Multi-GPU Support for Keras.
Copyright (c) 2017 Matterport, Inc.
Licensed under the MIT License (see LICENSE for details)
Written by Waleed Abdulla
Ideas and small code snippets from these sources:
https://github.com/fchollet/keras/issues/2436
https://medium.com/@kuza55/transparent-multi-gpu-training-on-tensorflow-with-keras-8b0016fd9012
https://github.com/avolkov1/keras_experiments/blob/master/keras_exp/multigpu/
https://github.com/fchollet/keras/blob/master/keras/utils/training_utils.py
"""
import keras.backend as K
import keras.layers as KL
import keras.models as KM
import tensorflow as tf
class ParallelModel(KM.Model):
    """Subclasses the standard Keras Model and adds multi-GPU support.

    It works by running the wrapped model once per GPU. The inputs are
    sliced, one slice is sent to each replica, and the per-replica outputs
    are merged back together (concatenated, or averaged when they are
    scalars) so that the loss is applied on the combined outputs.
    """

    def __init__(self, keras_model, gpu_count):
        """Class constructor.

        keras_model: The Keras model to parallelize
        gpu_count: Number of GPUs. Must be > 1
        """
        # These attributes are read by make_parallel(), and the merged
        # outputs it returns are needed to initialise the base Model, so
        # they are set before calling the parent constructor.
        self.inner_model = keras_model
        self.gpu_count = gpu_count
        merged_outputs = self.make_parallel()
        super(ParallelModel, self).__init__(
            inputs=self.inner_model.inputs, outputs=merged_outputs
        )

    def __getattribute__(self, attrname):
        """Redirect loading and saving methods to the inner model. That's
        where the weights are stored.

        Any attribute whose name contains "load" or "save" is looked up on
        the inner model; everything else is resolved normally.
        """
        if "load" in attrname or "save" in attrname:
            return getattr(self.inner_model, attrname)
        return super(ParallelModel, self).__getattribute__(attrname)

    def summary(self, *args, **kwargs):
        """Override summary() to display summaries of both the wrapper
        and the inner model. Arguments are forwarded to both calls."""
        super(ParallelModel, self).summary(*args, **kwargs)
        self.inner_model.summary(*args, **kwargs)

    def make_parallel(self):
        """Runs the inner model once per GPU and merges the results.

        Returns: list of merged output tensors, one per output of the
        inner model, in the same order as the inner model's output names.
        """
        inner = self.inner_model
        input_names = inner.input_names

        # Slice inputs: each input tensor is split into gpu_count pieces.
        # NOTE(review): the original comment states the slicing is meant to
        # happen on the CPU to avoid sending a copy of the full inputs to
        # all GPUs, but no device scope is set here -- confirm placement.
        input_slices = {
            name: tf.split(x, self.gpu_count)
            for name, x in zip(input_names, inner.inputs)
        }

        output_names = inner.output_names
        # One bucket per model output; each collects that output from
        # every replica.
        outputs_all = [[] for _ in inner.outputs]

        # Run the model call() on each GPU to place the ops there
        for i in range(self.gpu_count):
            with tf.device("/gpu:%d" % i):
                with tf.name_scope("tower_%d" % i):
                    # Run a slice of inputs through this replica.
                    # `name` and `i` are bound as lambda defaults so each
                    # Lambda layer keeps its own slice even if the layer's
                    # function is invoked again after this loop has moved
                    # on (a plain closure would see the final loop values).
                    inputs = [
                        KL.Lambda(
                            lambda s, name=name, i=i: input_slices[name][i],
                            output_shape=lambda s: (None,) + s[1:],
                        )(tensor)
                        for name, tensor in zip(input_names, inner.inputs)
                    ]
                    # Call the inner model on the slices and get the outputs
                    outputs = inner(inputs)
                    if not isinstance(outputs, list):
                        outputs = [outputs]
                    # Save the outputs for merging back together later
                    for l, o in enumerate(outputs):
                        outputs_all[l].append(o)

        # Merge outputs on CPU
        with tf.device("/cpu:0"):
            merged = []
            for outputs, name in zip(outputs_all, output_names):
                # Concatenate or average outputs?
                # Outputs usually have a batch dimension and we concatenate
                # across it. If they don't, then the output is likely a loss
                # or a metric value that gets averaged across the batch.
                # Keras expects losses and metrics to be scalars.
                if K.int_shape(outputs[0]) == ():
                    # Average. The divisor comes from the lambda's own
                    # argument rather than the loop variable, so the
                    # function does not depend on late-bound outer state.
                    m = KL.Lambda(
                        lambda o: tf.add_n(o) / len(o), name=name
                    )(outputs)
                else:
                    # Concatenate
                    m = KL.Concatenate(axis=0, name=name)(outputs)
                merged.append(m)
        return merged
if __name__ == "__main__":
    # Smoke test. Builds a small MNIST classifier, wraps it in
    # ParallelModel for 2 GPUs and trains it briefly. The graph is
    # written out so it can be inspected in TensorBoard. Run it as:
    #
    #     python3 parallel_model.py
    import os

    import keras.optimizers
    import numpy as np
    from keras.datasets import mnist
    from keras.preprocessing.image import ImageDataGenerator

    GPU_COUNT = 2

    # Project root, and the directory receiving logs and trained models
    ROOT_DIR = os.path.abspath("../")
    MODEL_DIR = os.path.join(ROOT_DIR, "logs")

    def build_model(x_train, num_classes):
        """Return a small conv-net classifier shaped after x_train samples."""
        # Start from a clean graph. Keras leaves old ops behind; they are
        # ignored at execution time but clutter the TensorBoard graph view.
        tf.reset_default_graph()

        image_in = KL.Input(shape=x_train.shape[1:], name="input_image")
        layer_stack = [
            KL.Conv2D(32, (3, 3), activation="relu", padding="same", name="conv1"),
            KL.Conv2D(64, (3, 3), activation="relu", padding="same", name="conv2"),
            KL.MaxPooling2D(pool_size=(2, 2), name="pool1"),
            KL.Flatten(name="flat1"),
            KL.Dense(128, activation="relu", name="dense1"),
            KL.Dense(num_classes, activation="softmax", name="dense2"),
        ]
        # Apply the layers in order, threading the tensor through each one
        features = image_in
        for layer in layer_stack:
            features = layer(features)
        return KM.Model(image_in, features, "digit_classifier_model")

    # Load MNIST, add a trailing channel axis and scale pixels by 1/255
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train[..., np.newaxis].astype("float32") / 255
    x_test = x_test[..., np.newaxis].astype("float32") / 255

    print("x_train shape:", x_train.shape)
    print("x_test shape:", x_test.shape)

    # Data generator and single-device model
    batch_source = ImageDataGenerator()
    base_model = build_model(x_train, 10)

    # Add multi-GPU support.
    model = ParallelModel(base_model, GPU_COUNT)

    sgd = keras.optimizers.SGD(lr=0.01, momentum=0.9, clipnorm=5.0)
    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=sgd,
        metrics=["accuracy"],
    )
    model.summary()

    # Train, logging the graph for TensorBoard
    tensorboard = keras.callbacks.TensorBoard(log_dir=MODEL_DIR, write_graph=True)
    model.fit_generator(
        batch_source.flow(x_train, y_train, batch_size=64),
        steps_per_epoch=50,
        epochs=10,
        verbose=1,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard],
    )