mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
297 lines
10 KiB
Python
297 lines
10 KiB
Python
# flake8: noqa
|
|
"""
|
|
Learning to Play Pong
|
|
=====================
|
|
|
|
In this example, we'll train a **very simple** neural network to play Pong using
|
|
the OpenAI Gym.
|
|
|
|
At a high level, we will use multiple Ray actors to obtain simulation rollouts
|
|
and calculate gradient simultaneously. We will then centralize these
|
|
gradients and update the neural network. The updated neural network will
|
|
then be passed back to each Ray actor for more gradient calculation.
|
|
|
|
This application is adapted, with minimal modifications, from
|
|
Andrej Karpathy's `source code`_ (see the accompanying `blog post`_).
|
|
|
|
.. image:: ../images/pong-arch.svg
|
|
:align: center
|
|
|
|
|
|
To run the application, first install some dependencies.
|
|
|
|
.. code-block:: bash
|
|
|
|
pip install gym[atari]
|
|
|
|
At the moment, on a large machine with 64 physical cores, computing an update
|
|
with a batch of size 1 takes about 1 second, a batch of size 10 takes about 2.5
|
|
seconds. A batch of size 60 takes about 3 seconds. On a cluster with 11 nodes,
|
|
each with 18 physical cores, a batch of size 300 takes about 10 seconds. If the
|
|
numbers you see differ from these by much, take a look at the
|
|
**Troubleshooting** section at the bottom of this page and consider `submitting
|
|
an issue`_.
|
|
|
|
.. _`source code`: https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5
|
|
.. _`blog post`: http://karpathy.github.io/2016/05/31/rl/
|
|
.. _`submitting an issue`: https://github.com/ray-project/ray/issues
|
|
|
|
**Note** that these times depend on how long the rollouts take, which in turn
|
|
depends on how well the policy is doing. For example, a really bad policy will
|
|
lose very quickly. As the policy learns, we should expect these numbers to
|
|
increase.
|
|
"""
|
|
import numpy as np
|
|
import os
|
|
import ray
|
|
import time
|
|
|
|
import gym
|
|
|
|
##############################################################################
|
|
# Hyperparameters
|
|
# ---------------
|
|
#
|
|
# Here we'll define a couple of the hyperparameters that are used.
|
|
|
|
H = 200 # The number of hidden layer neurons.
|
|
gamma = 0.99 # The discount factor for reward.
|
|
decay_rate = 0.99 # The decay factor for RMSProp leaky sum of grad^2.
|
|
D = 80 * 80 # The input dimensionality: 80x80 grid.
|
|
learning_rate = 1e-4 # Magnitude of the update.
|
|
|
|
#############################################################################
|
|
# Helper Functions
|
|
# ----------------
|
|
#
|
|
# We first define a few helper functions:
|
|
#
|
|
# 1. Preprocessing: The ``preprocess`` function will
|
|
# preprocess the original 210x160x3 uint8 frame into a one-dimensional 6400
|
|
# float vector.
|
|
#
|
|
# 2. Reward Processing: The ``process_rewards`` function will calculate
|
|
# a discounted reward. This formula states that the "value" of a
|
|
# sampled action is the weighted sum of all rewards afterwards,
|
|
# but later rewards are exponentially less important.
|
|
#
|
|
# 3. Rollout: The ``rollout`` function plays an entire game of Pong (until
|
|
# either the computer or the RL agent loses).
|
|
|
|
|
|
def preprocess(img):
|
|
# Crop the image.
|
|
img = img[35:195]
|
|
# Downsample by factor of 2.
|
|
img = img[::2, ::2, 0]
|
|
# Erase background (background type 1).
|
|
img[img == 144] = 0
|
|
# Erase background (background type 2).
|
|
img[img == 109] = 0
|
|
# Set everything else (paddles, ball) to 1.
|
|
img[img != 0] = 1
|
|
return img.astype(np.float).ravel()
|
|
|
|
|
|
def process_rewards(r):
|
|
"""Compute discounted reward from a vector of rewards."""
|
|
discounted_r = np.zeros_like(r)
|
|
running_add = 0
|
|
for t in reversed(range(0, r.size)):
|
|
# Reset the sum, since this was a game boundary (pong specific!).
|
|
if r[t] != 0:
|
|
running_add = 0
|
|
running_add = running_add * gamma + r[t]
|
|
discounted_r[t] = running_add
|
|
return discounted_r
|
|
|
|
|
|
def rollout(model, env):
|
|
"""Evaluates env and model until the env returns "Done".
|
|
|
|
Returns:
|
|
xs: A list of observations
|
|
hs: A list of model hidden states per observation
|
|
dlogps: A list of gradients
|
|
drs: A list of rewards.
|
|
|
|
"""
|
|
# Reset the game.
|
|
observation = env.reset()
|
|
# Note that prev_x is used in computing the difference frame.
|
|
prev_x = None
|
|
xs, hs, dlogps, drs = [], [], [], []
|
|
done = False
|
|
while not done:
|
|
cur_x = preprocess(observation)
|
|
x = cur_x - prev_x if prev_x is not None else np.zeros(D)
|
|
prev_x = cur_x
|
|
|
|
aprob, h = model.policy_forward(x)
|
|
# Sample an action.
|
|
action = 2 if np.random.uniform() < aprob else 3
|
|
|
|
# The observation.
|
|
xs.append(x)
|
|
# The hidden state.
|
|
hs.append(h)
|
|
y = 1 if action == 2 else 0 # A "fake label".
|
|
# The gradient that encourages the action that was taken to be
|
|
# taken (see http://cs231n.github.io/neural-networks-2/#losses if
|
|
# confused).
|
|
dlogps.append(y - aprob)
|
|
|
|
observation, reward, done, info = env.step(action)
|
|
|
|
# Record reward (has to be done after we call step() to get reward
|
|
# for previous action).
|
|
drs.append(reward)
|
|
return xs, hs, dlogps, drs
|
|
|
|
|
|
##############################################################################
|
|
# Neural Network
|
|
# --------------
|
|
# Here, a neural network is used to define a "policy"
|
|
# for playing Pong (that is, a function that chooses an action given a state).
|
|
#
|
|
# To implement a neural network in NumPy, we need to provide helper functions
|
|
# for calculating updates and computing the output of the neural network
|
|
# given an input, which in our case is an observation.
|
|
|
|
|
|
class Model(object):
|
|
"""This class holds the neural network weights."""
|
|
|
|
def __init__(self):
|
|
self.weights = {}
|
|
self.weights["W1"] = np.random.randn(H, D) / np.sqrt(D)
|
|
self.weights["W2"] = np.random.randn(H) / np.sqrt(H)
|
|
|
|
def policy_forward(self, x):
|
|
h = np.dot(self.weights["W1"], x)
|
|
h[h < 0] = 0 # ReLU nonlinearity.
|
|
logp = np.dot(self.weights["W2"], h)
|
|
# Softmax
|
|
p = 1.0 / (1.0 + np.exp(-logp))
|
|
# Return probability of taking action 2, and hidden state.
|
|
return p, h
|
|
|
|
def policy_backward(self, eph, epx, epdlogp):
|
|
"""Backward pass to calculate gradients.
|
|
|
|
Arguments:
|
|
eph: Array of intermediate hidden states.
|
|
epx: Array of experiences (observations.
|
|
epdlogp: Array of logps (output of last layer before softmax/
|
|
|
|
"""
|
|
dW2 = np.dot(eph.T, epdlogp).ravel()
|
|
dh = np.outer(epdlogp, self.weights["W2"])
|
|
# Backprop relu.
|
|
dh[eph <= 0] = 0
|
|
dW1 = np.dot(dh.T, epx)
|
|
return {"W1": dW1, "W2": dW2}
|
|
|
|
def update(self, grad_buffer, rmsprop_cache, lr, decay):
|
|
"""Applies the gradients to the model parameters with RMSProp."""
|
|
for k, v in self.weights.items():
|
|
g = grad_buffer[k]
|
|
rmsprop_cache[k] = (decay * rmsprop_cache[k] + (1 - decay) * g**2)
|
|
self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
|
|
|
|
|
|
def zero_grads(grad_buffer):
|
|
"""Reset the batch gradient buffer."""
|
|
for k, v in grad_buffer.items():
|
|
grad_buffer[k] = np.zeros_like(v)
|
|
|
|
|
|
#############################################################################
|
|
# Parallelizing Gradients
|
|
# -----------------------
|
|
# We define an **actor**, which is responsible for taking a model and an env
|
|
# and performing a rollout + computing a gradient update.
|
|
|
|
ray.init()
|
|
|
|
|
|
@ray.remote
|
|
class RolloutWorker(object):
|
|
def __init__(self):
|
|
# Tell numpy to only use one core. If we don't do this, each actor may
|
|
# try to use all of the cores and the resulting contention may result
|
|
# in no speedup over the serial version. Note that if numpy is using
|
|
# OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you
|
|
# probably need to do it from the command line (so it happens before
|
|
# numpy is imported).
|
|
os.environ["MKL_NUM_THREADS"] = "1"
|
|
self.env = gym.make("Pong-v0")
|
|
|
|
def compute_gradient(self, model):
|
|
# Compute a simulation episode.
|
|
xs, hs, dlogps, drs = rollout(model, self.env)
|
|
reward_sum = sum(drs)
|
|
# Vectorize the arrays.
|
|
epx = np.vstack(xs)
|
|
eph = np.vstack(hs)
|
|
epdlogp = np.vstack(dlogps)
|
|
epr = np.vstack(drs)
|
|
|
|
# Compute the discounted reward backward through time.
|
|
discounted_epr = process_rewards(epr)
|
|
# Standardize the rewards to be unit normal (helps control the gradient
|
|
# estimator variance).
|
|
discounted_epr -= np.mean(discounted_epr)
|
|
discounted_epr /= np.std(discounted_epr)
|
|
# Modulate the gradient with advantage (the policy gradient magic
|
|
# happens right here).
|
|
epdlogp *= discounted_epr
|
|
return model.policy_backward(eph, epx, epdlogp), reward_sum
|
|
|
|
|
|
#############################################################################
|
|
# Running
|
|
# -------
|
|
#
|
|
# This example is easy to parallelize because the network can play ten games
|
|
# in parallel and no information needs to be shared between the games.
|
|
#
|
|
# In the loop, the network repeatedly plays games of Pong and
|
|
# records a gradient from each game. Every ten games, the gradients are
|
|
# combined together and used to update the network.
|
|
|
|
iterations = 20
|
|
batch_size = 4
|
|
model = Model()
|
|
actors = [RolloutWorker.remote() for _ in range(batch_size)]
|
|
|
|
running_reward = None
|
|
# "Xavier" initialization.
|
|
# Update buffers that add up gradients over a batch.
|
|
grad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()}
|
|
# Update the rmsprop memory.
|
|
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()}
|
|
|
|
for i in range(1, 1 + iterations):
|
|
model_id = ray.put(model)
|
|
gradient_ids = []
|
|
# Launch tasks to compute gradients from multiple rollouts in parallel.
|
|
start_time = time.time()
|
|
gradient_ids = [
|
|
actor.compute_gradient.remote(model_id) for actor in actors
|
|
]
|
|
for batch in range(batch_size):
|
|
[grad_id], gradient_ids = ray.wait(gradient_ids)
|
|
grad, reward_sum = ray.get(grad_id)
|
|
# Accumulate the gradient over batch.
|
|
for k in model.weights:
|
|
grad_buffer[k] += grad[k]
|
|
running_reward = (reward_sum if running_reward is None else
|
|
running_reward * 0.99 + reward_sum * 0.01)
|
|
end_time = time.time()
|
|
print("Batch {} computed {} rollouts in {} seconds, "
|
|
"running mean is {}".format(i, batch_size, end_time - start_time,
|
|
running_reward))
|
|
model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate)
|
|
zero_grads(grad_buffer)
|