Add CIFAR-100 support and TensorBoard summaries to the Ray ResNet example.

Reviewer notes for this revision of the patch:
  * cifar_input.build_data raises ValueError for an unrecognised dataset name
    instead of failing later on unbound label_bytes/label_offset/num_classes.
  * train() passes FLAGS.dataset to ResNetTrainActor in the CPU-only branch
    too; the constructor signature introduced here requires it.
  * Dropped a stray no-op `self.summary_writer` expression statement from
    ResNetTestActor.__init__.
  * The learning-rate summary tag is spelled 'learning_rate' (no space).
  * Fixed the "elipson" comment typo in resnet_model.py.
  * NOTE(review): line breaks and indentation were reconstructed from a
    whitespace-collapsed copy of the patch; confirm context lines against the
    tree before applying. The `index` lines still carry the original blob ids.

diff --git a/doc/source/example-resnet.rst b/doc/source/example-resnet.rst
index dd94c5e85..6be26d9c9 100644
--- a/doc/source/example-resnet.rst
+++ b/doc/source/example-resnet.rst
@@ -7,24 +7,48 @@ across multiple GPUs using Ray. View the `code for this example`_.
 To run the example, you will need to install `TensorFlow with GPU support`_ (at
 least version ``1.0.0``). Then you can run the example as follows.
 
-First download the CIFAR-10 dataset.
+First download the CIFAR-10 or CIFAR-100 dataset.
 
 .. code-block:: bash
 
+  # Get the CIFAR-10 dataset.
   curl -o cifar-10-binary.tar.gz https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz
-
   tar -xvf cifar-10-binary.tar.gz
 
+  # Get the CIFAR-100 dataset.
+  curl -o cifar-100-binary.tar.gz https://www.cs.toronto.edu/~kriz/cifar-100-binary.tar.gz
+  tar -xvf cifar-100-binary.tar.gz
 
-Then run the training script.
+Then run the training script that matches the dataset you downloaded.
 
 .. code-block:: bash
 
+  # Train Resnet on CIFAR-10.
   python ray/examples/resnet/resnet_main.py \
+      --eval_dir=/tmp/resnet-model/eval \
       --train_data_path=cifar-10-batches-bin/data_batch* \
       --eval_data_path=cifar-10-batches-bin/test_batch.bin \
+      --dataset=cifar10 \
       --num_gpus=1
 
+  # Train Resnet on CIFAR-100.
+  python ray/examples/resnet/resnet_main.py \
+      --eval_dir=/tmp/resnet-model/eval \
+      --train_data_path=cifar-100-binary/train.bin \
+      --eval_data_path=cifar-100-binary/test.bin \
+      --dataset=cifar100 \
+      --num_gpus=1
+
+The script will print out the IP address that the log files are stored on. In the single-node case,
+you can ignore this and run tensorboard on the current machine.
+
+.. code-block:: bash
+
+  python -m tensorflow.tensorboard --logdir=/tmp/resnet-model
+
+If you are running Ray on multiple nodes, you will need to go to the node at the IP address printed, and
+run the command.
+
 The core of the script is the actor definition.
 
 .. code-block:: python
diff --git a/examples/resnet/cifar_input.py b/examples/resnet/cifar_input.py
index 7d0e04c91..bf7c18e80 100644
--- a/examples/resnet/cifar_input.py
+++ b/examples/resnet/cifar_input.py
@@ -9,12 +9,18 @@ from __future__ import print_function
 import numpy as np
 import tensorflow as tf
 
-def build_data(data_path, size):
+def build_data(data_path, size, dataset):
   image_size = 32
-  label_bytes = 1
-  label_offset = 0
-  num_classes = 10
-
+  if dataset == 'cifar10':
+    label_bytes = 1
+    label_offset = 0
+    num_classes = 10
+  elif dataset == 'cifar100':
+    label_bytes = 1
+    label_offset = 1
+    num_classes = 100
+  else:
+    raise ValueError('dataset must be cifar10 or cifar100, got %r' % (dataset,))
   depth = 3
   image_bytes = image_size * image_size * depth
   record_bytes = label_bytes + label_offset + image_bytes
@@ -36,7 +42,7 @@ def build_data(data_path, size):
   queue = tf.train.shuffle_batch([image, label], size, size, 0, num_threads=16)
   return queue
 
-def build_input(data, batch_size, train):
+def build_input(data, batch_size, dataset, train):
   """Build CIFAR image and labels.
 
   Args:
@@ -55,8 +61,8 @@ def build_input(data, batch_size, train):
   labels_constant = tf.constant(data[1])
   image_size = 32
   depth = 3
-  num_classes = 10
-  image, label = tf.train.slice_input_producer([images_constant, labels_constant])
+  num_classes = 10 if dataset == 'cifar10' else 100
+  image, label = tf.train.slice_input_producer([images_constant, labels_constant], capacity=16 * batch_size)
   if train:
     image = tf.image.resize_image_with_crop_or_pad(
         image, image_size+4, image_size+4)
@@ -102,5 +108,6 @@ def build_input(data, batch_size, train):
   assert len(labels.get_shape()) == 2
   assert labels.get_shape()[0] == batch_size
   assert labels.get_shape()[1] == num_classes
-
+  if not train:
+    tf.summary.image('images', images)
   return images, labels
diff --git a/examples/resnet/resnet_main.py b/examples/resnet/resnet_main.py
index a6fc42b3d..0e4fc4e2b 100644
--- a/examples/resnet/resnet_main.py
+++ b/examples/resnet/resnet_main.py
@@ -15,18 +15,24 @@ import cifar_input
 import resnet_model
 
 FLAGS = tf.app.flags.FLAGS
+tf.app.flags.DEFINE_string('dataset', 'cifar10', 'cifar10 or cifar100.')
 tf.app.flags.DEFINE_string('train_data_path', '',
                            'Filepattern for training data.')
 tf.app.flags.DEFINE_string('eval_data_path', '',
                            'Filepattern for eval data')
-tf.app.flags.DEFINE_string('num_gpus', 0, 'Number of gpus to run with')
+tf.app.flags.DEFINE_string('eval_dir', '',
+                           'Directory to keep eval outputs.')
+tf.app.flags.DEFINE_integer('eval_batch_count', 50,
+                            'Number of batches to eval.')
+tf.app.flags.DEFINE_integer('num_gpus', 0,
+                            'Number of gpus used for training.')
 use_gpu = 1 if int(FLAGS.num_gpus) > 0 else 0
 
 @ray.remote(num_return_vals=4)
-def get_data(path, size):
+def get_data(path, size, dataset):
   os.environ['CUDA_VISIBLE_DEVICES'] = ''
   with tf.device('/cpu:0'):
-    queue = cifar_input.build_data(path, size)
+    queue = cifar_input.build_data(path, size, dataset)
   sess = tf.Session()
   coord = tf.train.Coordinator()
   tf.train.start_queue_runners(sess, coord=coord)
@@ -40,11 +46,11 @@ def get_data(path, size):
 
 @ray.actor(num_gpus=use_gpu)
 class ResNetTrainActor(object):
-  def __init__(self, data, num_gpus):
+  def __init__(self, data, dataset, num_gpus):
     if num_gpus > 0:
       os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()])
     hps = resnet_model.HParams(batch_size=128,
-                               num_classes=10,
+                               num_classes=10 if dataset == 'cifar10' else 100,
                                min_lrn_rate=0.0001,
                                lrn_rate=0.1,
                                num_residual_units=5,
@@ -55,23 +61,21 @@ class ResNetTrainActor(object):
                                num_gpus=num_gpus)
     data = ray.get(data)
     total_images = np.concatenate([data[0], data[1], data[2]])
-    with tf.Graph().as_default():
-      if num_gpus > 0:
-        tf.set_random_seed(ray.get_gpu_ids()[0] + 1)
-      else:
-        tf.set_random_seed(1)
-
-      with tf.device('/gpu:0' if num_gpus > 0 else '/cpu:0'):
-        images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, True)
-        self.model = resnet_model.ResNet(hps, images, labels, 'train')
-        self.model.build_graph()
-        config = tf.ConfigProto(allow_soft_placement=True)
-        sess = tf.Session(config=config)
-        self.model.variables.set_session(sess)
-        self.coord = tf.train.Coordinator()
-        tf.train.start_queue_runners(sess, coord=self.coord)
-        init = tf.global_variables_initializer()
-        sess.run(init)
+    if num_gpus > 0:
+      tf.set_random_seed(ray.get_gpu_ids()[0] + 1)
+    else:
+      tf.set_random_seed(1)
+    with tf.device('/gpu:0' if num_gpus > 0 else '/cpu:0'):
+      images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, dataset, True)
+      self.model = resnet_model.ResNet(hps, images, labels, 'train')
+      self.model.build_graph()
+      config = tf.ConfigProto(allow_soft_placement=True)
+      sess = tf.Session(config=config)
+      self.model.variables.set_session(sess)
+      self.coord = tf.train.Coordinator()
+      tf.train.start_queue_runners(sess, coord=self.coord)
+      init = tf.global_variables_initializer()
+      sess.run(init)
 
   def compute_steps(self, weights):
     # This method sets the weights in the network, runs some training steps,
@@ -87,9 +91,9 @@ class ResNetTrainActor(object):
 
 @ray.actor
 class ResNetTestActor(object):
-  def __init__(self, data, eval_batch_count):
+  def __init__(self, data, dataset, eval_batch_count, eval_dir):
     hps = resnet_model.HParams(batch_size=100,
-                               num_classes=10,
+                               num_classes=10 if dataset == 'cifar10' else 100,
                                min_lrn_rate=0.0001,
                                lrn_rate=0.1,
                                num_residual_units=5,
@@ -100,30 +104,31 @@ class ResNetTestActor(object):
                                num_gpus=0)
     data = ray.get(data)
     total_images = np.concatenate([data[0], data[1], data[2]])
-    with tf.Graph().as_default():
-      with tf.device('/cpu:0'):
-        images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, False)
-        self.model = resnet_model.ResNet(hps, images, labels, 'eval')
-        self.model.build_graph()
-        config = tf.ConfigProto(allow_soft_placement=True)
-        sess = tf.Session(config=config)
-        self.model.variables.set_session(sess)
-        self.coord = tf.train.Coordinator()
-        tf.train.start_queue_runners(sess, coord=self.coord)
-        init = tf.global_variables_initializer()
-        sess.run(init)
-        self.best_precision = 0.0
-        self.eval_batch_count = eval_batch_count
+    with tf.device('/cpu:0'):
+      images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, dataset, False)
+      self.model = resnet_model.ResNet(hps, images, labels, 'eval')
+      self.model.build_graph()
+      config = tf.ConfigProto(allow_soft_placement=True)
+      sess = tf.Session(config=config)
+      self.model.variables.set_session(sess)
+      self.coord = tf.train.Coordinator()
+      tf.train.start_queue_runners(sess, coord=self.coord)
+      init = tf.global_variables_initializer()
+      sess.run(init)
+      self.best_precision = 0.0
+      self.eval_batch_count = eval_batch_count
+      self.summary_writer = tf.summary.FileWriter(eval_dir, sess.graph)
+      self.ip_addr = ray.services.get_node_ip_address()
 
-  def accuracy(self, weights):
+  def accuracy(self, weights, train_step):
     self.model.variables.set_weights(weights)
     total_prediction, correct_prediction = 0, 0
     model = self.model
     sess = self.model.variables.sess
     for _ in range(self.eval_batch_count):
-      loss, predictions, truth, train_step = sess.run(
-          [model.cost, model.predictions,
-           model.labels, model.global_step])
+      summaries, loss, predictions, truth = sess.run(
+          [model.summaries, model.cost, model.predictions,
+           model.labels])
 
       truth = np.argmax(truth, axis=1)
       predictions = np.argmax(predictions, axis=1)
@@ -132,36 +137,50 @@ class ResNetTestActor(object):
 
     precision = 1.0 * correct_prediction / total_prediction
     self.best_precision = max(precision, self.best_precision)
+    precision_summ = tf.Summary()
+    precision_summ.value.add(
+        tag='Precision', simple_value=precision)
+    self.summary_writer.add_summary(precision_summ, train_step)
+    best_precision_summ = tf.Summary()
+    best_precision_summ.value.add(
+        tag='Best Precision', simple_value=self.best_precision)
+    self.summary_writer.add_summary(best_precision_summ, train_step)
+    self.summary_writer.add_summary(summaries, train_step)
+    tf.logging.info('loss: %.3f, precision: %.3f, best precision: %.3f' %
+                    (loss, precision, self.best_precision))
+    self.summary_writer.flush()
     return precision
 
+  def get_ip_addr(self):
+    return self.ip_addr
+
 def train():
   """Training loop."""
   num_gpus = int(FLAGS.num_gpus)
-  ray.init(num_gpus=num_gpus)
-  train_data = get_data.remote(FLAGS.train_data_path, 50000)
-  test_data = get_data.remote(FLAGS.eval_data_path, 10000)
+  ray.init(num_gpus=num_gpus, redirect_output=True)
+  train_data = get_data.remote(FLAGS.train_data_path, 50000, FLAGS.dataset)
+  test_data = get_data.remote(FLAGS.eval_data_path, 10000, FLAGS.dataset)
   if num_gpus > 0:
-    train_actors = [ResNetTrainActor(train_data, num_gpus) for _ in range(num_gpus)]
+    train_actors = [ResNetTrainActor(train_data, FLAGS.dataset, num_gpus) for _ in range(num_gpus)]
   else:
-    train_actors = [ResNetTrainActor(train_data, num_gpus)]
-  test_actor = ResNetTestActor(test_data, 50)
+    train_actors = [ResNetTrainActor(train_data, FLAGS.dataset, num_gpus)]
+  test_actor = ResNetTestActor(test_data, FLAGS.dataset, FLAGS.eval_batch_count, FLAGS.eval_dir)
+  print('The log files for tensorboard are stored at ip {}.'.format(ray.get(test_actor.get_ip_addr())))
   step = 0
   weight_id = train_actors[0].get_weights()
-  acc_id = test_actor.accuracy(weight_id)
+  acc_id = test_actor.accuracy(weight_id, step)
   if num_gpus == 0:
     num_gpus = 1
+  print("Starting computation.")
   while True:
-    with open('results.txt', 'a') as results:
-      print('Computing steps')
-      all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors])
-      mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]}
-      weight_id = ray.put(mean_weights)
-      step += 10
-      if step % 200 == 0:
-        acc = ray.get(acc_id)
-        acc_id = test_actor.accuracy(weight_id)
-        print('Step {0}: {1:.6f}'.format(step - 200, acc))
-        results.write(str(step - 200) + ' ' + str(acc) + '\n')
+    all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors])
+    mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]}
+    weight_id = ray.put(mean_weights)
+    step += 10
+    if step % 200 == 0:
+      acc = ray.get(acc_id)
+      acc_id = test_actor.accuracy(weight_id, step)
+      print('Step {0}: {1:.6f}'.format(step - 200, acc))
 
 def main(_):
   train()
diff --git a/examples/resnet/resnet_model.py b/examples/resnet/resnet_model.py
index c4c23f527..75cbf3840 100644
--- a/examples/resnet/resnet_model.py
+++ b/examples/resnet/resnet_model.py
@@ -50,6 +50,7 @@ class ResNet(object):
       self._build_train_op()
     else:
       self.variables = ray.experimental.TensorFlowVariables(self.cost)
+    self.summaries = tf.summary.merge_all()
 
   def _stride_arr(self, stride):
     """Map a stride scalar to the stride array for tf.nn.conv2d."""
@@ -112,9 +113,8 @@ class ResNet(object):
       self.cost = tf.reduce_mean(xent, name='xent')
       self.cost += self._decay()
 
-      truth = tf.argmax(self.labels, axis=1)
-      predictions = tf.argmax(self.predictions, axis=1)
-      self.precision = tf.reduce_mean(tf.to_float(tf.equal(predictions, truth)))
+      if self.mode == 'eval':
+        tf.summary.scalar('cost', self.cost)
 
   def _build_train_op(self):
     """Build training specific ops for the graph."""
@@ -124,6 +124,7 @@ class ResNet(object):
     boundaries = [int(20000 * i / np.sqrt(num_gpus)) for i in range(2, 5)]
     values = [0.1, 0.01, 0.001, 0.0001]
     self.lrn_rate = tf.train.piecewise_constant(self.global_step, boundaries, values)
+    tf.summary.scalar('learning_rate', self.lrn_rate)
 
     if self.hps.optimizer == 'sgd':
       optimizer = tf.train.GradientDescentOptimizer(self.lrn_rate)
@@ -172,6 +173,8 @@ class ResNet(object):
             'moving_variance', params_shape, tf.float32,
             initializer=tf.constant_initializer(1.0, tf.float32),
             trainable=False)
-      # elipson used to be 1e-5. Maybe 0.001 solves NaN problem in deeper net.
+        tf.summary.histogram(mean.op.name, mean)
+        tf.summary.histogram(variance.op.name, variance)
+      # epsilon used to be 1e-5. Maybe 0.001 solves NaN problem in deeper net.
       y = tf.nn.batch_normalization(
           x, mean, variance, beta, gamma, 0.001)