mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
Added tensorboard to resnet (#374)
Added tensorboard to resnet example.
This commit is contained in:
parent
12c9618c0c
commit
6d9820ef5d
4 changed files with 126 additions and 74 deletions
|
@ -7,24 +7,48 @@ across multiple GPUs using Ray. View the `code for this example`_.
|
|||
To run the example, you will need to install `TensorFlow with GPU support`_ (at
|
||||
least version ``1.0.0``). Then you can run the example as follows.
|
||||
|
||||
First download the CIFAR-10 dataset.
|
||||
First download the CIFAR-10 or CIFAR-100 dataset.
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
# Get the CIFAR-10 dataset.
|
||||
curl -o cifar-10-binary.tar.gz https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
|
||||
|
||||
tar -xvf cifar-10-binary.tar.gz
|
||||
|
||||
# Get the CIFAR-100 dataset.
|
||||
curl -o cifar-100-binary.tar.gz https://www.cs.toronto.edu/~kriz/cifar-100-binary.tar.gz
|
||||
tar -xvf cifar-100-binary.tar.gz
|
||||
|
||||
Then run the training script.
|
||||
Then run the training script that matches the dataset you downloaded.
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
# Train Resnet on CIFAR-10.
|
||||
python ray/examples/resnet/resnet_main.py \
|
||||
--eval_dir=/tmp/resnet-model/eval \
|
||||
--train_data_path=cifar-10-batches-bin/data_batch* \
|
||||
--eval_data_path=cifar-10-batches-bin/test_batch.bin \
|
||||
--dataset=cifar10 \
|
||||
--num_gpus=1
|
||||
|
||||
# Train Resnet on CIFAR-100.
|
||||
python ray/examples/resnet/resnet_main.py \
|
||||
--eval_dir=/tmp/resnet-model/eval \
|
||||
--train_data_path=cifar-100-binary/train.bin \
|
||||
--eval_data_path=cifar-100-binary/test.bin \
|
||||
--dataset=cifar100 \
|
||||
--num_gpus=1
|
||||
|
||||
The script will print out the IP address of the node on which the log files are stored. In the single-node case,
|
||||
you can ignore this and run tensorboard on the current machine.
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
python -m tensorflow.tensorboard --logdir=/tmp/resnet-model
|
||||
|
||||
If you are running Ray on multiple nodes, you will need to log in to the node at the printed IP address and
|
||||
run the command.
|
||||
|
||||
The core of the script is the actor definition.
|
||||
|
||||
.. code-block:: python
|
||||
|
|
|
@ -9,12 +9,16 @@ from __future__ import print_function
|
|||
import numpy as np
|
||||
import tensorflow as tf
|
||||
|
||||
def build_data(data_path, size):
|
||||
def build_data(data_path, size, dataset):
|
||||
image_size = 32
|
||||
label_bytes = 1
|
||||
label_offset = 0
|
||||
num_classes = 10
|
||||
|
||||
if dataset == 'cifar10':
|
||||
label_bytes = 1
|
||||
label_offset = 0
|
||||
num_classes = 10
|
||||
elif dataset == 'cifar100':
|
||||
label_bytes = 1
|
||||
label_offset = 1
|
||||
num_classes = 100
|
||||
depth = 3
|
||||
image_bytes = image_size * image_size * depth
|
||||
record_bytes = label_bytes + label_offset + image_bytes
|
||||
|
@ -36,7 +40,7 @@ def build_data(data_path, size):
|
|||
queue = tf.train.shuffle_batch([image, label], size, size, 0, num_threads=16)
|
||||
return queue
|
||||
|
||||
def build_input(data, batch_size, train):
|
||||
def build_input(data, batch_size, dataset, train):
|
||||
"""Build CIFAR image and labels.
|
||||
|
||||
Args:
|
||||
|
@ -55,8 +59,8 @@ def build_input(data, batch_size, train):
|
|||
labels_constant = tf.constant(data[1])
|
||||
image_size = 32
|
||||
depth = 3
|
||||
num_classes = 10
|
||||
image, label = tf.train.slice_input_producer([images_constant, labels_constant])
|
||||
num_classes = 10 if dataset == 'cifar10' else 100
|
||||
image, label = tf.train.slice_input_producer([images_constant, labels_constant], capacity=16 * batch_size)
|
||||
if train:
|
||||
image = tf.image.resize_image_with_crop_or_pad(
|
||||
image, image_size+4, image_size+4)
|
||||
|
@ -102,5 +106,6 @@ def build_input(data, batch_size, train):
|
|||
assert len(labels.get_shape()) == 2
|
||||
assert labels.get_shape()[0] == batch_size
|
||||
assert labels.get_shape()[1] == num_classes
|
||||
|
||||
if not train:
|
||||
tf.summary.image('images', images)
|
||||
return images, labels
|
||||
|
|
|
@ -15,18 +15,24 @@ import cifar_input
|
|||
import resnet_model
|
||||
|
||||
FLAGS = tf.app.flags.FLAGS
|
||||
tf.app.flags.DEFINE_string('dataset', 'cifar10', 'cifar10 or cifar100.')
|
||||
tf.app.flags.DEFINE_string('train_data_path', '',
|
||||
'Filepattern for training data.')
|
||||
tf.app.flags.DEFINE_string('eval_data_path', '',
|
||||
'Filepattern for eval data')
|
||||
tf.app.flags.DEFINE_string('num_gpus', 0, 'Number of gpus to run with')
|
||||
tf.app.flags.DEFINE_string('eval_dir', '',
|
||||
'Directory to keep eval outputs.')
|
||||
tf.app.flags.DEFINE_integer('eval_batch_count', 50,
|
||||
'Number of batches to eval.')
|
||||
tf.app.flags.DEFINE_integer('num_gpus', 0,
|
||||
'Number of gpus used for training.')
|
||||
use_gpu = 1 if int(FLAGS.num_gpus) > 0 else 0
|
||||
|
||||
@ray.remote(num_return_vals=4)
|
||||
def get_data(path, size):
|
||||
def get_data(path, size, dataset):
|
||||
os.environ['CUDA_VISIBLE_DEVICES'] = ''
|
||||
with tf.device('/cpu:0'):
|
||||
queue = cifar_input.build_data(path, size)
|
||||
queue = cifar_input.build_data(path, size, dataset)
|
||||
sess = tf.Session()
|
||||
coord = tf.train.Coordinator()
|
||||
tf.train.start_queue_runners(sess, coord=coord)
|
||||
|
@ -40,11 +46,11 @@ def get_data(path, size):
|
|||
|
||||
@ray.actor(num_gpus=use_gpu)
|
||||
class ResNetTrainActor(object):
|
||||
def __init__(self, data, num_gpus):
|
||||
def __init__(self, data, dataset, num_gpus):
|
||||
if num_gpus > 0:
|
||||
os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()])
|
||||
hps = resnet_model.HParams(batch_size=128,
|
||||
num_classes=10,
|
||||
num_classes=10 if dataset == 'cifar10' else 100,
|
||||
min_lrn_rate=0.0001,
|
||||
lrn_rate=0.1,
|
||||
num_residual_units=5,
|
||||
|
@ -55,23 +61,21 @@ class ResNetTrainActor(object):
|
|||
num_gpus=num_gpus)
|
||||
data = ray.get(data)
|
||||
total_images = np.concatenate([data[0], data[1], data[2]])
|
||||
with tf.Graph().as_default():
|
||||
if num_gpus > 0:
|
||||
tf.set_random_seed(ray.get_gpu_ids()[0] + 1)
|
||||
else:
|
||||
tf.set_random_seed(1)
|
||||
|
||||
with tf.device('/gpu:0' if num_gpus > 0 else '/cpu:0'):
|
||||
images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, True)
|
||||
self.model = resnet_model.ResNet(hps, images, labels, 'train')
|
||||
self.model.build_graph()
|
||||
config = tf.ConfigProto(allow_soft_placement=True)
|
||||
sess = tf.Session(config=config)
|
||||
self.model.variables.set_session(sess)
|
||||
self.coord = tf.train.Coordinator()
|
||||
tf.train.start_queue_runners(sess, coord=self.coord)
|
||||
init = tf.global_variables_initializer()
|
||||
sess.run(init)
|
||||
if num_gpus > 0:
|
||||
tf.set_random_seed(ray.get_gpu_ids()[0] + 1)
|
||||
else:
|
||||
tf.set_random_seed(1)
|
||||
with tf.device('/gpu:0' if num_gpus > 0 else '/cpu:0'):
|
||||
images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, dataset, True)
|
||||
self.model = resnet_model.ResNet(hps, images, labels, 'train')
|
||||
self.model.build_graph()
|
||||
config = tf.ConfigProto(allow_soft_placement=True)
|
||||
sess = tf.Session(config=config)
|
||||
self.model.variables.set_session(sess)
|
||||
self.coord = tf.train.Coordinator()
|
||||
tf.train.start_queue_runners(sess, coord=self.coord)
|
||||
init = tf.global_variables_initializer()
|
||||
sess.run(init)
|
||||
|
||||
def compute_steps(self, weights):
|
||||
# This method sets the weights in the network, runs some training steps,
|
||||
|
@ -87,9 +91,9 @@ class ResNetTrainActor(object):
|
|||
|
||||
@ray.actor
|
||||
class ResNetTestActor(object):
|
||||
def __init__(self, data, eval_batch_count):
|
||||
def __init__(self, data, dataset, eval_batch_count, eval_dir):
|
||||
hps = resnet_model.HParams(batch_size=100,
|
||||
num_classes=10,
|
||||
num_classes=10 if dataset == 'cifar10' else 100,
|
||||
min_lrn_rate=0.0001,
|
||||
lrn_rate=0.1,
|
||||
num_residual_units=5,
|
||||
|
@ -100,30 +104,32 @@ class ResNetTestActor(object):
|
|||
num_gpus=0)
|
||||
data = ray.get(data)
|
||||
total_images = np.concatenate([data[0], data[1], data[2]])
|
||||
with tf.Graph().as_default():
|
||||
with tf.device('/cpu:0'):
|
||||
images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, False)
|
||||
self.model = resnet_model.ResNet(hps, images, labels, 'eval')
|
||||
self.model.build_graph()
|
||||
config = tf.ConfigProto(allow_soft_placement=True)
|
||||
sess = tf.Session(config=config)
|
||||
self.model.variables.set_session(sess)
|
||||
self.coord = tf.train.Coordinator()
|
||||
tf.train.start_queue_runners(sess, coord=self.coord)
|
||||
init = tf.global_variables_initializer()
|
||||
sess.run(init)
|
||||
self.best_precision = 0.0
|
||||
self.eval_batch_count = eval_batch_count
|
||||
with tf.device('/cpu:0'):
|
||||
images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, dataset, False)
|
||||
self.model = resnet_model.ResNet(hps, images, labels, 'eval')
|
||||
self.model.build_graph()
|
||||
config = tf.ConfigProto(allow_soft_placement=True)
|
||||
sess = tf.Session(config=config)
|
||||
self.model.variables.set_session(sess)
|
||||
self.coord = tf.train.Coordinator()
|
||||
tf.train.start_queue_runners(sess, coord=self.coord)
|
||||
init = tf.global_variables_initializer()
|
||||
sess.run(init)
|
||||
self.best_precision = 0.0
|
||||
self.eval_batch_count = eval_batch_count
|
||||
self.summary_writer = tf.summary.FileWriter(eval_dir, sess.graph)
|
||||
self.summary_writer
|
||||
self.ip_addr = ray.services.get_node_ip_address()
|
||||
|
||||
def accuracy(self, weights):
|
||||
def accuracy(self, weights, train_step):
|
||||
self.model.variables.set_weights(weights)
|
||||
total_prediction, correct_prediction = 0, 0
|
||||
model = self.model
|
||||
sess = self.model.variables.sess
|
||||
for _ in range(self.eval_batch_count):
|
||||
loss, predictions, truth, train_step = sess.run(
|
||||
[model.cost, model.predictions,
|
||||
model.labels, model.global_step])
|
||||
summaries, loss, predictions, truth = sess.run(
|
||||
[model.summaries, model.cost, model.predictions,
|
||||
model.labels])
|
||||
|
||||
truth = np.argmax(truth, axis=1)
|
||||
predictions = np.argmax(predictions, axis=1)
|
||||
|
@ -132,36 +138,50 @@ class ResNetTestActor(object):
|
|||
|
||||
precision = 1.0 * correct_prediction / total_prediction
|
||||
self.best_precision = max(precision, self.best_precision)
|
||||
precision_summ = tf.Summary()
|
||||
precision_summ.value.add(
|
||||
tag='Precision', simple_value=precision)
|
||||
self.summary_writer.add_summary(precision_summ, train_step)
|
||||
best_precision_summ = tf.Summary()
|
||||
best_precision_summ.value.add(
|
||||
tag='Best Precision', simple_value=self.best_precision)
|
||||
self.summary_writer.add_summary(best_precision_summ, train_step)
|
||||
self.summary_writer.add_summary(summaries, train_step)
|
||||
tf.logging.info('loss: %.3f, precision: %.3f, best precision: %.3f' %
|
||||
(loss, precision, self.best_precision))
|
||||
self.summary_writer.flush()
|
||||
return precision
|
||||
|
||||
def get_ip_addr(self):
|
||||
return self.ip_addr
|
||||
|
||||
def train():
|
||||
"""Training loop."""
|
||||
num_gpus = int(FLAGS.num_gpus)
|
||||
ray.init(num_gpus=num_gpus)
|
||||
train_data = get_data.remote(FLAGS.train_data_path, 50000)
|
||||
test_data = get_data.remote(FLAGS.eval_data_path, 10000)
|
||||
ray.init(num_gpus=num_gpus, redirect_output=True)
|
||||
train_data = get_data.remote(FLAGS.train_data_path, 50000, FLAGS.dataset)
|
||||
test_data = get_data.remote(FLAGS.eval_data_path, 10000, FLAGS.dataset)
|
||||
if num_gpus > 0:
|
||||
train_actors = [ResNetTrainActor(train_data, num_gpus) for _ in range(num_gpus)]
|
||||
train_actors = [ResNetTrainActor(train_data, FLAGS.dataset, num_gpus) for _ in range(num_gpus)]
|
||||
else:
|
||||
train_actors = [ResNetTrainActor(train_data, num_gpus)]
|
||||
test_actor = ResNetTestActor(test_data, 50)
|
||||
test_actor = ResNetTestActor(test_data, FLAGS.dataset, FLAGS.eval_batch_count, FLAGS.eval_dir)
|
||||
print('The log files for tensorboard are stored at ip {}.'.format(ray.get(test_actor.get_ip_addr())))
|
||||
step = 0
|
||||
weight_id = train_actors[0].get_weights()
|
||||
acc_id = test_actor.accuracy(weight_id)
|
||||
acc_id = test_actor.accuracy(weight_id, step)
|
||||
if num_gpus == 0:
|
||||
num_gpus = 1
|
||||
print("Starting computation.")
|
||||
while True:
|
||||
with open('results.txt', 'a') as results:
|
||||
print('Computing steps')
|
||||
all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors])
|
||||
mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]}
|
||||
weight_id = ray.put(mean_weights)
|
||||
step += 10
|
||||
if step % 200 == 0:
|
||||
acc = ray.get(acc_id)
|
||||
acc_id = test_actor.accuracy(weight_id)
|
||||
print('Step {0}: {1:.6f}'.format(step - 200, acc))
|
||||
results.write(str(step - 200) + ' ' + str(acc) + '\n')
|
||||
all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors])
|
||||
mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]}
|
||||
weight_id = ray.put(mean_weights)
|
||||
step += 10
|
||||
if step % 200 == 0:
|
||||
acc = ray.get(acc_id)
|
||||
acc_id = test_actor.accuracy(weight_id, step)
|
||||
print('Step {0}: {1:.6f}'.format(step - 200, acc))
|
||||
|
||||
def main(_):
|
||||
train()
|
||||
|
|
|
@ -50,6 +50,7 @@ class ResNet(object):
|
|||
self._build_train_op()
|
||||
else:
|
||||
self.variables = ray.experimental.TensorFlowVariables(self.cost)
|
||||
self.summaries = tf.summary.merge_all()
|
||||
|
||||
def _stride_arr(self, stride):
|
||||
"""Map a stride scalar to the stride array for tf.nn.conv2d."""
|
||||
|
@ -112,9 +113,8 @@ class ResNet(object):
|
|||
self.cost = tf.reduce_mean(xent, name='xent')
|
||||
self.cost += self._decay()
|
||||
|
||||
truth = tf.argmax(self.labels, axis=1)
|
||||
predictions = tf.argmax(self.predictions, axis=1)
|
||||
self.precision = tf.reduce_mean(tf.to_float(tf.equal(predictions, truth)))
|
||||
if self.mode == 'eval':
|
||||
tf.summary.scalar('cost', self.cost)
|
||||
|
||||
def _build_train_op(self):
|
||||
"""Build training specific ops for the graph."""
|
||||
|
@ -124,6 +124,7 @@ class ResNet(object):
|
|||
boundaries = [int(20000 * i / np.sqrt(num_gpus)) for i in range(2, 5)]
|
||||
values = [0.1, 0.01, 0.001, 0.0001]
|
||||
self.lrn_rate = tf.train.piecewise_constant(self.global_step, boundaries, values)
|
||||
tf.summary.scalar('learning rate', self.lrn_rate)
|
||||
|
||||
if self.hps.optimizer == 'sgd':
|
||||
optimizer = tf.train.GradientDescentOptimizer(self.lrn_rate)
|
||||
|
@ -172,6 +173,8 @@ class ResNet(object):
|
|||
'moving_variance', params_shape, tf.float32,
|
||||
initializer=tf.constant_initializer(1.0, tf.float32),
|
||||
trainable=False)
|
||||
tf.summary.histogram(mean.op.name, mean)
|
||||
tf.summary.histogram(variance.op.name, variance)
|
||||
# epsilon used to be 1e-5. Maybe 0.001 solves the NaN problem in deeper nets.
|
||||
y = tf.nn.batch_normalization(
|
||||
x, mean, variance, beta, gamma, 0.001)
|
||||
|
|
Loading…
Add table
Reference in a new issue