[docs] Convert Examples to Gallery (#5414)

This commit is contained in:
Richard Liaw 2019-09-24 15:46:56 -07:00 committed by GitHub
parent ea9376c9ce
commit 10f21fa313
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 966 additions and 885 deletions

View file

@ -47,7 +47,7 @@ matrix:
- export PATH="$HOME/miniconda/bin:$PATH"
- cd doc
- pip install -q -r requirements-doc.txt
- pip install -q yapf==0.23.0
- pip install -q yapf==0.23.0 sphinx-gallery
- sphinx-build -W -b html -d _build/doctrees source _build/html
- cd ..
# Run Python linting, ignore dict vs {} (C408), others are defaults

View file

@ -15,6 +15,17 @@ DOCKER_SHA=$($ROOT_DIR/../../build-docker.sh --output-sha --no-cache)
SUPPRESS_OUTPUT=$ROOT_DIR/../suppress_output
echo "Using Docker image" $DOCKER_SHA
######################## EXAMPLE TESTS #################################
$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/doc/examples/plot_pong_example.py
$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/doc/examples/plot_parameter_server.py
$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/doc/examples/plot_hyperparameter.py
######################## RLLIB TESTS #################################
source $ROOT_DIR/run_rllib_tests.sh

1
doc/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
auto_examples/

View file

@ -6,6 +6,7 @@ SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
AUTOGALLERYDIR= source/auto_examples
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
@ -49,7 +50,7 @@ help:
@echo " coverage to run coverage check of the documentation (if enabled)"
clean:
rm -rf $(BUILDDIR)/*
rm -rf $(BUILDDIR)/* $(AUTOGALLERYDIR)
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html

0
doc/examples/README.rst Normal file
View file

View file

@ -97,23 +97,12 @@ def example8():
"""Cython with blas. NOTE: requires scipy"""
# See cython_blas.pyx for argument documentation
mat = np.array([[[2.0, 2.0], [2.0, 2.0]], [[2.0, 2.0], [2.0, 2.0]]],
dtype=np.float32)
mat = np.array(
[[[2.0, 2.0], [2.0, 2.0]], [[2.0, 2.0], [2.0, 2.0]]], dtype=np.float32)
result = np.zeros((2, 2), np.float32, order="C")
run_func(cyth.compute_kernel_matrix,
"L",
"T",
2,
2,
1.0,
mat,
0,
2,
1.0,
result,
2
)
run_func(cyth.compute_kernel_matrix, "L", "T", 2, 2, 1.0, mat, 0, 2, 1.0,
result, 2)
if __name__ == "__main__":

View file

@ -25,11 +25,10 @@ except ImportError as e: # noqa
modules = [os.path.join(pkg_dir, module) for module in modules]
setup(
name=pkg_dir,
version="0.0.1",
description="Cython examples for Ray",
packages=[pkg_dir],
ext_modules=cythonize(modules),
install_requires=install_requires,
include_dirs=include_dirs
)
name=pkg_dir,
version="0.0.1",
description="Cython examples for Ray",
packages=[pkg_dir],
ext_modules=cythonize(modules),
install_requires=install_requires,
include_dirs=include_dirs)

View file

@ -1,154 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
from collections import defaultdict
import numpy as np
import ray
from tensorflow.examples.tutorials.mnist import input_data
import objective
parser = argparse.ArgumentParser(description="Run the hyperparameter "
"optimization example.")
parser.add_argument("--num-starting-segments", default=5, type=int,
help="The number of training segments to start in "
"parallel.")
parser.add_argument("--num-segments", default=10, type=int,
help="The number of additional training segments to "
"perform.")
parser.add_argument("--steps-per-segment", default=20, type=int,
help="The number of steps of training to do per training "
"segment.")
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
if __name__ == "__main__":
args = parser.parse_args()
ray.init(redis_address=args.redis_address)
# The number of training passes over the dataset to use for network.
steps = args.steps_per_segment
# Load the mnist data and turn the data into remote objects.
print("Downloading the MNIST dataset. This may take a minute.")
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)
train_images = ray.put(mnist.train.images)
train_labels = ray.put(mnist.train.labels)
validation_images = ray.put(mnist.validation.images)
validation_labels = ray.put(mnist.validation.labels)
# Keep track of the accuracies that we've seen at different numbers of
# iterations.
accuracies_by_num_steps = defaultdict(lambda: [])
# Define a method to determine if an experiment looks promising or not.
def is_promising(experiment_info):
accuracies = experiment_info["accuracies"]
total_num_steps = experiment_info["total_num_steps"]
comparable_accuracies = accuracies_by_num_steps[total_num_steps]
if len(comparable_accuracies) == 0:
if len(accuracies) == 1:
# This means that we haven't seen anything finish yet, so keep
# running this experiment.
return True
else:
# The experiment is promising if the second half of the
# accuracies are better than the first half of the accuracies.
return (np.mean(accuracies[:len(accuracies) // 2]) <
np.mean(accuracies[len(accuracies) // 2:]))
# Otherwise, continue running the experiment if it is in the top half
# of experiments we've seen so far at this point in time.
return np.mean(accuracy > np.array(comparable_accuracies)) > 0.5
# Keep track of all of the experiment segments that we're running. This
# dictionary uses the object ID of the experiment as the key.
experiment_info = {}
# Keep track of the currently running experiment IDs.
remaining_ids = []
# Keep track of the best hyperparameters and the best accuracy.
best_hyperparameters = None
best_accuracy = 0
# A function for generating random hyperparameters.
def generate_hyperparameters():
return {"learning_rate": 10 ** np.random.uniform(-5, 5),
"batch_size": np.random.randint(1, 100),
"dropout": np.random.uniform(0, 1),
"stddev": 10 ** np.random.uniform(-5, 5)}
# Launch some initial experiments.
for _ in range(args.num_starting_segments):
hyperparameters = generate_hyperparameters()
experiment_id = objective.train_cnn_and_compute_accuracy.remote(
hyperparameters, steps, train_images, train_labels,
validation_images, validation_labels)
experiment_info[experiment_id] = {"hyperparameters": hyperparameters,
"total_num_steps": steps,
"accuracies": []}
remaining_ids.append(experiment_id)
for _ in range(args.num_segments):
# Wait for a segment of an experiment to finish.
ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
experiment_id = ready_ids[0]
# Get the accuracy and the weights.
accuracy, weights = ray.get(experiment_id)
# Update the experiment info.
previous_info = experiment_info[experiment_id]
previous_info["accuracies"].append(accuracy)
# Update the best accuracy and best hyperparameters.
if accuracy > best_accuracy:
best_hyperparameters = previous_info["hyperparameters"]
best_accuracy = accuracy
if is_promising(previous_info):
# If the experiment still looks promising, then continue running
# it.
print("Continuing to run the experiment with hyperparameters {}."
.format(previous_info["hyperparameters"]))
new_hyperparameters = previous_info["hyperparameters"]
new_info = {"hyperparameters": new_hyperparameters,
"total_num_steps": (previous_info["total_num_steps"] +
steps),
"accuracies": previous_info["accuracies"][:]}
starting_weights = weights
else:
# If the experiment does not look promising, start a new
# experiment.
print("Ending the experiment with hyperparameters {}."
.format(previous_info["hyperparameters"]))
new_hyperparameters = generate_hyperparameters()
new_info = {"hyperparameters": new_hyperparameters,
"total_num_steps": steps,
"accuracies": []}
starting_weights = None
# Start running the next segment.
new_experiment_id = objective.train_cnn_and_compute_accuracy.remote(
new_hyperparameters, steps, train_images, train_labels,
validation_images, validation_labels, weights=starting_weights)
experiment_info[new_experiment_id] = new_info
remaining_ids.append(new_experiment_id)
# Update the set of all accuracies that we've seen.
accuracies_by_num_steps[previous_info["total_num_steps"]].append(
accuracy)
# Record the best performing set of hyperparameters.
print("""Best accuracy was {:.3} with
learning_rate: {:.2}
batch_size: {}
dropout: {:.2}
stddev: {:.2}
""".format(100 * best_accuracy,
best_hyperparameters["learning_rate"],
best_hyperparameters["batch_size"],
best_hyperparameters["dropout"],
best_hyperparameters["stddev"]))

View file

@ -1,100 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
import argparse
from tensorflow.examples.tutorials.mnist import input_data
import objective
parser = argparse.ArgumentParser(description="Run the hyperparameter "
"optimization example.")
parser.add_argument("--trials", default=2, type=int,
help="The number of random trials to do.")
parser.add_argument("--steps", default=10, type=int,
help="The number of steps of training to do per network.")
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
if __name__ == "__main__":
args = parser.parse_args()
ray.init(redis_address=args.redis_address)
# The number of sets of random hyperparameters to try.
trials = args.trials
# The number of training passes over the dataset to use for network.
steps = args.steps
# Load the mnist data and turn the data into remote objects.
print("Downloading the MNIST dataset. This may take a minute.")
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)
train_images = ray.put(mnist.train.images)
train_labels = ray.put(mnist.train.labels)
validation_images = ray.put(mnist.validation.images)
validation_labels = ray.put(mnist.validation.labels)
# Keep track of the best hyperparameters and the best accuracy.
best_hyperparameters = None
best_accuracy = 0
# This list holds the object IDs for all of the experiments that we have
# launched and that have not yet been processed.
remaining_ids = []
# This is a dictionary mapping the object ID of an experiment to the
# hyperparameters used for that experiment.
hyperparameters_mapping = {}
# A function for generating random hyperparameters.
def generate_hyperparameters():
return {"learning_rate": 10 ** np.random.uniform(-5, 5),
"batch_size": np.random.randint(1, 100),
"dropout": np.random.uniform(0, 1),
"stddev": 10 ** np.random.uniform(-5, 5)}
# Randomly generate some hyperparameters, and launch a task for each set.
for i in range(trials):
hyperparameters = generate_hyperparameters()
accuracy_id = objective.train_cnn_and_compute_accuracy.remote(
hyperparameters, steps, train_images, train_labels,
validation_images, validation_labels)
remaining_ids.append(accuracy_id)
# Keep track of which hyperparameters correspond to this experiment.
hyperparameters_mapping[accuracy_id] = hyperparameters
# Fetch and print the results of the tasks in the order that they complete.
for i in range(trials):
# Use ray.wait to get the object ID of the first task that completes.
ready_ids, remaining_ids = ray.wait(remaining_ids)
# Process the output of this task.
result_id = ready_ids[0]
hyperparameters = hyperparameters_mapping[result_id]
accuracy, _ = ray.get(result_id)
print("""We achieve accuracy {:.3}% with
learning_rate: {:.2}
batch_size: {}
dropout: {:.2}
stddev: {:.2}
""".format(100 * accuracy,
hyperparameters["learning_rate"],
hyperparameters["batch_size"],
hyperparameters["dropout"],
hyperparameters["stddev"]))
if accuracy > best_accuracy:
best_hyperparameters = hyperparameters
best_accuracy = accuracy
# Record the best performing set of hyperparameters.
print("""Best accuracy over {} trials was {:.3} with
learning_rate: {:.2}
batch_size: {}
dropout: {:.2}
stddev: {:.2}
""".format(trials, 100 * best_accuracy,
best_hyperparameters["learning_rate"],
best_hyperparameters["batch_size"],
best_hyperparameters["dropout"],
best_hyperparameters["stddev"]))

View file

@ -1,127 +0,0 @@
# Most of the tensorflow code is adapted from Tensorflow's tutorial on using
# CNNs to train MNIST
# https://www.tensorflow.org/versions/r0.9/tutorials/mnist/pros/index.html#build-a-multilayer-convolutional-network. # noqa: E501
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import ray
import ray.experimental.tf_utils
def get_batch(data, batch_index, batch_size):
# This method currently drops data when num_data is not divisible by
# batch_size.
num_data = data.shape[0]
num_batches = num_data // batch_size
batch_index %= num_batches
return data[(batch_index * batch_size):((batch_index + 1) * batch_size)]
def weight(shape, stddev):
initial = tf.truncated_normal(shape, stddev=stddev)
return tf.Variable(initial)
def bias(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding="SAME")
def max_pool_2x2(x):
return tf.nn.max_pool(
x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
def cnn_setup(x, y, keep_prob, lr, stddev):
first_hidden = 32
second_hidden = 64
fc_hidden = 1024
W_conv1 = weight([5, 5, 1, first_hidden], stddev)
B_conv1 = bias([first_hidden])
x_image = tf.reshape(x, [-1, 28, 28, 1])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + B_conv1)
h_pool1 = max_pool_2x2(h_conv1)
W_conv2 = weight([5, 5, first_hidden, second_hidden], stddev)
b_conv2 = bias([second_hidden])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
W_fc1 = weight([7 * 7 * second_hidden, fc_hidden], stddev)
b_fc1 = bias([fc_hidden])
h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * second_hidden])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
W_fc2 = weight([fc_hidden, 10], stddev)
b_fc2 = bias([10])
y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)
cross_entropy = tf.reduce_mean(
-tf.reduce_sum(y * tf.log(y_conv), reduction_indices=[1]))
correct_pred = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y, 1))
return (tf.train.AdamOptimizer(lr).minimize(cross_entropy),
tf.reduce_mean(tf.cast(correct_pred, tf.float32)), cross_entropy)
# Define a remote function that takes a set of hyperparameters as well as the
# data, constructs and trains a network, and returns the validation accuracy.
@ray.remote
def train_cnn_and_compute_accuracy(params,
steps,
train_images,
train_labels,
validation_images,
validation_labels,
weights=None):
# Extract the hyperparameters from the params dictionary.
learning_rate = params["learning_rate"]
batch_size = params["batch_size"]
keep = 1 - params["dropout"]
stddev = params["stddev"]
# Create the network and related variables.
with tf.Graph().as_default():
# Create the input placeholders for the network.
x = tf.placeholder(tf.float32, shape=[None, 784])
y = tf.placeholder(tf.float32, shape=[None, 10])
keep_prob = tf.placeholder(tf.float32)
# Create the network.
train_step, accuracy, loss = cnn_setup(x, y, keep_prob, learning_rate,
stddev)
# Do the training and evaluation.
with tf.Session() as sess:
# Use the TensorFlowVariables utility. This is only necessary if we
# want to set and get the weights.
variables = ray.experimental.tf_utils.TensorFlowVariables(
loss, sess)
# Initialize the network weights.
sess.run(tf.global_variables_initializer())
# If some network weights were passed in, set those.
if weights is not None:
variables.set_weights(weights)
# Do some steps of training.
for i in range(1, steps + 1):
# Fetch the next batch of data.
image_batch = get_batch(train_images, i, batch_size)
label_batch = get_batch(train_labels, i, batch_size)
# Do one step of training.
sess.run(
train_step,
feed_dict={
x: image_batch,
y: label_batch,
keep_prob: keep
})
# Training is done, so compute the validation accuracy and the
# current weights and return.
totalacc = accuracy.eval(feed_dict={
x: validation_images,
y: validation_labels,
keep_prob: 1.0
})
new_weights = variables.get_weights()
return float(totalacc), new_weights

34
doc/examples/overview.rst Normal file
View file

@ -0,0 +1,34 @@
Examples Overview
=================
.. customgalleryitem::
:tooltip: Build a simple parameter server using Ray.
:description: :doc:`/auto_examples/plot_parameter_server`
.. customgalleryitem::
:tooltip: Asynchronous Advantage Actor Critic agent using Ray.
:description: :doc:`/auto_examples/plot_example-a3c`
.. customgalleryitem::
:tooltip: Simple parallel asynchronous hyperparameter evaluation.
:description: :doc:`/auto_examples/plot_hyperparameter`
.. customgalleryitem::
:tooltip: Parallelizing a policy gradient calculation on OpenAI Gym Pong.
:description: :doc:`/auto_examples/plot_pong_example`
.. customgalleryitem::
:tooltip: Walkthrough of parallelizing the L-BFGS algorithm.
:description: :doc:`/auto_examples/plot_lbfgs`
.. customgalleryitem::
:tooltip: Implementing a simple news reader using Ray.
:description: :doc:`/auto_examples/plot_newsreader`
.. customgalleryitem::
:tooltip: Using Ray to train ResNet across multiple GPUs.
:description: :doc:`/auto_examples/plot_resnet`
.. customgalleryitem::
:tooltip: Implement a simple streaming application using Ray's actors.
:description: :doc:`/auto_examples/plot_streaming`

View file

@ -113,7 +113,6 @@ Driver Code Walkthrough
The driver manages the coordination among workers and handles updating the
global model parameters. The main training script looks like the following.
.. code-block:: python
import numpy as np

View file

@ -0,0 +1,178 @@
"""
Simple Parallel Model Selection
===============================
In this example, we'll demonstrate how to quickly write a hyperparameter
tuning script that evaluates a set of hyperparameters in parallel.
This script will demonstrate how to use two important parts of the Ray API:
using ``ray.remote`` to define remote functions and ``ray.wait`` to wait for
their results to be ready.
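To give a sense of the pattern up front, here is a minimal, self-contained
sketch (not part of the tuning script below) of ``ray.remote`` and ``ray.wait``
working together; the ``square`` function is purely illustrative:

.. code-block:: python

    import ray

    ray.init()

    @ray.remote
    def square(x):
        return x * x

    # Launch tasks; each call returns an object ID immediately.
    pending = [square.remote(i) for i in range(4)]
    while pending:
        # By default, ray.wait returns one ready ID plus the remaining IDs.
        ready, pending = ray.wait(pending)
        print(ray.get(ready[0]))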
.. important:: For a production-grade implementation of distributed
hyperparameter tuning, use `Tune`_, a scalable hyperparameter
tuning library built using Ray's Actor API.
.. _`Tune`: https://ray.readthedocs.io/en/latest/tune.html
"""
import os
import numpy as np
from filelock import FileLock
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import ray
ray.init()
# The number of sets of random hyperparameters to try.
num_evaluations = 10
# A function for generating random hyperparameters.
def generate_hyperparameters():
return {
"learning_rate": 10**np.random.uniform(-5, 1),
"batch_size": np.random.randint(1, 100),
"momentum": np.random.uniform(0, 1)
}
def get_data_loaders(batch_size):
mnist_transforms = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.1307, ), (0.3081, ))])
# We add FileLock here because multiple workers will want to
# download data, and this may cause overwrites since
# DataLoader is not threadsafe.
with FileLock(os.path.expanduser("~/data.lock")):
train_loader = torch.utils.data.DataLoader(
datasets.MNIST(
"~/data",
train=True,
download=True,
transform=mnist_transforms),
batch_size=batch_size,
shuffle=True)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST("~/data", train=False, transform=mnist_transforms),
batch_size=batch_size,
shuffle=True)
return train_loader, test_loader
class ConvNet(nn.Module):
"""Simple two layer Convolutional Neural Network."""
def __init__(self):
super(ConvNet, self).__init__()
self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
self.fc = nn.Linear(192, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 3))
x = x.view(-1, 192)
x = self.fc(x)
return F.log_softmax(x, dim=1)
def train(model, optimizer, train_loader, device=torch.device("cpu")):
"""Optimize the model with one pass over the data.
Cuts off at 1024 samples to simplify training.
"""
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
if batch_idx * len(data) > 1024:
return
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
def test(model, test_loader, device=torch.device("cpu")):
"""Checks the validation accuracy of the model.
Cuts off at 512 samples for simplicity.
"""
model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (data, target) in enumerate(test_loader):
if batch_idx * len(data) > 512:
break
data, target = data.to(device), target.to(device)
outputs = model(data)
_, predicted = torch.max(outputs.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
return correct / total
@ray.remote
def evaluate_hyperparameters(config):
model = ConvNet()
train_loader, test_loader = get_data_loaders(config["batch_size"])
optimizer = optim.SGD(
model.parameters(),
lr=config["learning_rate"],
momentum=config["momentum"])
train(model, optimizer, train_loader)
return test(model, test_loader)
# Keep track of the best hyperparameters and the best accuracy.
best_hyperparameters = None
best_accuracy = 0
# A list holding the object IDs for all of the experiments that we have
# launched but have not yet been processed.
remaining_ids = []
# A dictionary mapping an experiment's object ID to the hyperparameters
# used for that experiment.
hyperparameters_mapping = {}
# Randomly generate sets of hyperparameters and launch a task to test each set.
for i in range(num_evaluations):
hyperparameters = generate_hyperparameters()
accuracy_id = evaluate_hyperparameters.remote(hyperparameters)
remaining_ids.append(accuracy_id)
hyperparameters_mapping[accuracy_id] = hyperparameters
# Fetch and print the results of the tasks in the order that they complete.
while remaining_ids:
# Use ray.wait to get the object ID of the first task that completes.
done_ids, remaining_ids = ray.wait(remaining_ids)
# By default, ray.wait returns a single ready result.
result_id = done_ids[0]
hyperparameters = hyperparameters_mapping[result_id]
accuracy = ray.get(result_id)
print("""We achieve accuracy {:.3}% with
learning_rate: {:.2}
batch_size: {}
momentum: {:.2}
""".format(100 * accuracy, hyperparameters["learning_rate"],
hyperparameters["batch_size"], hyperparameters["momentum"]))
if accuracy > best_accuracy:
best_hyperparameters = hyperparameters
best_accuracy = accuracy
# Record the best performing set of hyperparameters.
print("""Best accuracy over {} trials was {:.3} with
learning_rate: {:.2}
batch_size: {}
momentum: {:.2}
""".format(num_evaluations, 100 * best_accuracy,
best_hyperparameters["learning_rate"],
best_hyperparameters["batch_size"],
best_hyperparameters["momentum"]))

View file

@ -0,0 +1,289 @@
"""
Parameter Server
================
The parameter server is a framework for distributed machine learning training.
In the parameter server framework, a centralized server (or group of server
nodes) maintains global shared parameters of a machine-learning model
(e.g., a neural network) while the data and computation of calculating
updates (i.e., gradient descent updates) are distributed over worker nodes.
.. image:: ../images/param_actor.png
:align: center
Parameter servers are a core part of many machine learning applications. This
document walks through how to implement simple synchronous and asynchronous
parameter servers using Ray actors.
To run the application, first install some dependencies.
.. code-block:: bash
pip install torch torchvision filelock
Let's first define some helper functions and import some dependencies.
"""
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from filelock import FileLock
import numpy as np
import ray
def get_data_loader():
"""Safely downloads data. Returns training/validation set dataloader."""
mnist_transforms = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.1307, ), (0.3081, ))])
# We add FileLock here because multiple workers will want to
# download data, and this may cause overwrites since
# DataLoader is not threadsafe.
with FileLock(os.path.expanduser("~/data.lock")):
train_loader = torch.utils.data.DataLoader(
datasets.MNIST(
"~/data",
train=True,
download=True,
transform=mnist_transforms),
batch_size=128,
shuffle=True)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST("~/data", train=False, transform=mnist_transforms),
batch_size=128,
shuffle=True)
return train_loader, test_loader
def evaluate(model, test_loader):
"""Evaluates the accuracy of the model on a validation dataset."""
model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (data, target) in enumerate(test_loader):
# This is only set to finish evaluation faster.
if batch_idx * len(data) > 1024:
break
outputs = model(data)
_, predicted = torch.max(outputs.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
return 100. * correct / total
#######################################################################
# Setup: Defining the Neural Network
# ----------------------------------
#
# We define a small neural network to use in training. We provide
# some helper functions for obtaining data, including getter/setter
# methods for gradients and weights.
class ConvNet(nn.Module):
"""Small ConvNet for MNIST."""
def __init__(self):
super(ConvNet, self).__init__()
self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
self.fc = nn.Linear(192, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 3))
x = x.view(-1, 192)
x = self.fc(x)
return F.log_softmax(x, dim=1)
def get_weights(self):
return {k: v.cpu() for k, v in self.state_dict().items()}
def set_weights(self, weights):
self.load_state_dict(weights)
def get_gradients(self):
grads = []
for p in self.parameters():
grad = None if p.grad is None else p.grad.data.cpu().numpy()
grads.append(grad)
return grads
def set_gradients(self, gradients):
for g, p in zip(gradients, self.parameters()):
if g is not None:
p.grad = torch.from_numpy(g)
###########################################################################
# Defining the Parameter Server
# -----------------------------
#
# The parameter server will hold a copy of the model.
# During training, it will:
#
# 1. Receive gradients and apply them to its model.
#
# 2. Send the updated model back to the workers.
#
# The ``@ray.remote`` decorator defines a remote process. It wraps the
# ParameterServer class and allows users to instantiate it as a
# remote actor.
@ray.remote
class ParameterServer(object):
def __init__(self, lr):
self.model = ConvNet()
self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)
def apply_gradients(self, *gradients):
summed_gradients = [
np.stack(gradient_zip).sum(axis=0)
for gradient_zip in zip(*gradients)
]
self.optimizer.zero_grad()
self.model.set_gradients(summed_gradients)
self.optimizer.step()
return self.model.get_weights()
def get_weights(self):
return self.model.get_weights()
###########################################################################
# Defining the Worker
# -------------------
# The worker will also hold a copy of the model. During training, it will
# continuously evaluate data and send gradients
# to the parameter server. The worker will synchronize its model with the
# Parameter Server model weights.
@ray.remote
class DataWorker(object):
def __init__(self):
self.model = ConvNet()
self.data_iterator = iter(get_data_loader()[0])
def compute_gradients(self, weights):
self.model.set_weights(weights)
try:
data, target = next(self.data_iterator)
except StopIteration: # When the epoch ends, start a new epoch.
self.data_iterator = iter(get_data_loader()[0])
data, target = next(self.data_iterator)
self.model.zero_grad()
output = self.model(data)
loss = F.nll_loss(output, target)
loss.backward()
return self.model.get_gradients()
###########################################################################
# Synchronous Parameter Server Training
# -------------------------------------
# We'll now create a synchronous parameter server training scheme. We'll first
# instantiate a process for the parameter server, along with multiple
# workers.
iterations = 200
num_workers = 2
ray.init(ignore_reinit_error=True)
ps = ParameterServer.remote(1e-2)
workers = [DataWorker.remote() for i in range(num_workers)]
###########################################################################
# We'll also instantiate a model on the driver process to evaluate the test
# accuracy during training.
model = ConvNet()
test_loader = get_data_loader()[1]
###########################################################################
# Training alternates between:
#
# 1. Computing the gradients given the current weights from the server
# 2. Updating the parameter server's weights with the gradients.
print("Running synchronous parameter server training.")
current_weights = ps.get_weights.remote()
for i in range(iterations):
gradients = [
worker.compute_gradients.remote(current_weights) for worker in workers
]
# Calculate update after all gradients are available.
current_weights = ps.apply_gradients.remote(*gradients)
if i % 10 == 0:
# Evaluate the current model.
model.set_weights(ray.get(current_weights))
accuracy = evaluate(model, test_loader)
print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))
print("Final accuracy is {:.1f}.".format(accuracy))
# Clean up Ray resources and processes before the next example.
ray.shutdown()
###########################################################################
# Asynchronous Parameter Server Training
# --------------------------------------
# We'll now create an asynchronous parameter server training scheme. We'll first
# instantiate a process for the parameter server, along with multiple
# workers.
print("Running Asynchronous Parameter Server Training.")
ray.init(ignore_reinit_error=True)
ps = ParameterServer.remote(1e-2)
workers = [DataWorker.remote() for i in range(num_workers)]
###########################################################################
# Here, workers will asynchronously compute the gradients given their
# current weights and send these gradients to the parameter server as
# soon as they are ready. When the parameter server finishes applying the
# new gradient, the server will send back a copy of the current weights to the
# worker. The worker will then update its weights and repeat.
current_weights = ps.get_weights.remote()
gradients = {}
for worker in workers:
gradients[worker.compute_gradients.remote(current_weights)] = worker
for i in range(iterations * num_workers):
ready_gradient_list, _ = ray.wait(list(gradients))
ready_gradient_id = ready_gradient_list[0]
worker = gradients.pop(ready_gradient_id)
# Compute and apply gradients.
current_weights = ps.apply_gradients.remote(*[ready_gradient_id])
gradients[worker.compute_gradients.remote(current_weights)] = worker
if i % 10 == 0:
# Evaluate the current model after every 10 updates.
model.set_weights(ray.get(current_weights))
accuracy = evaluate(model, test_loader)
print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))
print("Final accuracy is {:.1f}.".format(accuracy))
##############################################################################
# Final Thoughts
# --------------
#
# This approach is powerful because it enables you to implement a parameter
# server with a few lines of code as part of a Python application.
# As a result, it becomes simpler both to deploy applications that use
# parameter servers and to modify the behavior of the parameter server.
#
# For example, sharding the parameter server, changing the update rule,
# switching between asynchronous and synchronous updates, ignoring
# straggler workers, or any number of other customizations
# will only require a few extra lines of code.
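##############################################################################
# As a rough, illustrative sketch of the sharding idea (the ``ParameterShard``
# actor below is not part of the original example and the shard count is an
# arbitrary assumption), the flattened parameters could be split across
# several small actors, each applying updates to its own slice.

@ray.remote
class ParameterShard(object):
    def __init__(self, weights):
        # Copy so that the slice is writable inside this actor.
        self.weights = weights.copy()

    def apply_gradient(self, grad, lr=1e-2):
        # A plain SGD step on this shard's slice of the parameters.
        self.weights -= lr * grad
        return self.weights


flat_weights = np.concatenate(
    [w.numpy().ravel() for w in model.get_weights().values()])
slices = np.array_split(flat_weights, 2)
shards = [ParameterShard.remote(s) for s in slices]
# A worker would send one gradient slice to each shard and reassemble the
# updated slices into a full parameter vector (zero gradients are used here
# only to exercise the code path).
updated_weights = np.concatenate(
    ray.get([
        shard.apply_gradient.remote(np.zeros_like(s))
        for shard, s in zip(shards, slices)
    ]))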

View file

@ -0,0 +1,293 @@
# flake8: noqa
"""
Learning to Play Pong
=====================
In this example, we'll train a **very simple** neural network to play Pong using
the OpenAI Gym.
At a high level, we will use multiple Ray actors to obtain simulation rollouts
and calculate gradients simultaneously. We will then centralize these
gradients and update the neural network. The updated neural network will
then be passed back to each Ray actor for more gradient calculation.
This application is adapted, with minimal modifications, from
Andrej Karpathy's `source code`_ (see the accompanying `blog post`_).
To run the application, first install some dependencies.
.. code-block:: bash
pip install gym[atari]
At the moment, on a large machine with 64 physical cores, computing an update
with a batch of size 1 takes about 1 second, a batch of size 10 takes about 2.5
seconds. A batch of size 60 takes about 3 seconds. On a cluster with 11 nodes,
each with 18 physical cores, a batch of size 300 takes about 10 seconds. If the
numbers you see differ from these by much, take a look at the
**Troubleshooting** section at the bottom of this page and consider `submitting
an issue`_.
.. _`source code`: https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5
.. _`blog post`: http://karpathy.github.io/2016/05/31/rl/
.. _`submitting an issue`: https://github.com/ray-project/ray/issues
**Note** that these times depend on how long the rollouts take, which in turn
depends on how well the policy is doing. For example, a really bad policy will
lose very quickly. As the policy learns, we should expect these numbers to
increase.
"""
import numpy as np
import os
import ray
import time
import gym
##############################################################################
# Hyperparameters
# ---------------
#
# Here we'll define a couple of the hyperparameters that are used.
H = 200 # The number of hidden layer neurons.
gamma = 0.99 # The discount factor for reward.
decay_rate = 0.99 # The decay factor for RMSProp leaky sum of grad^2.
D = 80 * 80 # The input dimensionality: 80x80 grid.
learning_rate = 1e-4 # Magnitude of the update.
#############################################################################
# Helper Functions
# ----------------
#
# We first define a few helper functions:
#
# 1. Preprocessing: The ``preprocess`` function will
# preprocess the original 210x160x3 uint8 frame into a one-dimensional 6400
# float vector.
#
# 2. Reward Processing: The ``process_rewards`` function will calculate
#    a discounted reward (the formula is written out just after this list):
#    the "value" of a sampled action is the weighted sum of all rewards
#    that come afterwards, but later rewards are exponentially less
#    important.
#
# 3. Rollout: The ``rollout`` function plays an entire game of Pong (until
# either the computer or the RL agent loses).
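#
# As a formula (using the ``gamma`` defined above), the discounted return
# computed in ``process_rewards`` for step ``t`` is
#
#     R_t = r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...
#
# with the running sum reset at every game boundary (in Pong, a nonzero
# reward marks the end of a point).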
def preprocess(img):
# Crop the image.
img = img[35:195]
# Downsample by factor of 2.
img = img[::2, ::2, 0]
# Erase background (background type 1).
img[img == 144] = 0
# Erase background (background type 2).
img[img == 109] = 0
# Set everything else (paddles, ball) to 1.
img[img != 0] = 1
return img.astype(np.float).ravel()
def process_rewards(r):
"""Compute discounted reward from a vector of rewards."""
discounted_r = np.zeros_like(r)
running_add = 0
for t in reversed(range(0, r.size)):
# Reset the sum, since this was a game boundary (pong specific!).
if r[t] != 0:
running_add = 0
running_add = running_add * gamma + r[t]
discounted_r[t] = running_add
return discounted_r
def rollout(model, env):
"""Evaluates env and model until the env returns "Done".
Returns:
xs: A list of observations
hs: A list of model hidden states per observation
dlogps: A list of gradients
drs: A list of rewards.
"""
# Reset the game.
observation = env.reset()
# Note that prev_x is used in computing the difference frame.
prev_x = None
xs, hs, dlogps, drs = [], [], [], []
done = False
while not done:
cur_x = preprocess(observation)
x = cur_x - prev_x if prev_x is not None else np.zeros(D)
prev_x = cur_x
aprob, h = model.policy_forward(x)
# Sample an action.
action = 2 if np.random.uniform() < aprob else 3
# The observation.
xs.append(x)
# The hidden state.
hs.append(h)
y = 1 if action == 2 else 0 # A "fake label".
# The gradient that encourages the action that was taken to be
# taken (see http://cs231n.github.io/neural-networks-2/#losses if
# confused).
dlogps.append(y - aprob)
observation, reward, done, info = env.step(action)
# Record reward (has to be done after we call step() to get reward
# for previous action).
drs.append(reward)
return xs, hs, dlogps, drs
##############################################################################
# Neural Network
# --------------
# Here, a neural network is used to define a "policy"
# for playing Pong (that is, a function that chooses an action given a state).
#
# To implement a neural network in NumPy, we need to provide helper functions
# for calculating updates and computing the output of the neural network
# given an input, which in our case is an observation.
class Model():
"""This class holds the neural network weights."""
def __init__(self):
self.weights = {}
self.weights["W1"] = np.random.randn(H, D) / np.sqrt(D)
self.weights["W2"] = np.random.randn(H) / np.sqrt(H)
def policy_forward(self, x):
h = np.dot(self.weights["W1"], x)
h[h < 0] = 0 # ReLU nonlinearity.
logp = np.dot(self.weights["W2"], h)
# Softmax
p = 1.0 / (1.0 + np.exp(-logp))
# Return probability of taking action 2, and hidden state.
return p, h
def policy_backward(self, eph, epx, epdlogp):
"""Backward pass to calculate gradients.
Arguments:
eph: Array of intermediate hidden states.
epx: Array of experiences (observations).
epdlogp: Array of logps (output of the last layer before the softmax).
"""
dW2 = np.dot(eph.T, epdlogp).ravel()
dh = np.outer(epdlogp, self.weights["W2"])
# Backprop relu.
dh[eph <= 0] = 0
dW1 = np.dot(dh.T, epx)
return {"W1": dW1, "W2": dW2}
def update(self, grad_buffer, rmsprop_cache, lr, decay):
"""Applies the gradients to the model parameters with RMSProp."""
for k, v in self.weights.items():
g = grad_buffer[k]
rmsprop_cache[k] = (decay * rmsprop_cache[k] + (1 - decay) * g**2)
self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
def zero_grads(grad_buffer):
"""Reset the batch gradient buffer."""
for k, v in grad_buffer.items():
grad_buffer[k] = np.zeros_like(v)
#############################################################################
# Parallelizing Gradients
# -----------------------
# We define an **actor**, which is responsible for taking a model and an env
# and performing a rollout + computing a gradient update.
ray.init()
@ray.remote
class RolloutWorker(object):
def __init__(self):
# Tell numpy to only use one core. If we don't do this, each actor may
# try to use all of the cores and the resulting contention may result
# in no speedup over the serial version. Note that if numpy is using
# OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you
# probably need to do it from the command line (so it happens before
# numpy is imported).
os.environ["MKL_NUM_THREADS"] = "1"
self.env = gym.make("Pong-v0")
def compute_gradient(self, model):
# Compute a simulation episode.
xs, hs, dlogps, drs = rollout(model, self.env)
reward_sum = sum(drs)
# Vectorize the arrays.
epx = np.vstack(xs)
eph = np.vstack(hs)
epdlogp = np.vstack(dlogps)
epr = np.vstack(drs)
# Compute the discounted reward backward through time.
discounted_epr = process_rewards(epr)
# Standardize the rewards to be unit normal (helps control the gradient
# estimator variance).
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
# Modulate the gradient with advantage (the policy gradient magic
# happens right here).
epdlogp *= discounted_epr
return model.policy_backward(eph, epx, epdlogp), reward_sum
#############################################################################
# Running
# -------
#
# This example is easy to parallelize because the network can play several
# games in parallel and no information needs to be shared between the games.
#
# In the loop, the network repeatedly plays games of Pong and
# records a gradient from each game. Every ``batch_size`` games, the gradients
# are combined together and used to update the network.
iterations = 20
batch_size = 4
model = Model()
actors = [RolloutWorker.remote() for _ in range(batch_size)]
running_reward = None
# "Xavier" initialization.
# Update buffers that add up gradients over a batch.
grad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()}
# Update the rmsprop memory.
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()}
for i in range(1, 1 + iterations):
model_id = ray.put(model)
gradient_ids = []
# Launch tasks to compute gradients from multiple rollouts in parallel.
start_time = time.time()
gradient_ids = [
actor.compute_gradient.remote(model_id) for actor in actors
]
for batch in range(batch_size):
[grad_id], gradient_ids = ray.wait(gradient_ids)
grad, reward_sum = ray.get(grad_id)
# Accumulate the gradient over batch.
for k in model.weights:
grad_buffer[k] += grad[k]
running_reward = (reward_sum if running_reward is None else
running_reward * 0.99 + reward_sum * 0.01)
end_time = time.time()
print("Batch {} computed {} rollouts in {} seconds, "
"running mean is {}".format(i, batch_size, end_time - start_time,
running_reward))
model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate)
zero_grads(grad_buffer)

View file

@ -34,8 +34,8 @@ def build_data(data_path, size, dataset):
def load_transform(value):
# Convert these examples to dense labels and processed images.
record = tf.reshape(tf.decode_raw(value, tf.uint8), [record_bytes])
label = tf.cast(tf.slice(record, [label_offset], [label_bytes]),
tf.int32)
label = tf.cast(
tf.slice(record, [label_offset], [label_bytes]), tf.int32)
# Convert from string to [depth * height * width] to
# [depth, height, width].
depth_major = tf.reshape(
@ -44,10 +44,11 @@ def build_data(data_path, size, dataset):
# Convert from [depth, height, width] to [height, width, depth].
image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)
return (image, label)
# Read examples from files in the filename queue.
data_files = tf.gfile.Glob(data_path)
data = tf.contrib.data.FixedLengthRecordDataset(data_files,
record_bytes=record_bytes)
data = tf.contrib.data.FixedLengthRecordDataset(
data_files, record_bytes=record_bytes)
data = data.map(load_transform)
data = data.batch(size)
iterator = data.make_one_shot_iterator()
@ -102,8 +103,7 @@ def build_input(data, batch_size, dataset, train):
labels = tf.reshape(labels, [batch_size, 1])
indices = tf.reshape(tf.range(0, batch_size, 1), [batch_size, 1])
labels = tf.sparse_to_dense(
tf.concat([indices, labels], 1),
[batch_size, num_classes], 1.0, 0.0)
tf.concat([indices, labels], 1), [batch_size, num_classes], 1.0, 0.0)
assert len(images.get_shape()) == 4
assert images.get_shape()[0] == batch_size

View file

@ -1,213 +0,0 @@
# This code is copied and adapted from Andrej Karpathy's code for learning to
# play Pong https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import numpy as np
import os
import ray
import time
import gym
# Define some hyperparameters.
# The number of hidden layer neurons.
H = 200
learning_rate = 1e-4
# Discount factor for reward.
gamma = 0.99
# The decay factor for RMSProp leaky sum of grad^2.
decay_rate = 0.99
# The input dimensionality: 80x80 grid.
D = 80 * 80
def sigmoid(x):
# Sigmoid "squashing" function to interval [0, 1].
return 1.0 / (1.0 + np.exp(-x))
def preprocess(img):
"""Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector."""
# Crop the image.
img = img[35:195]
# Downsample by factor of 2.
img = img[::2, ::2, 0]
# Erase background (background type 1).
img[img == 144] = 0
# Erase background (background type 2).
img[img == 109] = 0
# Set everything else (paddles, ball) to 1.
img[img != 0] = 1
return img.astype(np.float).ravel()
def discount_rewards(r):
"""take 1D float array of rewards and compute discounted reward"""
discounted_r = np.zeros_like(r)
running_add = 0
for t in reversed(range(0, r.size)):
# Reset the sum, since this was a game boundary (pong specific!).
if r[t] != 0:
running_add = 0
running_add = running_add * gamma + r[t]
discounted_r[t] = running_add
return discounted_r
def policy_forward(x, model):
h = np.dot(model["W1"], x)
h[h < 0] = 0 # ReLU nonlinearity.
logp = np.dot(model["W2"], h)
p = sigmoid(logp)
# Return probability of taking action 2, and hidden state.
return p, h
def policy_backward(eph, epx, epdlogp, model):
"""backward pass. (eph is array of intermediate hidden states)"""
dW2 = np.dot(eph.T, epdlogp).ravel()
dh = np.outer(epdlogp, model["W2"])
# Backprop relu.
dh[eph <= 0] = 0
dW1 = np.dot(dh.T, epx)
return {"W1": dW1, "W2": dW2}
@ray.remote
class PongEnv(object):
def __init__(self):
# Tell numpy to only use one core. If we don't do this, each actor may
# try to use all of the cores and the resulting contention may result
# in no speedup over the serial version. Note that if numpy is using
# OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you
# probably need to do it from the command line (so it happens before
# numpy is imported).
os.environ["MKL_NUM_THREADS"] = "1"
self.env = gym.make("Pong-v0")
def compute_gradient(self, model):
# Reset the game.
observation = self.env.reset()
# Note that prev_x is used in computing the difference frame.
prev_x = None
xs, hs, dlogps, drs = [], [], [], []
reward_sum = 0
done = False
while not done:
cur_x = preprocess(observation)
x = cur_x - prev_x if prev_x is not None else np.zeros(D)
prev_x = cur_x
aprob, h = policy_forward(x, model)
# Sample an action.
action = 2 if np.random.uniform() < aprob else 3
# The observation.
xs.append(x)
# The hidden state.
hs.append(h)
y = 1 if action == 2 else 0 # A "fake label".
# The gradient that encourages the action that was taken to be
# taken (see http://cs231n.github.io/neural-networks-2/#losses if
# confused).
dlogps.append(y - aprob)
observation, reward, done, info = self.env.step(action)
reward_sum += reward
# Record reward (has to be done after we call step() to get reward
# for previous action).
drs.append(reward)
epx = np.vstack(xs)
eph = np.vstack(hs)
epdlogp = np.vstack(dlogps)
epr = np.vstack(drs)
# Reset the array memory.
xs, hs, dlogps, drs = [], [], [], []
# Compute the discounted reward backward through time.
discounted_epr = discount_rewards(epr)
# Standardize the rewards to be unit normal (helps control the gradient
# estimator variance).
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
# Modulate the gradient with advantage (the policy gradient magic
# happens right here).
epdlogp *= discounted_epr
return policy_backward(eph, epx, epdlogp, model), reward_sum
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Train an RL agent on Pong.")
parser.add_argument(
"--batch-size",
default=10,
type=int,
help="The number of rollouts to do per batch.")
parser.add_argument(
"--redis-address",
default=None,
type=str,
help="The Redis address of the cluster.")
parser.add_argument(
"--iterations",
default=-1,
type=int,
help="The number of model updates to perform. By "
"default, training will not terminate.")
args = parser.parse_args()
batch_size = args.batch_size
ray.init(redis_address=args.redis_address)
# Run the reinforcement learning.
running_reward = None
batch_num = 1
model = {}
# "Xavier" initialization.
model["W1"] = np.random.randn(H, D) / np.sqrt(D)
model["W2"] = np.random.randn(H) / np.sqrt(H)
# Update buffers that add up gradients over a batch.
grad_buffer = {k: np.zeros_like(v) for k, v in model.items()}
# Update the rmsprop memory.
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.items()}
actors = [PongEnv.remote() for _ in range(batch_size)]
iteration = 0
while iteration != args.iterations:
iteration += 1
model_id = ray.put(model)
actions = []
# Launch tasks to compute gradients from multiple rollouts in parallel.
start_time = time.time()
for i in range(batch_size):
action_id = actors[i].compute_gradient.remote(model_id)
actions.append(action_id)
for i in range(batch_size):
action_id, actions = ray.wait(actions)
grad, reward_sum = ray.get(action_id[0])
# Accumulate the gradient over batch.
for k in model:
grad_buffer[k] += grad[k]
running_reward = (reward_sum if running_reward is None else
running_reward * 0.99 + reward_sum * 0.01)
end_time = time.time()
print("Batch {} computed {} rollouts in {} seconds, "
"running mean is {}".format(batch_num, batch_size,
end_time - start_time,
running_reward))
for k, v in model.items():
g = grad_buffer[k]
rmsprop_cache[k] = (
decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2)
model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
# Reset the batch gradient buffer.
grad_buffer[k] = np.zeros_like(v)
batch_num += 1

Binary file not shown (new image, 26 KiB).

View file

@ -12,10 +12,13 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import glob
import shutil
import sys
import os
import urllib
import shlex
sys.path.insert(0, os.path.abspath('.'))
from custom_directives import CustomGalleryItemDirective
# These lines added to enable Sphinx to work without installing Ray.
import mock
@ -67,13 +70,33 @@ sys.path.insert(0, os.path.abspath("../../python/"))
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'sphinx.ext.napoleon',
'sphinx_click.ext',
'sphinx-jsonschema',
'sphinx.ext.autodoc', 'sphinx.ext.viewcode', 'sphinx.ext.napoleon',
'sphinx_click.ext', 'sphinx-jsonschema', 'sphinx_gallery.gen_gallery'
]
sphinx_gallery_conf = {
"examples_dirs": ["../examples"], # path to example scripts
"gallery_dirs": ["auto_examples"], # path where to save generated examples
"ignore_pattern": "../examples/doc_code/",
"plot_gallery": "False",
# "filename_pattern": "tutorial.py",
"backreferences_dir": False
# "show_memory': False,
# 'min_reported_time': False
}
for i in range(len(sphinx_gallery_conf["examples_dirs"])):
gallery_dir = sphinx_gallery_conf["gallery_dirs"][i]
source_dir = sphinx_gallery_conf["examples_dirs"][i]
try:
os.mkdir(gallery_dir)
except OSError:
pass
# Copy rst files from source dir to gallery dir.
for f in glob.glob(os.path.join(source_dir, '*.rst')):
shutil.copy(f, gallery_dir)
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
@ -95,7 +118,7 @@ master_doc = 'index'
# General information about the project.
project = u'Ray'
copyright = u'2016, The Ray Team'
copyright = u'2019, The Ray Team'
author = u'The Ray Team'
# The version info for the project you're documenting, acts as replacement for
@ -123,6 +146,8 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
exclude_patterns += sphinx_gallery_conf['examples_dirs']
exclude_patterns += ["*/README.rst"]
# The reST default role (used for this markup: `text`) to use for all
# documents.
@ -354,5 +379,10 @@ def update_context(app, pagename, templatename, context, doctree):
pagename)
# see also http://searchvoidstar.tumblr.com/post/125486358368/making-pdfs-from-markdown-on-readthedocsorg-using
def setup(app):
app.connect('html-page-context', update_context)
# Custom directives
app.add_directive('customgalleryitem', CustomGalleryItemDirective)

View file

@ -0,0 +1,94 @@
# Originally from:
# github.com/pytorch/tutorials/blob/60d6ef365e36f3ba82c2b61bf32cc40ac4e86c7b/custom_directives.py # noqa
from docutils.parsers.rst import Directive, directives
from docutils.statemachine import StringList
from docutils import nodes
import os
import sphinx_gallery
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError
GALLERY_TEMPLATE = """
.. raw:: html
<div class="sphx-glr-thumbcontainer" tooltip="{tooltip}">
.. only:: html
.. figure:: {thumbnail}
{description}
.. raw:: html
</div>
"""
class CustomGalleryItemDirective(Directive):
"""Create a sphinx gallery style thumbnail.
tooltip and figure are self-explanatory. Description could be a link to
a document, as in the example below.
Example usage:
.. customgalleryitem::
:tooltip: I am writing this tutorial to focus specifically on NLP.
:figure: /_static/img/thumbnails/babel.jpg
:description: :doc:`/beginner/deep_learning_nlp_tutorial`
If figure is specified, a thumbnail will be made out of it and stored in
_static/thumbs. Therefore, consider _static/thumbs as a "built" directory.
"""
required_arguments = 0
optional_arguments = 0
final_argument_whitespace = True
option_spec = {
"tooltip": directives.unchanged,
"figure": directives.unchanged,
"description": directives.unchanged
}
has_content = False
add_index = False
def run(self):
# Cut off the `tooltip` after 195 chars.
if "tooltip" in self.options:
tooltip = self.options["tooltip"]
if len(self.options["tooltip"]) > 195:
tooltip = tooltip[:195] + "..."
else:
raise ValueError("Need to provide :tooltip: under "
"`.. customgalleryitem::`.")
# Generate `thumbnail` used in the gallery.
if "figure" in self.options:
env = self.state.document.settings.env
rel_figname, figname = env.relfn2path(self.options["figure"])
thumbnail = os.path.join("_static/thumbs/",
os.path.basename(figname))
os.makedirs("_static/thumbs", exist_ok=True)
sphinx_gallery.gen_rst.scale_image(figname, thumbnail, 400, 280)
else:
thumbnail = "/_static/img/thumbnails/default.png"
if "description" in self.options:
description = self.options["description"]
else:
raise ValueError("Need to provide :description: under "
"`customgalleryitem::`.")
thumbnail_rst = GALLERY_TEMPLATE.format(
tooltip=tooltip, thumbnail=thumbnail, description=description)
thumbnail = StringList(thumbnail_rst.split("\n"))
thumb = nodes.paragraph()
self.state.nested_parse(thumbnail, self.content_offset, thumb)
return [thumb]

View file

@ -1,127 +0,0 @@
Parameter Server
================
This document walks through how to implement simple synchronous and asynchronous
parameter servers using actors. To run the application, first install some
dependencies.
.. code-block:: bash
pip install tensorflow
You can view the `code for this example`_.
.. _`code for this example`: https://github.com/ray-project/ray/tree/master/doc/examples/parameter_server
The examples can be run as follows.
.. code-block:: bash
# Run the asynchronous parameter server.
python ray/doc/examples/parameter_server/async_parameter_server.py --num-workers=4
# Run the synchronous parameter server.
python ray/doc/examples/parameter_server/sync_parameter_server.py --num-workers=4
Note that this example uses distributed actor handles, which are still
considered experimental.
Asynchronous Parameter Server
-----------------------------
The asynchronous parameter server itself is implemented as an actor, which
exposes the methods ``push`` and ``pull``.
.. code-block:: python
@ray.remote
class ParameterServer(object):
def __init__(self, keys, values):
values = [value.copy() for value in values]
self.weights = dict(zip(keys, values))
def push(self, keys, values):
for key, value in zip(keys, values):
self.weights[key] += value
def pull(self, keys):
return [self.weights[key] for key in keys]
We then define a worker task, which takes a parameter server as an argument and
submits tasks to it. The structure of the code looks as follows.
.. code-block:: python
@ray.remote
def worker_task(ps):
while True:
# Get the latest weights from the parameter server.
weights = ray.get(ps.pull.remote(keys))
# Compute an update.
...
# Push the update to the parameter server.
ps.push.remote(keys, update)
Then we can create a parameter server and initiate training as follows.
.. code-block:: python
ps = ParameterServer.remote(keys, initial_values)
worker_tasks = [worker_task.remote(ps) for _ in range(4)]
Synchronous Parameter Server
----------------------------
The parameter server is implemented as an actor, which exposes the
methods ``apply_gradients`` and ``get_weights``. A constant linear scaling
rule is applied by scaling the learning rate by the number of workers.
.. code-block:: python
@ray.remote
class ParameterServer(object):
def __init__(self, learning_rate):
self.net = model.SimpleCNN(learning_rate=learning_rate)
def apply_gradients(self, *gradients):
self.net.apply_gradients(np.mean(gradients, axis=0))
return self.net.variables.get_flat()
def get_weights(self):
return self.net.variables.get_flat()
Workers are actors which expose the method ``compute_gradients``.
.. code-block:: python
@ray.remote
class Worker(object):
def __init__(self, worker_index, batch_size=50):
self.worker_index = worker_index
self.batch_size = batch_size
self.mnist = input_data.read_data_sets("MNIST_data", one_hot=True,
seed=worker_index)
self.net = model.SimpleCNN()
def compute_gradients(self, weights):
self.net.variables.set_flat(weights)
xs, ys = self.mnist.train.next_batch(self.batch_size)
return self.net.compute_gradients(xs, ys)
Training alternates between computing the gradients given the current weights
from the parameter server and updating the parameter server's weights with the
resulting gradients.
.. code-block:: python
while True:
gradients = [worker.compute_gradients.remote(current_weights)
for worker in workers]
current_weights = ps.apply_gradients.remote(*gradients)
Both of these examples implement the parameter server using a single actor;
however, they can easily be extended to **split the parameters across multiple
actors**.

View file

@ -1,118 +0,0 @@
Learning to Play Pong
=====================
In this example, we'll train a **very simple** neural network to play Pong using
the OpenAI Gym. This application is adapted, with minimal modifications, from
Andrej Karpathy's `code`_ (see the accompanying `blog post`_).
You can view the `code for this example`_.
To run the application, first install some dependencies.
.. code-block:: bash
pip install gym[atari]
Then you can run the example as follows.
.. code-block:: bash
python ray/doc/examples/rl_pong/driver.py --batch-size=10
To run the example on a cluster, simply pass in the flag
``--address=<address>``.
At the moment, on a large machine with 64 physical cores, computing an update
with a batch of size 1 takes about 1 second, a batch of size 10 takes about 2.5
seconds. A batch of size 60 takes about 3 seconds. On a cluster with 11 nodes,
each with 18 physical cores, a batch of size 300 takes about 10 seconds. If the
numbers you see differ from these by much, take a look at the
**Troubleshooting** section at the bottom of this page and consider `submitting
an issue`_.
.. _`code`: https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5
.. _`blog post`: http://karpathy.github.io/2016/05/31/rl/
.. _`code for this example`: https://github.com/ray-project/ray/tree/master/doc/examples/rl_pong
.. _`submitting an issue`: https://github.com/ray-project/ray/issues
**Note** that these times depend on how long the rollouts take, which in turn
depends on how well the policy is doing. For example, a really bad policy will
lose very quickly. As the policy learns, we should expect these numbers to
increase.
The distributed version
-----------------------
At the core of Andrej's `code`_, a neural network is used to define a "policy"
for playing Pong (that is, a function that chooses an action given a state). In
the loop, the network repeatedly plays games of Pong and records a gradient from
each game. Every ten games, the gradients are combined together and used to
update the network.
This example is easy to parallelize because the network can play ten games in
parallel and no information needs to be shared between the games.
We define an **actor** for the Pong environment, which includes a method for
performing a rollout and computing a gradient update. Below is pseudocode for
the actor.
.. code-block:: python
@ray.remote
class PongEnv(object):
def __init__(self):
# Tell numpy to only use one core. If we don't do this, each actor may try
# to use all of the cores and the resulting contention may result in no
# speedup over the serial version. Note that if numpy is using OpenBLAS,
# then you need to set OPENBLAS_NUM_THREADS=1, and you probably need to do
# it from the command line (so it happens before numpy is imported).
os.environ["MKL_NUM_THREADS"] = "1"
self.env = gym.make("Pong-v0")
def compute_gradient(self, model):
# Reset the game.
observation = self.env.reset()
while not done:
# Choose an action using policy_forward.
# Take the action and observe the new state of the world.
# Compute a gradient using policy_backward. Return the gradient and reward.
return [gradient, reward_sum]
We then create a number of actors, so that we can perform rollouts in parallel.
.. code-block:: python
actors = [PongEnv() for _ in range(batch_size)]
Calling this remote function inside of a for loop, we launch multiple tasks to
perform rollouts and compute gradients in parallel.
.. code-block:: python
model_id = ray.put(model)
actions = []
# Launch tasks to compute gradients from multiple rollouts in parallel.
for i in range(batch_size):
action_id = actors[i].compute_gradient.remote(model_id)
actions.append(action_id)
Troubleshooting
---------------
If you are not seeing any speedup from Ray (and assuming you're using a
multicore machine), the problem may be that numpy is trying to use multiple
threads. When many processes are each trying to use multiple threads, the result
is often no speedup. When running this example, try opening up ``top`` and
seeing if some python processes are using more than 100% CPU. If yes, then this
is likely the problem.
The example tries to set ``MKL_NUM_THREADS=1`` in the actor. However, that only
works if the numpy on your machine is actually using MKL. If it's using
OpenBLAS, then you'll need to set ``OPENBLAS_NUM_THREADS=1``. In fact, you may
have to do this **before** running the script (it may need to happen before
numpy is imported).
.. code-block:: bash
export OPENBLAS_NUM_THREADS=1

Binary file not shown (new image, 19 KiB).

View file

@ -254,13 +254,15 @@ Getting Involved
:maxdepth: -1
:caption: Examples
example-rl-pong.rst
example-parameter-server.rst
example-newsreader.rst
example-resnet.rst
example-a3c.rst
example-lbfgs.rst
example-streaming.rst
auto_examples/overview.rst
auto_examples/plot_lbfgs.rst
auto_examples/plot_newsreader.rst
auto_examples/plot_hyperparameter.rst
auto_examples/plot_pong_example.rst
auto_examples/plot_resnet.rst
auto_examples/plot_streaming.rst
auto_examples/plot_parameter_server.rst
auto_examples/plot_example-a3c.rst
using-ray-with-tensorflow.rst
using-ray-with-pytorch.rst