From 882a649f0c7326089f0f89cd8444520037a55e4e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 15 Mar 2018 15:57:31 -0700 Subject: [PATCH] [rllib] [docs] Cleanup RLlib API and make docs consistent with upcoming blog post (#1708) * wip * more work * fix apex * docs * apex doc * pool comment * clean up * make wrap stack pluggable * Mon Mar 12 21:45:50 PDT 2018 * clean up comment * table * Mon Mar 12 22:51:57 PDT 2018 * Mon Mar 12 22:53:05 PDT 2018 * Mon Mar 12 22:55:03 PDT 2018 * Mon Mar 12 22:56:18 PDT 2018 * Mon Mar 12 22:59:54 PDT 2018 * Update apex_optimizer.py * Update index.rst * Update README.rst * Update README.rst * comments * Wed Mar 14 19:01:02 PDT 2018 --- README.rst | 2 +- doc/source/conf.py | 3 + doc/source/index.rst | 3 +- doc/source/policy-optimizers.rst | 67 +++++++++++++++++++ doc/source/rllib-dev.rst | 4 +- doc/source/rllib.rst | 35 +++++----- python/ray/rllib/README.rst | 27 +++++--- python/ray/rllib/__init__.py | 7 +- python/ray/rllib/a3c/a3c_evaluator.py | 6 +- python/ray/rllib/bc/bc_evaluator.py | 6 +- python/ray/rllib/dqn/common/wrappers.py | 2 +- python/ray/rllib/dqn/dqn.py | 9 +-- python/ray/rllib/dqn/dqn_evaluator.py | 17 +++-- python/ray/rllib/optimizers/__init__.py | 7 +- python/ray/rllib/optimizers/apex_optimizer.py | 46 +++++++++---- .../{async.py => async_optimizer.py} | 8 +-- python/ray/rllib/optimizers/local_sync.py | 8 +-- .../ray/rllib/optimizers/local_sync_replay.py | 14 ++-- python/ray/rllib/optimizers/multi_gpu.py | 8 +-- .../{evaluator.py => policy_evaluator.py} | 61 ++++++++++++----- .../{optimizer.py => policy_optimizer.py} | 33 +++++++-- python/ray/rllib/pg/pg_evaluator.py | 6 +- python/ray/rllib/ppo/ppo_evaluator.py | 4 +- python/ray/rllib/test/mock_evaluator.py | 2 +- .../ray/rllib/tuned_examples/pong-apex.yaml | 7 +- python/ray/rllib/utils/actors.py | 2 + .../{dqn/common => utils}/atari_wrappers.py | 16 +++-- 27 files changed, 282 insertions(+), 128 deletions(-) create mode 100644 doc/source/policy-optimizers.rst rename python/ray/rllib/optimizers/{async.py => async_optimizer.py} (91%) rename python/ray/rllib/optimizers/{evaluator.py => policy_evaluator.py} (58%) rename python/ray/rllib/optimizers/{optimizer.py => policy_optimizer.py} (66%) rename python/ray/rllib/{dqn/common => utils}/atari_wrappers.py (94%) diff --git a/README.rst b/README.rst index e6604756b..b7a2b2784 100644 --- a/README.rst +++ b/README.rst @@ -38,7 +38,7 @@ Example Use Ray comes with libraries that accelerate deep learning and reinforcement learning development: - `Ray Tune`_: Hyperparameter Optimization Framework -- `Ray RLlib`_: A Scalable Reinforcement Learning Library +- `Ray RLlib`_: Scalable Reinforcement Learning .. _`Ray Tune`: http://ray.readthedocs.io/en/latest/tune.html .. _`Ray RLlib`: http://ray.readthedocs.io/en/latest/rllib.html diff --git a/doc/source/conf.py b/doc/source/conf.py index a56966936..f98e1ca5b 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -320,4 +320,7 @@ texinfo_documents = [ # pcmoritz: To make the following work, you have to run # sudo pip install recommonmark +# Python methods should be presented in source code order +autodoc_member_order = 'bysource' + # see also http://searchvoidstar.tumblr.com/post/125486358368/making-pdfs-from-markdown-on-readthedocsorg-using diff --git a/doc/source/index.rst b/doc/source/index.rst index 4be274087..056cf7a8a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -41,7 +41,7 @@ View the `codebase on GitHub`_. 
Ray comes with libraries that accelerate deep learning and reinforcement learning development: - `Ray Tune`_: Hyperparameter Optimization Framework -- `Ray RLlib`_: A Scalable Reinforcement Learning Library +- `Ray RLlib`_: Scalable Reinforcement Learning .. _`Ray Tune`: tune.html .. _`Ray RLlib`: rllib.html @@ -78,6 +78,7 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin :caption: Ray RLlib rllib.rst + policy-optimizers.rst rllib-dev.rst .. toctree:: diff --git a/doc/source/policy-optimizers.rst b/doc/source/policy-optimizers.rst new file mode 100644 index 000000000..c3ec1c7f8 --- /dev/null +++ b/doc/source/policy-optimizers.rst @@ -0,0 +1,67 @@ +Policy Optimizers +================= + +RLlib supports using its distributed policy optimizer implementations from external algorithms. + +Example of constructing and using a policy optimizer `(link to full example) `__: + +.. code-block:: python + + ray.init() + env_creator = lambda env_config: gym.make("PongNoFrameskip-v4") + optimizer = LocalSyncReplayOptimizer.make( + YourEvaluatorClass, [env_creator], num_workers=0, optimizer_config={}) + + i = 0 + while optimizer.num_steps_sampled < 100000: + i += 1 + print("== optimizer step {} ==".format(i)) + optimizer.step() + print("optimizer stats", optimizer.stats()) + print("local evaluator stats", optimizer.local_evaluator.stats()) + +Here are the steps for using a RLlib policy optimizer with an existing algorithm. + +1. Implement the `Policy evaluator interface `__. + + - Here is an example of porting a `PyTorch Rainbow implementation `__. + + - Another example porting a `TensorFlow DQN implementation `__. + +2. Pick a `Policy optimizer class `__. The `LocalSyncOptimizer `__ is a reasonable choice for local testing. You can also implement your own. Policy optimizers can be constructed using their ``make`` method (e.g., ``LocalSyncOptimizer.make(evaluator_cls, evaluator_args, num_workers, optimizer_config)``), or you can construct them by passing in a list of evaluators instantiated as Ray actors. + + - Here is code showing the `simple Policy Gradient agent `__ using ``make()``. + + - A different example showing an `A3C agent `__ passing in Ray actors directly. + +3. Decide how you want to drive the training loop. + + - Option 1: call ``optimizer.step()`` from some existing training code. Training statistics can be retrieved by querying the ``optimizer.local_evaluator`` evaluator instance, or mapping over the remote evaluators (e.g., ``ray.get([ev.some_fn.remote() for ev in optimizer.remote_evaluators])``) if you are running with multiple workers. + + - Option 2: define a full RLlib `Agent class `__. This might be preferable if you don't have an existing training harness or want to use features provided by `Ray Tune `__. 
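+
+The snippet below is a minimal sketch of what step 1 involves. It follows the pattern of the toy evaluator in ``test/mock_evaluator.py``: the random policy and dummy gradients are illustrative placeholders rather than a real learning algorithm, but the method signatures match ``optimizers/policy_evaluator.py``.
+
+.. code-block:: python
+
+    import numpy as np
+
+    from ray.rllib.optimizers import PolicyEvaluator, SampleBatch
+
+    class ToyEvaluator(PolicyEvaluator):
+        def __init__(self, env_creator):
+            # Assumed here: the optimizer constructs evaluators with the args
+            # passed as evaluator_args (e.g., [env_creator] in the example above).
+            self.env = env_creator({})
+            self.weights = np.zeros(4)  # stand-in for real model weights
+
+        def sample(self):
+            # Roll out the (random) policy and return a columnar batch.
+            obs, actions, rewards = [], [], []
+            ob = self.env.reset()
+            for _ in range(32):
+                action = self.env.action_space.sample()
+                next_ob, rew, done, _ = self.env.step(action)
+                obs.append(ob)
+                actions.append(action)
+                rewards.append(rew)
+                ob = self.env.reset() if done else next_ob
+            return SampleBatch(
+                {"obs": obs, "actions": actions, "rewards": rewards})
+
+        def compute_gradients(self, samples):
+            # Return (gradient, info). Note that replay-based optimizers also
+            # expect a "td_error" array in the info dict.
+            return np.ones_like(self.weights) * samples.count, {}
+
+        def apply_gradients(self, grads):
+            self.weights -= 1e-3 * grads
+
+        def get_weights(self):
+            return self.weights
+
+        def set_weights(self, weights):
+            self.weights = weights
+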
+ +Available Policy Optimizers +--------------------------- + ++-----------------------------+---------------------+-----------------+------------------------------+ +| **Policy optimizer class** | **Operating range** | **Works with** | **Description** | ++=============================+=====================+=================+==============================+ +|AsyncOptimizer |1-10s of CPUs |(any) |Asynchronous gradient-based | +| | | |optimization (e.g., A3C) | ++-----------------------------+---------------------+-----------------+------------------------------+ +|LocalSyncOptimizer |0-1 GPUs + |(any) |Synchronous gradient-based | +| |1-100s of CPUs | |optimization with parallel | +| | | |sample collection | ++-----------------------------+---------------------+-----------------+------------------------------+ +|LocalSyncReplayOptimizer |0-1 GPUs + | Off-policy |Adds a replay buffer | +| |1-100s of CPUs | algorithms |to LocalSyncOptimizer | ++-----------------------------+---------------------+-----------------+------------------------------+ +|LocalMultiGPUOptimizer |0-10 GPUs + | Algorithms |Implements data-parallel | +| |1-100s of CPUs | written in |optimization over multiple | +| | | TensorFlow |GPUs, e.g., for PPO | ++-----------------------------+---------------------+-----------------+------------------------------+ +|ApexOptimizer |1 GPU + | Off-policy |Implements the Ape-X | +| |10-100s of CPUs | algorithms |distributed prioritization | +| | | w/sample |algorithm | +| | | prioritization | | ++-----------------------------+---------------------+-----------------+------------------------------+ diff --git a/doc/source/rllib-dev.rst b/doc/source/rllib-dev.rst index 2b438ca60..83237e707 100644 --- a/doc/source/rllib-dev.rst +++ b/doc/source/rllib-dev.rst @@ -42,10 +42,10 @@ a common base class: Policy Evaluators and Optimizers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. autoclass:: ray.rllib.optimizers.evaluator.Evaluator +.. autoclass:: ray.rllib.optimizers.policy_evaluator.PolicyEvaluator :members: -.. autoclass:: ray.rllib.optimizers.optimizer.Optimizer +.. autoclass:: ray.rllib.optimizers.policy_optimizer.PolicyOptimizer :members: Sample Batches diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index 6d055a5db..fc5ea7c0a 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -1,21 +1,16 @@ -Ray RLlib: A Scalable Reinforcement Learning Library -==================================================== +Ray RLlib: Scalable Reinforcement Learning +========================================== -Ray RLlib is a reinforcement learning library that aims to provide both performance and composability: +Ray RLlib is an RL execution toolkit built on the Ray distributed execution framework. RLlib implements a collection of distributed *policy optimizers* that make it easy to use a variety of training strategies with existing RL algorithms written in frameworks such as PyTorch, TensorFlow, and Theano. This enables complex architectures for RL training (e.g., Ape-X, IMPALA), to be implemented once and reused many times across different RL algorithms and libraries. -- Performance - - High performance algorithm implementions - - Pluggable distributed RL execution strategies +You can find the code for RLlib `here on GitHub `__, and the paper `here `__. -- Composability - - Integration with the `Ray Tune `__ hyperparam tuning tool - - Support for multiple frameworks (TensorFlow, PyTorch) - - Scalable primitives for developing new algorithms - - Shared models between algorithms +.. 
note:: -You can find the code for RLlib `here on GitHub `__, and the NIPS symposium paper `here `__. + To use RLlib's policy optimizers outside of RLlib, see the `RLlib policy optimizers documentation `__. -RLlib currently provides the following algorithms: + +RLlib's policy optimizers serve as the basis for RLlib's reference algorithms, which include: - `Proximal Policy Optimization (PPO) `__ which is a proximal variant of `TRPO `__. @@ -24,6 +19,8 @@ RLlib currently provides the following algorithms: - `Deep Q Networks (DQN) `__. +- `Ape-X Distributed Prioritized Experience Replay `__. + - Evolution Strategies, as described in `this paper `__. Our implementation is adapted from @@ -80,7 +77,7 @@ The ``train.py`` script has a number of options you can show by running The most important options are for choosing the environment with ``--env`` (any OpenAI gym environment including ones registered by the user can be used) and for choosing the algorithm with ``--run`` -(available options are ``PPO``, ``A3C``, ``ES`` and ``DQN``). +(available options are ``PPO``, ``A3C``, ``ES``, ``DQN`` and ``APEX``). Specifying Parameters ~~~~~~~~~~~~~~~~~~~~~ @@ -89,8 +86,9 @@ Each algorithm has specific hyperparameters that can be set with ``--config`` - ``DEFAULT_CONFIG`` variable in `PPO `__, `A3C `__, -`ES `__ and -`DQN `__. +`ES `__, +`DQN `__ and +`APEX `__. In an example below, we train A3C by specifying 8 workers through the config flag. function that creates the env to refer to it by name. The contents of the env_config agent config field will be passed to that function to allow the environment to be configured. The return type should be an OpenAI gym.Env. For example: @@ -325,6 +323,11 @@ in the ``config`` section of the experiments. For an advanced example of using Population Based Training (PBT) with RLlib, see the `PPO + PBT Walker2D training example `__. +Using Policy Optimizers outside of RLlib +---------------------------------------- + +See the `RLlib policy optimizers documentation `__. + Contributing to RLlib --------------------- diff --git a/python/ray/rllib/README.rst b/python/ray/rllib/README.rst index 1cbe9bbd5..e9ac3e2ea 100644 --- a/python/ray/rllib/README.rst +++ b/python/ray/rllib/README.rst @@ -1,9 +1,11 @@ -Ray RLlib: A Scalable Reinforcement Learning Library -==================================================== +Ray RLlib: Scalable Reinforcement Learning +========================================== -This README provides a brief technical overview of RLlib. See also the `user documentation `__ and `NIPS symposium paper `__. +This README provides a brief technical overview of RLlib. See also the `user documentation `__ and `paper `__. -RLlib currently provides the following algorithms: +Ray RLlib is an RL execution toolkit built on the Ray distributed execution framework. RLlib implements a collection of distributed *policy optimizers* that make it easy to use a variety of training strategies with existing RL algorithms written in frameworks such as PyTorch, TensorFlow, and Theano. This enables complex architectures for RL training (e.g., Ape-X, IMPALA), to be implemented *once* and reused many times across different RL algorithms and libraries. + +RLlib's policy optimizers serve as the basis for RLlib's reference algorithms, which include: - `Proximal Policy Optimization (PPO) `__ which is a proximal variant of `TRPO `__. @@ -12,6 +14,8 @@ RLlib currently provides the following algorithms: - `Deep Q Networks (DQN) `__. +- `Ape-X Distributed Prioritized Experience Replay `__. 
+ - Evolution Strategies, as described in `this paper `__. Our implementation is adapted from @@ -19,6 +23,8 @@ RLlib currently provides the following algorithms: These algorithms can be run on any OpenAI Gym MDP, including custom ones written and registered by the user. +RLlib's distributed policy optimizers can also be used by any existing algorithm or RL library that implements the policy evaluator interface (optimizers/policy_evaluator.py). + Training API ------------ @@ -33,19 +39,20 @@ All RLlib algorithms implement a common training API (agent.py), which enables m # Integration with ray.tune for hyperparam evaluation python train.py -f tuned_examples/cartpole-grid-search-example.yaml -Policy Evaluator and Optimizer abstractions -------------------------------------------- +Policy Optimizer abstraction +---------------------------- -RLlib's gradient-based algorithms are composed using two abstractions: Evaluators (evaluator.py) and Optimizers (optimizers/optimizer.py). Optimizers encapsulate a particular distributed optimization strategy for RL. Evaluators encapsulate the model graph, and once implemented, any Optimizer may be "plugged in" to any algorithm that implements the Evaluator interface. +RLlib's gradient-based algorithms are composed using two abstractions: policy evaluators (optimizers/policy_evaluator.py) and policy optimizers (optimizers/policy_optimizer.py). Policy optimizers serve as the "control plane" of algorithms and implement a particular distributed optimization strategy for RL. Evaluators implement the algorithm "data plane" and encapsulate the model graph. Once an evaluator for an algorithm is implemented, it is compatible with any policy optimizer. -This pluggability enables optimization strategies to be re-used and improved across different algorithms and deep learning frameworks (RLlib's optimizers work with both TensorFlow and PyTorch, though currently only A3C has a PyTorch graph implementation). +This pluggability enables complex architectures for distributed training to be defined _once_ and reused many times across different algorithms and RL libraries. These are the currently available optimizers: - ``AsyncOptimizer`` is an asynchronous RL optimizer, i.e. like A3C. It asynchronously pulls and applies gradients from evaluators, sending updated weights back as needed. - ``LocalSyncOptimizer`` is a simple synchronous RL optimizer. It pulls samples from remote evaluators, concatenates them, and then updates a local model. The updated model weights are then broadcast to all remote evalutaors. -- ``LocalMultiGPUOptimizer`` (currently available for PPO) This optimizer performs SGD over a number of local GPUs, and pins experience data in GPU memory to amortize the copy overhead for multiple SGD passes. -- ``AllReduceOptimizer`` (planned) This optimizer would use the Allreduce primitive to scalably synchronize weights among a number of remote GPU workers. +- ``LocalSyncReplayOptimizer`` adds experience replay to LocalSyncOptimizer (e.g., for DQNs). +- ``LocalMultiGPUOptimizer`` This optimizer performs SGD over a number of local GPUs, and pins experience data in GPU memory to amortize the copy overhead for multiple SGD passes. +- ``ApexOptimizer`` This implements the distributed experience replay algorithm for DQN and DDPG and is designed to run in a cluster setting. 
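+
+The snippet below sketches how these pieces fit together. It mirrors the example in doc/source/policy-optimizers.rst; ``YourEvaluatorClass`` is a placeholder for any evaluator implementing the interface in optimizers/policy_evaluator.py. Switching training strategies only requires swapping the optimizer class:
+
+.. code-block:: python
+
+    import gym
+    import ray
+    from ray.rllib.optimizers import LocalSyncOptimizer
+
+    ray.init()
+    env_creator = lambda env_config: gym.make("PongNoFrameskip-v4")
+
+    # Build one local evaluator plus two remote evaluator replicas and wrap
+    # them in a synchronous optimizer. Plugging in a different optimizer
+    # (e.g., AsyncOptimizer) only changes this line.
+    optimizer = LocalSyncOptimizer.make(
+        YourEvaluatorClass, [env_creator], num_workers=2, optimizer_config={})
+
+    while optimizer.num_steps_sampled < 100000:
+        optimizer.step()
+        print("optimizer stats", optimizer.stats())
+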
Common utilities ---------------- diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index 6bbe33a53..1815c6108 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -10,11 +10,8 @@ from ray.tune.registry import register_trainable def _register_all(): for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "__fake", "__sigmoid_fake_data", "__parameter_tuning"]: - try: - from ray.rllib.agent import get_agent_class - register_trainable(key, get_agent_class(key)) - except ImportError as e: - print("Warning: could not import {}: {}".format(key, e)) + from ray.rllib.agent import get_agent_class + register_trainable(key, get_agent_class(key)) _register_all() diff --git a/python/ray/rllib/a3c/a3c_evaluator.py b/python/ray/rllib/a3c/a3c_evaluator.py index 95360bf76..9b0522dfd 100644 --- a/python/ray/rllib/a3c/a3c_evaluator.py +++ b/python/ray/rllib/a3c/a3c_evaluator.py @@ -6,14 +6,14 @@ import pickle import ray from ray.rllib.models import ModelCatalog -from ray.rllib.optimizers import Evaluator +from ray.rllib.optimizers import PolicyEvaluator from ray.rllib.a3c.common import get_policy_cls from ray.rllib.utils.filter import get_filter from ray.rllib.utils.sampler import AsyncSampler from ray.rllib.utils.process_rollout import process_rollout -class A3CEvaluator(Evaluator): +class A3CEvaluator(PolicyEvaluator): """Actor object to start running simulation on workers. The gradient computation is also executed from this object. @@ -65,7 +65,7 @@ class A3CEvaluator(Evaluator): def compute_gradients(self, samples): gradient, info = self.policy.compute_gradients(samples) - return gradient + return gradient, {} def apply_gradients(self, grads): self.policy.apply_gradients(grads) diff --git a/python/ray/rllib/bc/bc_evaluator.py b/python/ray/rllib/bc/bc_evaluator.py index 902068b3b..8499ba1e0 100644 --- a/python/ray/rllib/bc/bc_evaluator.py +++ b/python/ray/rllib/bc/bc_evaluator.py @@ -9,10 +9,10 @@ import ray from ray.rllib.bc.experience_dataset import ExperienceDataset from ray.rllib.bc.policy import BCPolicy from ray.rllib.models import ModelCatalog -from ray.rllib.optimizers import Evaluator +from ray.rllib.optimizers import PolicyEvaluator -class BCEvaluator(Evaluator): +class BCEvaluator(PolicyEvaluator): def __init__(self, registry, env_creator, config, logdir): env = ModelCatalog.get_preprocessor_as_wrapper(registry, env_creator( config["env_config"]), config["model"]) @@ -31,7 +31,7 @@ class BCEvaluator(Evaluator): gradient, info = self.policy.compute_gradients(samples) self.metrics_queue.put( {"num_samples": info["num_samples"], "loss": info["loss"]}) - return gradient + return gradient, {} def apply_gradients(self, grads): self.policy.apply_gradients(grads) diff --git a/python/ray/rllib/dqn/common/wrappers.py b/python/ray/rllib/dqn/common/wrappers.py index 3a8fd68ae..9a2d0d764 100644 --- a/python/ray/rllib/dqn/common/wrappers.py +++ b/python/ray/rllib/dqn/common/wrappers.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function from ray.rllib.models import ModelCatalog -from ray.rllib.dqn.common.atari_wrappers import wrap_deepmind +from ray.rllib.utils.atari_wrappers import wrap_deepmind def wrap_dqn(registry, env, options, random_starts): diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index dc0bb3f4e..100225474 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -103,7 +103,7 @@ DEFAULT_CONFIG = dict( # === Parallelism === # Number of workers for collecting 
samples with. This only makes sense # to increase if your environment is particularly slow to sample, or if - # you're using the Ape-X optimizer. + # you're using the Async or Ape-X optimizers. num_workers=0, # Whether to allocate GPUs for workers (if > 0). num_gpus_per_worker=0, @@ -221,13 +221,6 @@ class DQNAgent(Agent): return result - def _populate_replay_buffer(self): - if self.remote_evaluators: - for e in self.remote_evaluators: - e.sample.remote(no_replay=True) - else: - self.local_evaluator.sample(no_replay=True) - def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: diff --git a/python/ray/rllib/dqn/dqn_evaluator.py b/python/ray/rllib/dqn/dqn_evaluator.py index 088840161..758dc5f81 100644 --- a/python/ray/rllib/dqn/dqn_evaluator.py +++ b/python/ray/rllib/dqn/dqn_evaluator.py @@ -11,7 +11,7 @@ from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.dqn import models from ray.rllib.dqn.common.wrappers import wrap_dqn from ray.rllib.dqn.common.schedules import ConstantSchedule, LinearSchedule -from ray.rllib.optimizers import SampleBatch, Evaluator +from ray.rllib.optimizers import SampleBatch, PolicyEvaluator from ray.rllib.utils.compression import pack @@ -43,7 +43,7 @@ def adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): del arr[new_len:] -class DQNEvaluator(Evaluator): +class DQNEvaluator(PolicyEvaluator): """The DQN Evaluator. TODO(rliaw): Support observation/reward filters?""" @@ -136,13 +136,20 @@ class DQNEvaluator(Evaluator): return batch + def compute_gradients(self, samples): + td_err, grads = self.dqn_graph.compute_gradients( + self.sess, samples["obs"], samples["actions"], samples["rewards"], + samples["new_obs"], samples["dones"], samples["weights"]) + return grads, {"td_error": td_err} + + def apply_gradients(self, grads): + self.dqn_graph.apply_gradients(self.sess, grads) + def compute_apply(self, samples): - if samples is None: - return None td_error = self.dqn_graph.compute_apply( self.sess, samples["obs"], samples["actions"], samples["rewards"], samples["new_obs"], samples["dones"], samples["weights"]) - return td_error + return {"td_error": td_error} def get_weights(self): return self.variables.get_weights() diff --git a/python/ray/rllib/optimizers/__init__.py b/python/ray/rllib/optimizers/__init__.py index 55bbb2670..95be536c0 100644 --- a/python/ray/rllib/optimizers/__init__.py +++ b/python/ray/rllib/optimizers/__init__.py @@ -1,13 +1,14 @@ from ray.rllib.optimizers.apex_optimizer import ApexOptimizer -from ray.rllib.optimizers.async import AsyncOptimizer +from ray.rllib.optimizers.async_optimizer import AsyncOptimizer from ray.rllib.optimizers.local_sync import LocalSyncOptimizer from ray.rllib.optimizers.local_sync_replay import LocalSyncReplayOptimizer from ray.rllib.optimizers.multi_gpu import LocalMultiGPUOptimizer from ray.rllib.optimizers.sample_batch import SampleBatch -from ray.rllib.optimizers.evaluator import Evaluator, TFMultiGPUSupport +from ray.rllib.optimizers.policy_evaluator import PolicyEvaluator, \ + TFMultiGPUSupport __all__ = [ "ApexOptimizer", "AsyncOptimizer", "LocalSyncOptimizer", "LocalSyncReplayOptimizer", "LocalMultiGPUOptimizer", "SampleBatch", - "Evaluator", "TFMultiGPUSupport"] + "PolicyEvaluator", "TFMultiGPUSupport"] diff --git a/python/ray/rllib/optimizers/apex_optimizer.py b/python/ray/rllib/optimizers/apex_optimizer.py index cadcc28f2..ded738f62 100644 --- a/python/ray/rllib/optimizers/apex_optimizer.py +++ 
b/python/ray/rllib/optimizers/apex_optimizer.py @@ -1,3 +1,7 @@ +"""Implements Distributed Prioritized Experience Replay. + +https://arxiv.org/abs/1803.00933""" + from __future__ import absolute_import from __future__ import division from __future__ import print_function @@ -11,7 +15,7 @@ import threading import numpy as np import ray -from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.replay_buffer import PrioritizedReplayBuffer from ray.rllib.optimizers.sample_batch import SampleBatch from ray.rllib.utils.actors import TaskPool, create_colocated @@ -25,6 +29,11 @@ LEARNER_QUEUE_MAX_SIZE = 16 @ray.remote class ReplayActor(object): + """A replay buffer shard. + + Ray actors are single-threaded, so for scalability multiple replay actors + may be created to increase parallelism.""" + def __init__( self, num_shards, learning_starts, buffer_size, train_batch_size, prioritized_replay_alpha, prioritized_replay_beta, @@ -89,7 +98,15 @@ class ReplayActor(object): return stat -class GenericLearner(threading.Thread): +class LearnerThread(threading.Thread): + """Background thread that updates the local model from replay data. + + The learner thread communicates with the main thread through Queues. This + is needed since Ray operations can only be run on the main thread. In + addition, moving heavyweight gradient ops session runs off the main thread + improves overall throughput. + """ + def __init__(self, local_evaluator): threading.Thread.__init__(self) self.learner_queue_size = WindowStat("size", 50) @@ -110,13 +127,22 @@ class GenericLearner(threading.Thread): ra, replay = self.inqueue.get() if replay is not None: with self.grad_timer: - td_error = self.local_evaluator.compute_apply(replay) + td_error = self.local_evaluator.compute_apply(replay)[ + "td_error"] self.outqueue.put((ra, replay, td_error)) self.learner_queue_size.push(self.inqueue.qsize()) self.weights_updated = True -class ApexOptimizer(Optimizer): +class ApexOptimizer(PolicyOptimizer): + """Main event loop of the Ape-X optimizer. + + This class coordinates the data transfers between the learner thread, + remote evaluators (Ape-X actors), and replay buffer actors. + + This optimizer requires that policy evaluators return an additional + "td_error" array in the info return of compute_gradients(). 
This error + term will be used for sample prioritization.""" def _init( self, learning_starts=1000, buffer_size=10000, @@ -134,7 +160,7 @@ class ApexOptimizer(Optimizer): self.sample_batch_size = sample_batch_size self.max_weight_sync_delay = max_weight_sync_delay - self.learner = GenericLearner(self.local_evaluator) + self.learner = LearnerThread(self.local_evaluator) self.learner.start() self.replay_actors = create_colocated( @@ -189,10 +215,7 @@ class ApexOptimizer(Optimizer): weights = None with self.timers["sample_processing"]: - i = 0 - num_weight_syncs = 0 for ev, sample_batch in self.sample_tasks.completed(): - i += 1 sample_timesteps += self.sample_batch_size # Send the data to the replay buffer @@ -211,16 +234,13 @@ class ApexOptimizer(Optimizer): self.local_evaluator.get_weights()) ev.set_weights.remote(weights) self.num_weight_syncs += 1 - num_weight_syncs += 1 self.steps_since_update[ev] = 0 # Kick off another sample request self.sample_tasks.add(ev, ev.sample.remote()) with self.timers["replay_processing"]: - i = 0 for ra, replay in self.replay_tasks.completed(): - i += 1 self.replay_tasks.add(ra, ra.replay.remote()) with self.timers["get_samples"]: samples = ray.get(replay) @@ -228,9 +248,7 @@ class ApexOptimizer(Optimizer): self.learner.inqueue.put((ra, samples)) with self.timers["update_priorities"]: - i = 0 while not self.learner.outqueue.empty(): - i += 1 ra, replay, td_error = self.learner.outqueue.get() ra.update_priorities.remote(replay["batch_indexes"], td_error) train_timesteps += self.train_batch_size @@ -262,4 +280,4 @@ class ApexOptimizer(Optimizer): } if self.debug: stats.update(debug_stats) - return dict(Optimizer.stats(self), **stats) + return dict(PolicyOptimizer.stats(self), **stats) diff --git a/python/ray/rllib/optimizers/async.py b/python/ray/rllib/optimizers/async_optimizer.py similarity index 91% rename from python/ray/rllib/optimizers/async.py rename to python/ray/rllib/optimizers/async_optimizer.py index b51e96ba5..93c363345 100644 --- a/python/ray/rllib/optimizers/async.py +++ b/python/ray/rllib/optimizers/async_optimizer.py @@ -3,11 +3,11 @@ from __future__ import division from __future__ import print_function import ray -from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.utils.timer import TimerStat -class AsyncOptimizer(Optimizer): +class AsyncOptimizer(PolicyOptimizer): """An asynchronous RL optimizer, e.g. for implementing A3C. 
This optimizer asynchronously pulls and applies gradients from remote @@ -37,7 +37,7 @@ class AsyncOptimizer(Optimizer): while gradient_queue: with self.wait_timer: fut, e = gradient_queue.pop(0) - gradient = ray.get(fut) + gradient, _ = ray.get(fut) if gradient is not None: with self.apply_timer: @@ -54,7 +54,7 @@ class AsyncOptimizer(Optimizer): self.num_steps_trained += self.grads_per_step * self.batch_size def stats(self): - return dict(Optimizer.stats(), **{ + return dict(PolicyOptimizer.stats(), **{ "wait_time_ms": round(1000 * self.wait_timer.mean, 3), "apply_time_ms": round(1000 * self.apply_timer.mean, 3), "dispatch_time_ms": round(1000 * self.dispatch_timer.mean, 3), diff --git a/python/ray/rllib/optimizers/local_sync.py b/python/ray/rllib/optimizers/local_sync.py index 0ca8cb39e..3f71cce4e 100644 --- a/python/ray/rllib/optimizers/local_sync.py +++ b/python/ray/rllib/optimizers/local_sync.py @@ -3,13 +3,13 @@ from __future__ import division from __future__ import print_function import ray -from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.sample_batch import SampleBatch from ray.rllib.utils.filter import RunningStat from ray.rllib.utils.timer import TimerStat -class LocalSyncOptimizer(Optimizer): +class LocalSyncOptimizer(PolicyOptimizer): """A simple synchronous RL optimizer. In each step, this optimizer pulls samples from a number of remote @@ -40,7 +40,7 @@ class LocalSyncOptimizer(Optimizer): samples = self.local_evaluator.sample() with self.grad_timer: - grad = self.local_evaluator.compute_gradients(samples) + grad, _ = self.local_evaluator.compute_gradients(samples) self.local_evaluator.apply_gradients(grad) self.grad_timer.push_units_processed(samples.count) @@ -48,7 +48,7 @@ class LocalSyncOptimizer(Optimizer): self.num_steps_trained += samples.count def stats(self): - return dict(Optimizer.stats(self), **{ + return dict(PolicyOptimizer.stats(self), **{ "sample_time_ms": round(1000 * self.sample_timer.mean, 3), "grad_time_ms": round(1000 * self.grad_timer.mean, 3), "update_time_ms": round(1000 * self.update_weights_timer.mean, 3), diff --git a/python/ray/rllib/optimizers/local_sync_replay.py b/python/ray/rllib/optimizers/local_sync_replay.py index a2015e873..30af1f704 100644 --- a/python/ray/rllib/optimizers/local_sync_replay.py +++ b/python/ray/rllib/optimizers/local_sync_replay.py @@ -7,14 +7,18 @@ import numpy as np import ray from ray.rllib.optimizers.replay_buffer import ReplayBuffer, \ PrioritizedReplayBuffer -from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.sample_batch import SampleBatch from ray.rllib.utils.filter import RunningStat from ray.rllib.utils.timer import TimerStat -class LocalSyncReplayOptimizer(Optimizer): - """Variant of the local sync optimizer that supports replay (for DQN).""" +class LocalSyncReplayOptimizer(PolicyOptimizer): + """Variant of the local sync optimizer that supports replay (for DQN). + + This optimizer requires that policy evaluators return an additional + "td_error" array in the info return of compute_gradients(). 
This error + term will be used for sample prioritization.""" def _init( self, learning_starts=1000, buffer_size=10000, @@ -88,7 +92,7 @@ class LocalSyncReplayOptimizer(Optimizer): "batch_indexes": batch_indexes}) with self.grad_timer: - td_error = self.local_evaluator.compute_apply(samples) + td_error = self.local_evaluator.compute_apply(samples)["td_error"] new_priorities = ( np.abs(td_error) + self.prioritized_replay_eps) if isinstance(self.replay_buffer, PrioritizedReplayBuffer): @@ -99,7 +103,7 @@ class LocalSyncReplayOptimizer(Optimizer): self.num_steps_trained += samples.count def stats(self): - return dict(Optimizer.stats(self), **{ + return dict(PolicyOptimizer.stats(self), **{ "sample_time_ms": round(1000 * self.sample_timer.mean, 3), "replay_time_ms": round(1000 * self.replay_timer.mean, 3), "grad_time_ms": round(1000 * self.grad_timer.mean, 3), diff --git a/python/ray/rllib/optimizers/multi_gpu.py b/python/ray/rllib/optimizers/multi_gpu.py index 05274133d..f9d3f4a85 100644 --- a/python/ray/rllib/optimizers/multi_gpu.py +++ b/python/ray/rllib/optimizers/multi_gpu.py @@ -7,14 +7,14 @@ import os import tensorflow as tf import ray -from ray.rllib.optimizers.evaluator import TFMultiGPUSupport -from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.policy_evaluator import TFMultiGPUSupport +from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.sample_batch import SampleBatch from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer from ray.rllib.utils.timer import TimerStat -class LocalMultiGPUOptimizer(Optimizer): +class LocalMultiGPUOptimizer(PolicyOptimizer): """A synchronous optimizer that uses multiple local GPUs. Samples are pulled synchronously from multiple remote evaluators, @@ -102,7 +102,7 @@ class LocalMultiGPUOptimizer(Optimizer): self.num_steps_trained += samples.count def stats(self): - return dict(Optimizer.stats(), **{ + return dict(PolicyOptimizer.stats(), **{ "sample_time_ms": round(1000 * self.sample_timer.mean, 3), "load_time_ms": round(1000 * self.load_timer.mean, 3), "grad_time_ms": round(1000 * self.grad_timer.mean, 3), diff --git a/python/ray/rllib/optimizers/evaluator.py b/python/ray/rllib/optimizers/policy_evaluator.py similarity index 58% rename from python/ray/rllib/optimizers/evaluator.py rename to python/ray/rllib/optimizers/policy_evaluator.py index ee079fcd7..615cedf9f 100644 --- a/python/ray/rllib/optimizers/evaluator.py +++ b/python/ray/rllib/optimizers/policy_evaluator.py @@ -5,22 +5,26 @@ from __future__ import print_function import os -class Evaluator(object): - """Algorithms implement this interface to leverage RLlib optimizers. +class PolicyEvaluator(object): + """Algorithms implement this interface to leverage policy optimizers. - Any algorithm that implements Evaluator can plug in any RLLib optimizer, - e.g. async SGD, local multi-GPU SGD, etc. + Policy evaluators are the "data plane" of an algorithm. + + Any algorithm that implements Evaluator can plug in any PolicyOptimizer, + e.g. async SGD, Ape-X, local multi-GPU SGD, etc. """ def sample(self): - """Returns experience samples from this Evaluator. + """Returns a batch of experience sampled from this evaluator. + + This method must be implemented by subclasses. Returns: - SampleBatch: A columnar batch of experiences. + SampleBatch: A columnar batch of experiences (e.g., tensors). 
Examples: >>> print(ev.sample()) - SampleBatch({"a": [1, 2, 3], "b": [4, 5, 6]}) + SampleBatch({"obs": [1, 2, 3], "action": [0, 1, 0], ...}) """ raise NotImplementedError @@ -28,18 +32,27 @@ class Evaluator(object): def compute_gradients(self, samples): """Returns a gradient computed w.r.t the specified samples. + This method must be implemented by subclasses. + Returns: object: A gradient that can be applied on a compatible evaluator. + info: dictionary of extra metadata. + + Examples: + >>> batch = ev.sample() + >>> grads, info = ev2.compute_gradients(samples) """ raise NotImplementedError def apply_gradients(self, grads): - """Applies the given gradients to this Evaluator's weights. + """Applies the given gradients to this evaluator's weights. + + This method must be implemented by subclasses. Examples: >>> samples = ev1.sample() - >>> grads = ev2.compute_gradients(samples) + >>> grads, info = ev2.compute_gradients(samples) >>> ev1.apply_gradients(grads) """ @@ -48,8 +61,14 @@ class Evaluator(object): def get_weights(self): """Returns the model weights of this Evaluator. + This method must be implemented by subclasses. + Returns: object: weights that can be set on a compatible evaluator. + info: dictionary of extra metadata. + + Examples: + >>> weights = ev1.get_weights() """ raise NotImplementedError @@ -57,6 +76,8 @@ class Evaluator(object): def set_weights(self, weights): """Sets the model weights of this Evaluator. + This method must be implemented by subclasses. + Examples: >>> weights = ev1.get_weights() >>> ev2.set_weights(weights) @@ -65,27 +86,31 @@ class Evaluator(object): raise NotImplementedError def compute_apply(self, samples): - """Fused compute and apply gradients on given samples. + """Fused compute gradients and apply gradients call. Returns: - The result of calling compute_gradients(samples) + info: dictionary of extra metadata from compute_gradients(). + + Examples: + >>> batch = ev.sample() + >>> ev.compute_apply(samples) """ - grads = self.compute_gradients(samples) + grads, info = self.compute_gradients(samples) self.apply_gradients(grads) - return grads + return info def get_host(self): - """Returns hostname of actor.""" + """Returns the hostname of the process running this evaluator.""" return os.uname()[1] -class TFMultiGPUSupport(Evaluator): - """The multi-GPU TF optimizer requires additional TF-specific supportt. +class TFMultiGPUSupport(PolicyEvaluator): + """The multi-GPU TF optimizer requires additional TF-specific support. Attributes: - sess (Session) the tensorflow session associated with this evaluator + sess (Session): the tensorflow session associated with this evaluator. """ def tf_loss_inputs(self): @@ -102,7 +127,7 @@ class TFMultiGPUSupport(Evaluator): >>> print(ev.tf_loss_inputs()) [("action", action_placeholder), ("reward", reward_placeholder)] - >>> print(ev.sample().data.keys()) + >>> print(ev.sample()[0].data.keys()) ["action", "reward"] """ diff --git a/python/ray/rllib/optimizers/optimizer.py b/python/ray/rllib/optimizers/policy_optimizer.py similarity index 66% rename from python/ray/rllib/optimizers/optimizer.py rename to python/ray/rllib/optimizers/policy_optimizer.py index 2be7a2a86..cf3f5d755 100644 --- a/python/ray/rllib/optimizers/optimizer.py +++ b/python/ray/rllib/optimizers/policy_optimizer.py @@ -5,17 +5,27 @@ from __future__ import print_function import ray -class Optimizer(object): - """RLlib optimizers encapsulate distributed RL optimization strategies. 
+class PolicyOptimizer(object): + """Policy optimizers encapsulate distributed RL optimization strategies. + + Policy optimizers serve as the "control plane" of algorithms. For example, AsyncOptimizer is used for A3C, and LocalMultiGPUOptimizer is used for PPO. These optimizers are all pluggable, and it is possible to mix and match as needed. In order for an algorithm to use an RLlib optimizer, it must implement - the Evaluator interface and pass a number of Evaluators to its Optimizer - of choice. The Optimizer uses these Evaluators to sample from the - environment and compute model gradient updates. + the PolicyEvaluator interface and pass a PolicyEvaluator class or set of + PolicyEvaluators to its PolicyOptimizer of choice. The PolicyOptimizer + uses these Evaluators to sample from the environment and compute model + gradient updates. + + Attributes: + config (dict): The JSON configuration passed to this optimizer. + local_evaluator (PolicyEvaluator): The embedded evaluator instance. + remote_evaluators (list): List of remote evaluator replicas, or []. + num_steps_trained (int): Number of timesteps trained on so far. + num_steps_sampled (int): Number of timesteps sampled so far. """ @classmethod @@ -59,10 +69,17 @@ class Optimizer(object): self.num_steps_sampled = 0 def _init(self): + """Subclasses should prefer overriding this instead of __init__.""" + pass def step(self): - """Takes a logical optimization step.""" + """Takes a logical optimization step. + + This should run for long enough to minimize call overheads (i.e., at + least a couple seconds), but short enough to return control + periodically to callers (i.e., at most a few tens of seconds). + """ raise NotImplementedError @@ -75,8 +92,12 @@ class Optimizer(object): } def save(self): + """Returns a serializable object representing the optimizer state.""" + return [self.num_steps_trained, self.num_steps_sampled] def restore(self, data): + """Restores optimizer state from the given data object.""" + self.num_steps_trained = data[0] self.num_steps_sampled = data[1] diff --git a/python/ray/rllib/pg/pg_evaluator.py b/python/ray/rllib/pg/pg_evaluator.py index 4fa58b4bd..1f217ba02 100644 --- a/python/ray/rllib/pg/pg_evaluator.py +++ b/python/ray/rllib/pg/pg_evaluator.py @@ -3,14 +3,14 @@ from __future__ import division from __future__ import print_function from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.optimizers import Evaluator +from ray.rllib.optimizers import PolicyEvaluator from ray.rllib.pg.policy import PGPolicy from ray.rllib.utils.filter import NoFilter from ray.rllib.utils.process_rollout import process_rollout from ray.rllib.utils.sampler import SyncSampler -class PGEvaluator(Evaluator): +class PGEvaluator(PolicyEvaluator): """Evaluator for simple policy gradient.""" def __init__(self, registry, env_creator, config): @@ -41,7 +41,7 @@ class PGEvaluator(Evaluator): def compute_gradients(self, samples): """ Returns gradient w.r.t. 
samples.""" gradient, info = self.policy.compute_gradients(samples) - return gradient + return gradient, {} def apply_gradients(self, grads): """Applies gradients to evaluator weights.""" diff --git a/python/ray/rllib/ppo/ppo_evaluator.py b/python/ray/rllib/ppo/ppo_evaluator.py index d1520ab4a..f012d1c24 100644 --- a/python/ray/rllib/ppo/ppo_evaluator.py +++ b/python/ray/rllib/ppo/ppo_evaluator.py @@ -11,7 +11,7 @@ from tensorflow.python import debug as tf_debug import numpy as np import ray -from ray.rllib.optimizers import Evaluator, SampleBatch +from ray.rllib.optimizers import PolicyEvaluator, SampleBatch from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer from ray.rllib.models import ModelCatalog from ray.rllib.utils.sampler import SyncSampler @@ -21,7 +21,7 @@ from ray.rllib.ppo.loss import ProximalPolicyLoss # TODO(rliaw): Move this onto LocalMultiGPUOptimizer -class PPOEvaluator(Evaluator): +class PPOEvaluator(PolicyEvaluator): """ Runner class that holds the simulator environment and the policy. diff --git a/python/ray/rllib/test/mock_evaluator.py b/python/ray/rllib/test/mock_evaluator.py index b70eb87cc..4762bb877 100644 --- a/python/ray/rllib/test/mock_evaluator.py +++ b/python/ray/rllib/test/mock_evaluator.py @@ -28,7 +28,7 @@ class _MockEvaluator(object): return SampleBatch(samples_dict) def compute_gradients(self, samples): - return self._grad * samples.count + return self._grad * samples.count, {} def apply_gradients(self, grads): self._weights += self._grad diff --git a/python/ray/rllib/tuned_examples/pong-apex.yaml b/python/ray/rllib/tuned_examples/pong-apex.yaml index f3f00c339..d63b2dea1 100644 --- a/python/ray/rllib/tuned_examples/pong-apex.yaml +++ b/python/ray/rllib/tuned_examples/pong-apex.yaml @@ -1,5 +1,6 @@ -# This can be expected to reach 20.8 reward within an hour when using -# a V100 GPU (e.g. p3.2xl instance on AWS, and m4.4xl workers). +# This can be expected to reach 20.8 reward within an hour when using a V100 GPU +# (e.g. p3.2xl instance on AWS, and m4.4xl workers). It also can reach ~21 reward +# within an hour with fewer workers (e.g. 4-8) but less reliably. 
pong-apex: env: PongNoFrameskip-v4 run: APEX @@ -8,7 +9,7 @@ pong-apex: eval: spec.config.num_workers gpu: 1 config: - force_evaluators_remote: True # requires cluster + force_evaluators_remote: True # set to False if you're running on a single node target_network_update_freq: 50000 num_workers: 32 lr: .0001 diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index f463d4c4e..d42114cb0 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -7,6 +7,8 @@ import ray class TaskPool(object): + """Helper class for tracking the status of many in-flight actor tasks.""" + def __init__(self): self._tasks = {} diff --git a/python/ray/rllib/dqn/common/atari_wrappers.py b/python/ray/rllib/utils/atari_wrappers.py similarity index 94% rename from python/ray/rllib/dqn/common/atari_wrappers.py rename to python/ray/rllib/utils/atari_wrappers.py index 4726a4f99..ac2ebb705 100644 --- a/python/ray/rllib/dqn/common/atari_wrappers.py +++ b/python/ray/rllib/utils/atari_wrappers.py @@ -144,11 +144,11 @@ class MaxAndSkipEnv(gym.Wrapper): class WarpFrame(gym.ObservationWrapper): - def __init__(self, env): - """Warp frames to 84x84 as done in the Nature paper and later work.""" + def __init__(self, env, dim): + """Warp frames to the specified size (dim x dim).""" gym.ObservationWrapper.__init__(self, env) - self.width = 80 # in rllib we use 80 - self.height = 80 + self.width = dim # in rllib we use 80 + self.height = dim self.observation_space = spaces.Box( low=0, high=255, shape=(self.height, self.width, 1)) @@ -185,10 +185,14 @@ class FrameStack(gym.Wrapper): return np.concatenate(self.frames, axis=2) -def wrap_deepmind(env, random_starts): +def wrap_deepmind(env, random_starts=True, dim=80): """Configure environment for DeepMind-style Atari. Note that we assume reward clipping is done outside the wrapper. + + Args: + random_starts (bool): Start with random actions instead of noops. + dim (int): Dimension to resize observations to (dim x dim). """ env = NoopResetEnv(env, noop_max=30, random_starts=random_starts) if 'NoFrameskip' in env.spec.id: @@ -196,7 +200,7 @@ def wrap_deepmind(env, random_starts): env = EpisodicLifeEnv(env) if 'FIRE' in env.unwrapped.get_action_meanings(): env = FireResetEnv(env) - env = WarpFrame(env) + env = WarpFrame(env, dim) # env = ClipRewardEnv(env) # reward clipping is handled by DQN replay env = FrameStack(env, 4) return env