[docs] rewrite (#5175)

This commit is contained in:
Richard Liaw 2019-08-05 23:33:14 -07:00 committed by Eric Liang
parent 5d7afe8092
commit a08ea09760
32 changed files with 1456 additions and 1627 deletions


@ -1,28 +1,14 @@
How-to: Using Actors
====================

An actor is essentially a stateful worker (or a service). When a new actor is
instantiated, a new worker is created, and methods of the actor are scheduled on
that specific worker and can access and mutate the state of that worker.

Creating an actor
-----------------

You can convert a standard Python class into a Ray actor class as follows:

.. code-block:: python
@ -35,234 +21,121 @@ that instances of the ``Counter`` class will be actors.
            self.value += 1
            return self.value

Note that the above is equivalent to the following:

.. code-block:: python

    class Counter(object):
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    Counter = ray.remote(Counter)

When the above actor is instantiated, the following events happen.

1. A node in the cluster is chosen and a worker process is created on that node
   for the purpose of running methods called on the actor.
2. A ``Counter`` object is created on that worker and the ``Counter``
   constructor is run.
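For example, instantiating two counters creates two workers, each holding its
own copy of the state (a minimal sketch; it assumes ``ray.init()`` has been
called):

.. code-block:: python

    a1 = Counter.remote()
    a2 = Counter.remote()

    # Methods run serially on each actor's own worker and mutate its state.
    assert ray.get(a1.increment.remote()) == 1
    assert ray.get(a1.increment.remote()) == 2
    assert ray.get(a2.increment.remote()) == 1  # a2 has separate state.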
Any method of the actor can return multiple object IDs with the ``ray.method`` decorator:

.. code-block:: python

    @ray.remote
    class Foo(object):

        @ray.method(num_return_vals=2)
        def bar(self):
            return 1, 2

    f = Foo.remote()

    obj_id1, obj_id2 = f.bar.remote()
    assert ray.get(obj_id1) == 1
    assert ray.get(obj_id2) == 2
Resources with Actors
---------------------

You can specify that an actor requires CPUs or GPUs in the decorator. While Ray
has built-in support for CPUs and GPUs, Ray can also handle custom resources.

When using GPUs, Ray will automatically set the environment variable
``CUDA_VISIBLE_DEVICES`` for the actor after it is instantiated. The actor will
have access to a list of the IDs of the GPUs that it is allowed to use via
``ray.get_gpu_ids()``. This is a list of integers, like ``[]``, or ``[1]``, or
``[2, 5, 6]``.

.. code-block:: python

    @ray.remote(num_cpus=2, num_gpus=1)
    class GPUActor(object):
        pass

When a ``GPUActor`` instance is created, it will be placed on a node that has
at least 1 GPU, and the GPU will be reserved for the actor for the duration of
the actor's lifetime (even if the actor is not executing tasks). The GPU
resources will be released when the actor terminates.
If you want to use custom resources, make sure your cluster is configured to
have these resources (see `configuration instructions
<configure.html#cluster-resources>`__):

.. important::

  * If you specify resource requirements in an actor class's remote decorator,
    then the actor will acquire those resources for its entire lifetime (if you
    do not specify CPU resources, the default is 1), even if it is not executing
    any methods. The actor will not acquire any additional resources when
    executing methods.

  * If you do not specify any resource requirements in the actor class's remote
    decorator, then by default, the actor will not acquire any resources for its
    lifetime, but every time it executes a method, it will need to acquire 1 CPU
    resource.

If you need to instantiate many copies of the same actor with varying resource
requirements, you can do so as follows.
.. code-block:: python

    a1 = Counter._remote(num_cpus=1, resources={"Custom1": 1})
    a2 = Counter._remote(num_cpus=2, resources={"Custom2": 1})
    a3 = Counter._remote(num_cpus=3, resources={"Custom3": 1})

Note that to create these actors successfully, Ray will need to be started with
sufficient CPU resources and the relevant custom resources.

.. code-block:: python

    @ray.remote(resources={'Resource2': 1})
    class GPUActor(object):
        pass
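For these examples to schedule, the corresponding resources must exist in the
cluster. A minimal single-machine sketch (the custom resource names are the
hypothetical ones used above):

.. code-block:: python

    import ray

    # Make the custom resources above schedulable on this machine.
    ray.init(num_cpus=6,
             resources={"Custom1": 1, "Custom2": 1, "Custom3": 1, "Resource2": 1})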
Terminating Actors
------------------

Actor processes will be terminated automatically when the initial actor handle
goes out of scope in Python. If we create an actor with ``actor_handle =
Counter.remote()``, then when ``actor_handle`` goes out of scope and is
destructed, the actor process will be terminated. Note that this only applies to
the original actor handle created for the actor and not to subsequent actor
handles created by passing the actor handle to other tasks.

If necessary, you can manually terminate an actor by calling
``ray.actor.exit_actor()`` from within one of the actor methods. This will kill
the actor process and release resources associated/assigned to the actor. This
approach should generally not be necessary as actors are automatically garbage
collected.
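As a minimal sketch of manual termination (``Worker`` and ``shutdown`` are
hypothetical names, and ``ray.init()`` is assumed to have been called):

.. code-block:: python

    @ray.remote
    class Worker(object):
        def shutdown(self):
            # Kill this actor's process; subsequent method calls on the
            # handle will fail.
            ray.actor.exit_actor()

    w = Worker.remote()
    w.shutdown.remote()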
Passing Around Actor Handles
----------------------------

Actor handles can be passed into other tasks. To see an example of this, take a
look at the `asynchronous parameter server example`_. To illustrate this with a
simple example, consider a simple actor definition.

.. code-block:: python
@ -281,9 +154,12 @@ We can define remote functions (or actor methods) that use actor handles.
.. code-block:: python

    import time

    @ray.remote
    def f(counter):
        for _ in range(1000):
            time.sleep(0.1)
            counter.inc.remote()

If we instantiate an actor, we can pass the handle around to various tasks.
@ -293,35 +169,11 @@ If we instantiate an actor, we can pass the handle around to various tasks.
    counter = Counter.remote()

    # Start some tasks that use the actor.
    [f.remote(counter) for _ in range(3)]

    # Print the counter value.
    for _ in range(10):
        time.sleep(1)
        print(ray.get(counter.get_counter.remote()))
.. _`asynchronous parameter server example`: http://ray.readthedocs.io/en/latest/example-parameter-server.html

doc/source/advanced.rst (new file, +207 lines)

@ -0,0 +1,207 @@
Advanced Usage
==============
This page will cover some more advanced examples of using Ray's flexible programming model.
Nested Remote Functions
-----------------------
Remote functions can call other remote functions, resulting in nested tasks.
For example, consider the following.
.. code:: python
    @ray.remote
    def f():
        return 1

    @ray.remote
    def g():
        # Call f 4 times and return the resulting object IDs.
        return [f.remote() for _ in range(4)]

    @ray.remote
    def h():
        # Call f 4 times, block until those 4 tasks finish,
        # retrieve the results, and return the values.
        return ray.get([f.remote() for _ in range(4)])
Then calling ``g`` and ``h`` produces the following behavior.
.. code:: python
    >>> ray.get(g.remote())
    [ObjectID(b1457ba0911ae84989aae86f89409e953dd9a80e),
     ObjectID(7c14a1d13a56d8dc01e800761a66f09201104275),
     ObjectID(99763728ffc1a2c0766a2000ebabded52514e9a6),
     ObjectID(9c2f372e1933b04b2936bb6f58161285829b9914)]

    >>> ray.get(h.remote())
    [1, 1, 1, 1]
**One limitation** is that the definition of ``f`` must come before the
definitions of ``g`` and ``h`` because as soon as ``g`` is defined, it
will be pickled and shipped to the workers, and so if ``f`` hasn't been
defined yet, the definition will be incomplete.
Circular Dependencies
---------------------
Consider the following remote function.
.. code-block:: python
    @ray.remote(num_cpus=1, num_gpus=1)
    def g():
        return ray.get(f.remote())
When a ``g`` task is executing, it will release its CPU resources when it gets
blocked in the call to ``ray.get``. It will reacquire the CPU resources when
``ray.get`` returns. It will retain its GPU resources throughout the lifetime of
the task because the task will most likely continue to use GPU memory.
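For concreteness, a minimal sketch of such a pair (``f`` here is a hypothetical
remote function):

.. code-block:: python

    @ray.remote
    def f():
        return 1

    @ray.remote(num_cpus=1, num_gpus=1)
    def g():
        # While blocked in the ray.get below, g's CPU is released for
        # other tasks; its GPU remains reserved.
        return ray.get(f.remote())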
Cython Code in Ray
------------------
To use Cython code in Ray, run the following from directory ``$RAY_HOME/examples/cython``:
.. code-block:: bash
pip install scipy # For BLAS example
pip install -e .
python cython_main.py --help
You can import the ``cython_examples`` module from a Python script or interpreter.
Notes
~~~~~
* You **must** include the following two lines at the top of any ``*.pyx`` file:
.. code-block:: python
#!python
# cython: embedsignature=True, binding=True
* You cannot decorate Cython functions within a ``*.pyx`` file (there are ways around this, but doing so creates a leaky abstraction between Cython and Python that would be very challenging to support generally). Instead, prefer the following in your Python code:
.. code-block:: python
some_cython_func = ray.remote(some_cython_module.some_cython_func)
* You cannot transfer memory buffers to a remote function (see ``example8``, which currently fails); your remote function must return a value
* Have a look at ``cython_main.py``, ``cython_simple.pyx``, and ``setup.py`` for examples of how to call, define, and build Cython code, respectively. The Cython `documentation <http://cython.readthedocs.io/>`_ is also very helpful.
* Several limitations come from Cython's own `unsupported <https://github.com/cython/cython/wiki/Unsupported>`_ Python features.
* We currently do not support compiling and distributing Cython code to ``ray`` clusters. In other words, Cython developers are responsible for compiling and distributing any Cython code to their cluster (much as would be the case for users who need Python packages like ``scipy``).
* For most simple use cases, developers need not worry about Python 2 or 3, but users who do need to care can have a look at the ``language_level`` Cython compiler directive (see `here <http://cython.readthedocs.io/en/latest/src/reference/compilation.html>`_).
Serialization
-------------
There are a number of situations in which Ray will place objects in the object
store. Once an object is placed in the object store, it is immutable. Situations include:
1. The return values of a remote function.
2. The value ``x`` in a call to ``ray.put(x)``.
3. Arguments to remote functions (except for simple arguments like ints or
floats).
A Python object may have an arbitrary number of pointers with arbitrarily deep
nesting. To place an object in the object store or send it between processes,
it must first be converted to a contiguous string of bytes. Serialization and deserialization can often be a bottleneck.
Pickle is the standard Python serialization library. However, for numerical workloads, pickling and unpickling can be inefficient. For example, if multiple processes want to access a Python list of numpy arrays, each process must unpickle the list and create its own new copies of the arrays. This can lead to high memory overheads, even when all processes are read-only and could easily share memory.
In Ray, we optimize for numpy arrays by using the `Apache Arrow`_ data format.
When we deserialize a list of numpy arrays from the object store, we still
create a Python list of numpy array objects. However, rather than copy each
numpy array, each numpy array object holds a pointer to the relevant array held
in shared memory. There are some advantages to this form of serialization.
- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.
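As an illustration, a small sketch of this zero-copy behavior (the array size
is arbitrary, and ``ray.init()`` is assumed to have been called):

.. code-block:: python

    import numpy as np

    a = np.zeros(10 ** 6)
    a_id = ray.put(a)

    # Both gets map the same shared-memory buffer; the array data is
    # not copied into either worker.
    b = ray.get(a_id)
    c = ray.get(a_id)

    # Arrays backed by the object store are read-only.
    assert not b.flags.writeable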
.. _`Apache Arrow`: https://arrow.apache.org/
What Objects Does Ray Handle
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Ray does not currently support serialization of arbitrary Python objects. The
set of Python objects that Ray can serialize using Arrow includes the following.
1. Primitive types: ints, floats, longs, bools, strings, unicode, and numpy
arrays.
2. Any list, dictionary, or tuple whose elements can be serialized by Ray.
For a more general object, Ray will first attempt to serialize the object by
unpacking the object as a dictionary of its fields. This behavior is not
correct in all cases. If Ray cannot serialize the object as a dictionary of its
fields, Ray will fall back to using pickle. However, using pickle will likely
be inefficient.
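For example, a simple custom class would typically be handled by the
dictionary-of-fields path (a sketch; ``Foo`` is a hypothetical class):

.. code-block:: python

    class Foo(object):
        def __init__(self):
            self.x = 1
            self.y = "a"

    # Serialized by unpacking Foo's instance fields into a dictionary.
    foo_id = ray.put(Foo())
    assert ray.get(foo_id).x == 1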
Notes and limitations
~~~~~~~~~~~~~~~~~~~~~
- We currently handle certain patterns incorrectly, according to Python
semantics. For example, a list that contains two copies of the same list will
be serialized as if the two lists were distinct.
.. code-block:: python
l1 = [0]
l2 = [l1, l1]
l3 = ray.get(ray.put(l2))
l2[0] is l2[1] # True.
l3[0] is l3[1] # False.
- For reasons similar to the above example, we also do not currently handle
objects that recursively contain themselves (this may be common in graph-like
data structures).
.. code-block:: python
l = []
l.append(l)
# Try to put this list that recursively contains itself in the object store.
ray.put(l)
This will throw an exception with a message like the following.
.. code-block:: bash
This object exceeds the maximum recursion depth. It may contain itself recursively.
- Whenever possible, use numpy arrays for maximum performance.
Last Resort Workaround
~~~~~~~~~~~~~~~~~~~~~~
If you find cases where Ray serialization doesn't work or does something
unexpected, please `let us know`_ so we can fix it. In the meantime, you may
have to resort to writing custom serialization and deserialization code (e.g.,
calling pickle by hand).
.. _`let us know`: https://github.com/ray-project/ray/issues
.. code-block:: python
    import pickle

    @ray.remote
    def f(complicated_object):
        # Deserialize the object manually.
        obj = pickle.loads(complicated_object)
        return "Successfully passed {} into f.".format(obj)

    # Define a complicated object.
    l = []
    l.append(l)

    # Manually serialize the object and pass it in as a string.
    ray.get(f.remote(pickle.dumps(l)))  # prints 'Successfully passed [[...]] into f.'
**Note:** If you have trouble with pickle, you may have better luck with
cloudpickle.
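For instance, a minimal sketch with ``cloudpickle``, which handles some
constructs that plain ``pickle`` cannot (assuming the package is installed):

.. code-block:: python

    import cloudpickle

    # Locally defined closures often fail with plain pickle.
    def make_adder(n):
        def add(x):
            return x + n
        return add

    blob = cloudpickle.dumps(make_adder(2))
    add_two = cloudpickle.loads(blob)
    assert add_two(3) == 5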


@ -1,5 +1,5 @@
Built-in Autoscaling
====================

This document provides instructions for launching a Ray cluster either privately, on AWS, or on GCP.

doc/source/configure.rst (new file, +183 lines)

@ -0,0 +1,183 @@
Configuring Ray
===============
This page discusses the various ways to configure Ray, both from the Python API
and from the command line. Take a look at the ``ray.init`` `documentation
<package-ref.html#ray.init>`__ for a complete overview of the configurations.
Cluster Resources
-----------------
Ray by default detects available resources.
.. code-block:: python
# This automatically detects available resources in the single machine.
ray.init()
If you are not running in cluster mode, you can override the detected resources through ``ray.init`` as follows.
.. code-block:: python
# If not connecting to an existing cluster, you can specify resource overrides:
ray.init(num_cpus=8, num_gpus=1)
# Specifying custom resources
ray.init(num_gpus=1, resources={'Resource1': 4, 'Resource2': 16})
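Tasks and actors can then request these resources in their decorators; a small
sketch using the resource names above:

.. code-block:: python

    @ray.remote(resources={'Resource2': 1})
    def f():
        # Scheduled only where one unit of Resource2 is available.
        return 1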
When starting Ray from the command line, pass the ``--num-cpus`` and ``--num-gpus`` flags into ``ray start``. You can also specify custom resources.
.. code-block:: bash
# To start a head node.
$ ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
# To start a non-head node.
$ ray start --redis-address=<redis-address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
# Specifying custom resources
$ ray start [--head] --num-cpus=<NUM_CPUS> --resources='{"Resource1": 4, "Resource2": 16}'
If using the command line, connect to the Ray cluster as follows:
.. code-block:: python
# Connect to Ray. Note that if you are connecting to an existing cluster, you don't need to specify resources.
ray.init(redis_address=<redis-address>)
Logging and Debugging
---------------------
Each Ray session will have a unique name. By default, the name is
``session_{timestamp}_{pid}``. The format of ``timestamp`` is
``%Y-%m-%d_%H-%M-%S_%f`` (see the `Python time format <http://strftime.org>`__ for details);
the pid belongs to the startup process (the process calling ``ray.init()`` or
the Ray process executed by a shell in ``ray start``).
For each session, Ray will place all its temporary files under the
*session directory*. A *session directory* is a subdirectory of the
*root temporary path* (``/tmp/ray`` by default),
so the default session directory is ``/tmp/ray/{ray_session_name}``.
You can sort by their names to find the latest session.
Change the *root temporary directory* in one of these ways (see the sketch after this list):

* Pass ``--temp-dir={your temp path}`` to ``ray start``
* Specify ``temp_dir`` when calling ``ray.init()``
You can also use ``default_worker.py --temp-dir={your temp path}`` to
start a new worker with the given *root temporary directory*.
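For example, a minimal sketch of the ``temp_dir`` option:

.. code-block:: python

    import ray

    # Place this session's temporary files under /tmp/my_ray instead
    # of the default /tmp/ray.
    ray.init(temp_dir="/tmp/my_ray")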
**Layout of logs**:
.. code-block:: text
/tmp
└── ray
└── session_{datetime}_{pid}
├── logs # for logging
│   ├── log_monitor.err
│   ├── log_monitor.out
│   ├── monitor.err
│   ├── monitor.out
│   ├── plasma_store.err # outputs of the plasma store
│   ├── plasma_store.out
│   ├── raylet.err # outputs of the raylet process
│   ├── raylet.out
│   ├── redis-shard_0.err # outputs of redis shards
│   ├── redis-shard_0.out
│   ├── redis.err # redis
│   ├── redis.out
│   ├── webui.err # ipython notebook web ui
│   ├── webui.out
│   ├── worker-{worker_id}.err # redirected output of workers
│   ├── worker-{worker_id}.out
│   └── {other workers}
└── sockets # for sockets
├── plasma_store
└── raylet # this could be deleted by Ray's shutdown cleanup.
Redis Port Authentication
-------------------------
Ray instances should run on a secure network without public facing ports.
The most common threat for Ray instances is unauthorized access to Redis,
which can be exploited to gain shell access and run arbitrary code.
The best fix is to run Ray instances on a secure, trusted network.
Running Ray on a secured network is not always feasible.
To prevent exploits via unauthorized Redis access, Ray provides the option to
password-protect Redis ports. While this is not a replacement for running Ray
behind a firewall, this feature is useful for instances exposed to the internet
where configuring a firewall is not possible. Because Redis is
very fast at serving queries, the chosen password should be long.
Redis authentication is only supported on the raylet code path.
To add authentication via the Python API, start Ray using:
.. code-block:: python
ray.init(redis_password="password")
To add authentication via the CLI or to connect to an existing Ray instance with
password-protected Redis ports:
.. code-block:: bash
ray start [--head] --redis-password="password"
While Redis port authentication may protect against external attackers,
Ray does not encrypt traffic between nodes so man-in-the-middle attacks are
possible for clusters on untrusted networks.
See the `Redis security documentation <https://redis.io/topics/security>`__
for more information.
Using the Object Store with Huge Pages
--------------------------------------
Plasma is a high-performance shared memory object store originally developed in
Ray and now being developed in `Apache Arrow`_. See the `relevant
documentation`_.
On Linux, it is possible to increase the write throughput of the Plasma object
store by using huge pages. You first need to create a file system and activate
huge pages as follows.
.. code-block:: shell
sudo mkdir -p /mnt/hugepages
gid=`id -g`
uid=`id -u`
sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
# This typically corresponds to 20000 2MB pages (about 40GB), but this
# depends on the platform.
sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
**Note:** Once you create the huge pages, they will take up memory which will
never be freed unless you remove the huge pages. If you run into memory issues,
that may be the issue.
You need root access to create the file system, but not for running the object
store.
You can then start Ray with huge pages on a single machine as follows.
.. code-block:: python
ray.init(huge_pages=True, plasma_directory="/mnt/hugepages")
In the cluster case, you can do it by passing ``--huge-pages`` and
``--plasma-directory=/mnt/hugepages`` into ``ray start`` on any machines where
huge pages should be enabled.
See the relevant `Arrow documentation for huge pages`_.
.. _`Apache Arrow`: https://arrow.apache.org/
.. _`relevant documentation`: https://arrow.apache.org/docs/python/plasma.html#the-plasma-in-memory-object-store
.. _`Arrow documentation for huge pages`: https://arrow.apache.org/docs/python/plasma.html#using-plasma-with-huge-pages


@ -1,13 +0,0 @@
Contact
=======
The following are good places to discuss Ray.
1. `ray-dev@googlegroups.com`_: For discussions about development or any general
questions.
2. `StackOverflow`_: For questions about how to use Ray.
3. `GitHub Issues`_: For bug reports and feature requests.
.. _`ray-dev@googlegroups.com`: https://groups.google.com/forum/#!forum/ray-dev
.. _`GitHub Issues`: https://github.com/ray-project/ray/issues
.. _`StackOverflow`: https://stackoverflow.com/questions/tagged/ray

doc/source/contrib.rst (new file, +81 lines)

@ -0,0 +1,81 @@
Contributing to Ray
====================
We welcome (and encourage!) all forms of contributions to Ray, including but not limited to:
- Code reviewing of patches and PRs.
- Pushing patches.
- Documentation and examples.
- Community participation in forums and issues.
- Code readability and code comments to improve readability.
- Test cases to make the codebase more robust.
- Tutorials, blog posts, talks that promote the project.
What can I work on?
-------------------
We use GitHub to track issues, feature requests, and bugs. Take a look at the
ones labeled `"good first issue" <https://github.com/ray-project/ray/issues?utf8=%E2%9C%93&q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22>`__ and `"help wanted" <https://github.com/ray-project/ray/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22>`__ for a place to start.
Submitting and Merging a Contribution
-------------------------------------
There are a couple steps to merge a contribution.
1. First rebase your development branch on the most recent version of master.
.. code:: bash
git remote add upstream https://github.com/ray-project/ray.git
git fetch upstream
git rebase upstream/master
2. Make sure all existing tests `pass <contrib.html#testing>`__.
3. If introducing a new feature or patching a bug, be sure to add new test cases
in the relevant file in `ray/python/ray/tests/`.
4. Document the code. Public functions need to be documented, and remember to provide a usage
example if applicable.
5. Request code reviews from other contributors and address their comments. One fast way to get reviews is
to help review others' code so that they return the favor. You should aim to improve the code as much as
possible before the review. We highly value patches that can get in without extensive reviews.
6. Reviewers will merge and approve the pull request; be sure to ping them if
the pull request is getting stale.
Testing
-------
Even though we have hooks to run unit tests automatically for each pull request,
we recommend running unit tests locally beforehand to reduce the reviewers'
burden and speed up the review process.
.. code-block:: shell
pytest ray/python/ray/tests/
Documentation should be written in `Google style <https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html>`__ format.
We also have tests for code formatting and linting that need to pass before merge.
Install ``yapf==0.23``, ``flake8``, and ``flake8-quotes``. You can then run the following locally:
.. code-block:: shell
ray/scripts/format.sh
Becoming a Reviewer
-------------------
We identify reviewers from active contributors. Reviewers are individuals who
not only actively contribute to the project but are also willing
to participate in the code review of new contributions.
A pull request to the project has to be reviewed by at least one reviewer in order to be merged.
There is currently no formal process, but active contributors to Ray will be
solicited by current reviewers.
.. note::
These tips are based on the TVM `contributor guide <https://github.com/dmlc/tvm>`__.


@ -159,13 +159,13 @@ Linting
-------

**Running linter locally:** To run the Python linter on a specific file, run
something like ``flake8 ray/python/ray/worker.py``. You may need to first run
``pip install flake8``.

**Autoformatting code**. We use `yapf <https://github.com/google/yapf>`_ for
linting, and the config file is located at ``.style.yapf``. We recommend
running ``scripts/yapf.sh`` prior to pushing to format changed files.

Note that some projects such as dataframes and rllib are currently excluded.


@ -1,37 +0,0 @@
Cython
======
Getting Started
---------------
This document provides examples of using Cython-generated code in ``ray``. To
get started, run the following from directory ``$RAY_HOME/examples/cython``:
.. code-block:: bash
pip install scipy # For BLAS example
pip install -e .
python cython_main.py --help
You can import the ``cython_examples`` module from a Python script or interpreter.
Notes
-----
* You **must** include the following two lines at the top of any ``*.pyx`` file:
.. code-block:: python
#!python
# cython: embedsignature=True, binding=True
* You cannot decorate Cython functions within a ``*.pyx`` file (there are ways around this, but doing so creates a leaky abstraction between Cython and Python that would be very challenging to support generally). Instead, prefer the following in your Python code:
.. code-block:: python
some_cython_func = ray.remote(some_cython_module.some_cython_func)
* You cannot transfer memory buffers to a remote function (see ``example8``, which currently fails); your remote function must return a value
* Have a look at ``cython_main.py``, ``cython_simple.pyx``, and ``setup.py`` for examples of how to call, define, and build Cython code, respectively. The Cython `documentation <http://cython.readthedocs.io/>`_ is also very helpful.
* Several limitations come from Cython's own `unsupported <https://github.com/cython/cython/wiki/Unsupported>`_ Python features.
* We currently do not support compiling and distributing Cython code to ``ray`` clusters. In other words, Cython developers are responsible for compiling and distributing any Cython code to their cluster (much as would be the case for users who need Python packages like ``scipy``).
* For most simple use cases, developers need not worry about Python 2 or 3, but users who do need to care can have a look at the ``language_level`` Cython compiler directive (see `here <http://cython.readthedocs.io/en/latest/src/reference/compilation.html>`_).


@ -1,95 +0,0 @@
Evolution Strategies
====================
This document provides a walkthrough of the evolution strategies example.
To run the application, first install some dependencies.
.. code-block:: bash
pip install tensorflow
pip install gym
You can view the `code for this example`_.
.. _`code for this example`: https://github.com/ray-project/ray/tree/master/rllib/agents/es
The script can be run as follows. Note that the configuration is tuned to work
on the ``Humanoid-v1`` gym environment.
.. code-block:: bash
rllib train --env=Humanoid-v1 --run=ES
To train a policy on a cluster (e.g., using 900 workers), run the following.
.. code-block:: bash
rllib train \
--env=Humanoid-v1 \
--run=ES \
--redis-address=<redis-address> \
--config='{"num_workers": 900, "episodes_per_batch": 10000, "train_batch_size": 100000}'
At the heart of this example, we define a ``Worker`` class. These workers have
a method ``do_rollouts``, which will be used to perform rollouts with randomly
perturbed policies in a given environment.
.. code-block:: python
    @ray.remote
    class Worker(object):
        def __init__(self, config, policy_params, env_name, noise):
            self.env = # Initialize environment.
            self.policy = # Construct policy.
            # Details omitted.

        def do_rollouts(self, params):
            perturbation = # Generate a random perturbation to the policy.

            self.policy.set_weights(params + perturbation)
            # Do rollout with the perturbed policy.

            self.policy.set_weights(params - perturbation)
            # Do rollout with the perturbed policy.

            # Return the rewards.
In the main loop, we create a number of actors with this class.
.. code-block:: python
    workers = [Worker.remote(config, policy_params, env_name, noise_id)
               for _ in range(num_workers)]
We then enter an infinite loop in which we use the actors to perform rollouts
and use the rewards from the rollouts to update the policy.
.. code-block:: python
    while True:
        # Get the current policy weights.
        theta = policy.get_weights()
        # Put the current policy weights in the object store.
        theta_id = ray.put(theta)
        # Use the actors to do rollouts. Note that we pass in the ID of the
        # policy weights.
        rollout_ids = [worker.do_rollouts.remote(theta_id) for worker in workers]
        # Get the results of the rollouts.
        results = ray.get(rollout_ids)
        # Update the policy.
        optimizer.update(...)
In addition, note that we create a large object representing a shared block of
random noise. We then put the block in the object store so that each ``Worker``
actor can use it without creating its own copy.
.. code-block:: python
    @ray.remote
    def create_shared_noise():
        noise = np.random.randn(250000000)
        return noise

    noise_id = create_shared_noise.remote()
Recall that the ``noise_id`` argument is passed into the actor constructor.


@ -1,42 +0,0 @@
Policy Gradient Methods
=======================
This code shows how to do reinforcement learning with policy gradient methods.
View the `code for this example`_.
.. note::
For an overview of Ray's reinforcement learning library, see `RLlib <http://ray.readthedocs.io/en/latest/rllib.html>`__.
To run this example, you will need to install `TensorFlow with GPU support`_ (at
least version ``1.0.0``) and a few other dependencies.
.. code-block:: bash
pip install gym[atari]
pip install tensorflow
Then you can run the example as follows.
.. code-block:: bash
rllib train --env=Pong-ram-v4 --run=PPO
This will train an agent on the ``Pong-ram-v4`` Atari environment. You can also
try passing in the ``Pong-v0`` environment or the ``CartPole-v0`` environment.
If you wish to use a different environment, you will need to change a few lines
in ``example.py``.
Current and historical training progress can be monitored by pointing
TensorBoard to the log output directory as follows.
.. code-block:: bash
tensorboard --logdir=~/ray_results
Many of the TensorBoard metrics are also printed to the console, but you might
find it easier to visualize and compare between runs using the TensorBoard UI.
.. _`TensorFlow with GPU support`: https://www.tensorflow.org/install/
.. _`code for this example`: https://github.com/ray-project/ray/tree/master/rllib/agents/ppo

doc/source/examples.rst (new file, +14 lines)

@ -0,0 +1,14 @@
Examples
========
MapReduce
---------
Parameter Server
----------------
Deep Learning
-------------
Asynchronous Advantage Actor-Critic (A3C)
-----------------------------------------


@ -9,71 +9,177 @@ Ray
*Ray is a fast and simple framework for building and running distributed applications.*

Ray comes with libraries that accelerate deep learning and reinforcement learning development:

- `Tune`_: Scalable Hyperparameter Search
- `RLlib`_: Scalable Reinforcement Learning
- `Distributed Training <distributed_training.html>`__

Install Ray with: ``pip install ray``. For nightly wheels, see the `Installation page <installation.html>`__.

View the `codebase on GitHub`_.

.. _`codebase on GitHub`: https://github.com/ray-project/ray
Quick Start
-----------
.. code-block:: python
    ray.init()

    @ray.remote
    def f(x):
        return x * x

    futures = [f.remote(i) for i in range(4)]
    print(ray.get(futures))
To use Ray's actor model:
.. code-block:: python
    ray.init()

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.n = 0

        def inc(self):
            self.n += 1

        def read(self):
            return self.n

    counters = [Counter.remote() for i in range(4)]
    [c.inc.remote() for c in counters]
    futures = [c.read.remote() for c in counters]
    print(ray.get(futures))
Ray programs can run on a single machine, and can also seamlessly scale to large clusters. To execute the above Ray script in the cloud, just download `this configuration file <https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/aws/example-full.yaml>`__, and run:
``ray submit [CLUSTER.YAML] example.py --start``
See more details in the `Cluster Launch page <autoscaling.html>`_.
Tune Quick Start
----------------
`Tune`_ is a scalable framework for hyperparameter search built on top of Ray with a focus on deep learning and deep reinforcement learning.
.. code-block:: python
    import torch.optim as optim
    from ray import tune
    from ray.tune.examples.mnist_pytorch import get_data_loaders, Net, train, test

    def train_mnist(config):
        train_loader, test_loader = get_data_loaders()
        model = Net(config)
        optimizer = optim.SGD(model.parameters(), lr=config["lr"])
        for i in range(10):
            train(model, optimizer, train_loader)
            acc = test(model, test_loader)
            tune.track.log(mean_accuracy=acc)

    analysis = tune.run(
        train_mnist,
        stop={"mean_accuracy": 0.98},
        config={"lr": tune.grid_search([0.001, 0.01, 0.1])})

    print("Best config: ", analysis.get_best_config())
.. _`Tune`: tune.html
RLlib Quick Start
-----------------
`RLlib`_ is an open-source library for reinforcement learning built on top of Ray that offers both high scalability and a unified API for a variety of applications.
.. code-block:: bash
pip install tensorflow # or tensorflow-gpu
pip install ray[rllib] # also recommended: ray[debug]
.. code-block:: python
    import gym
    from gym.spaces import Discrete, Box
    from ray import tune

    class SimpleCorridor(gym.Env):
        def __init__(self, config):
            self.end_pos = config["corridor_length"]
            self.cur_pos = 0
            self.action_space = Discrete(2)
            self.observation_space = Box(0.0, self.end_pos, shape=(1, ))

        def reset(self):
            self.cur_pos = 0
            return [self.cur_pos]

        def step(self, action):
            if action == 0 and self.cur_pos > 0:
                self.cur_pos -= 1
            elif action == 1:
                self.cur_pos += 1
            done = self.cur_pos >= self.end_pos
            return [self.cur_pos], 1 if done else 0, done, {}

    tune.run(
        "PPO",
        config={
            "env": SimpleCorridor,
            "num_workers": 4,
            "env_config": {"corridor_length": 5}})
.. _`RLlib`: rllib.html
Contact
-------
The following are good places to discuss Ray.
1. `ray-dev@googlegroups.com`_: For discussions about development or any general
questions.
2. `StackOverflow`_: For questions about how to use Ray.
3. `GitHub Issues`_: For bug reports and feature requests.
.. _`ray-dev@googlegroups.com`: https://groups.google.com/forum/#!forum/ray-dev
.. _`GitHub Issues`: https://github.com/ray-project/ray/issues
.. _`StackOverflow`: https://stackoverflow.com/questions/tagged/ray
.. toctree::
   :maxdepth: 1
   :caption: Installation

   installation.rst

.. toctree::
   :maxdepth: 1
   :caption: Using Ray

   walkthrough.rst
   actors.rst
   using-ray-with-gpus.rst
   user-profiling.rst
   inspect.rst
   configure.rst
   advanced.rst
   troubleshooting.rst
   package-ref.rst
   examples.rst

.. toctree::
   :maxdepth: 1
   :caption: Cluster Setup

   autoscaling.rst
   using-ray-on-a-cluster.rst
   deploy-on-kubernetes.rst

.. toctree::
   :maxdepth: 1
@ -107,45 +213,33 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin
.. toctree::
   :maxdepth: 1
   :caption: Experimental

   distributed_training.rst
   pandas_on_ray.rst
   signals.rst
   async_api.rst

.. toctree::
   :maxdepth: 1
   :caption: Examples

   example-rl-pong.rst
   example-parameter-server.rst
   example-newsreader.rst
   example-resnet.rst
   example-a3c.rst
   example-lbfgs.rst
   example-streaming.rst
   using-ray-with-tensorflow.rst

.. toctree::
   :maxdepth: 1
   :caption: Development and Internals

   install-source.rst
   development.rst
   profiling.rst
   internals-overview.rst
   fault-tolerance.rst
   contrib.rst

doc/source/inspect.rst (new file, +87 lines)

@ -0,0 +1,87 @@
How-To: Inspect Cluster State
=============================
Applications written on top of Ray will often want to have some information
or diagnostics about the cluster. Some common questions include:
1. How many nodes are in my autoscaling cluster?
2. What resources are currently available in my cluster, both used and total?
3. What are the objects currently in my cluster?
For this, you can use the global state API.
Context: Ray Processes
----------------------
For context, when using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object
stores. Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications
(but not data). It can also be queried directly for debugging purposes.
Node Information
----------------
To get information about the current nodes in your cluster, you can use ``ray.nodes()``:
.. autofunction:: ray.nodes
:noindex:
.. code-block:: ipython3
>>> import ray
>>> ray.init()
>>> ray.nodes()
[{'ClientID': 'a9e430719685f3862ed7ba411259d4138f8afb1e',
'IsInsertion': True,
'NodeManagerAddress': '192.168.19.108',
'NodeManagerPort': 37428,
'ObjectManagerPort': 43415,
'ObjectStoreSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/raylet',
'Resources': {'CPU': 4.0},
'alive': True}]
The above information includes:
- `ClientID`: A unique identifier for the raylet.
- `alive`: Whether the node is still alive.
- `NodeManagerAddress`: PrivateIP of the node that the raylet is on.
- `Resources`: The total resource capacity on the node.
Resource Information
--------------------
To get information about the current total resource capacity of your cluster, you can use ``ray.cluster_resources()``.
.. autofunction:: ray.cluster_resources
:noindex:
To get information about the current available resource capacity of your cluster, you can use ``ray.available_resources()``.
.. autofunction:: ray.available_resources
:noindex:
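A quick sketch comparing the two (the actual keys and values depend on your
machine and current load):

.. code-block:: python

    import ray
    ray.init()

    print(ray.cluster_resources())    # e.g. {'CPU': 4.0}
    print(ray.available_resources())  # e.g. {'CPU': 3.0} while a task runs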
Object Information
------------------
To get information about the current objects that have been placed in the Ray object store across the cluster, you can use ``ray.objects()``.
.. autofunction:: ray.objects
:noindex:


@ -1,182 +0,0 @@
Installation on Docker
======================
You can install Ray from source on any platform that runs Docker. We do not presently
publish Docker images for Ray, but you can build them yourself using the Ray
distribution.
Using Docker can streamline the build process and provide a reliable way to get
up and running quickly.
Install Docker
--------------
Mac, Linux, Windows platforms
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The Docker Platform release is available for Mac, Windows, and Linux platforms.
Please download the appropriate version from the `Docker website`_ and follow
the corresponding installation instructions. Linux users may find these
`alternate instructions`_ helpful.
.. _`Docker website`: https://www.docker.com/products/overview#/install_the_platform
.. _`alternate instructions`: https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-on-ubuntu-16-04
Docker installation on EC2 with Ubuntu
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. note:: The Ray `autoscaler <autoscaling.html#common-cluster-configurations>`_ can automatically install Docker on all of the nodes of your cluster.
The instructions below show in detail how to prepare an Amazon EC2 instance
running Ubuntu 16.04 for use with Docker.
Initialize the package repository and apply system updates:
.. code-block:: bash
sudo apt-get update
sudo apt-get -y dist-upgrade
Install Docker and start the service:
.. code-block:: bash
sudo apt-get install -y docker.io
sudo service docker start
Add the ``ubuntu`` user to the ``docker`` group to allow running Docker commands
without sudo:
.. code-block:: bash
sudo usermod -a -G docker ubuntu
Initiate a new login to gain group permissions (alternatively, log out and log
back in again):
.. code-block:: bash
exec sudo su -l ubuntu
Confirm that docker is running:
.. code-block:: bash
docker images
This should produce an empty table similar to the following:
.. code-block:: bash
REPOSITORY TAG IMAGE ID CREATED SIZE
Clone the Ray repository
------------------------
.. code-block:: bash
git clone https://github.com/ray-project/ray.git
Build Docker images
-------------------
Run the script to create Docker images.
.. code-block:: bash
cd ray
./build-docker.sh
This script creates several Docker images:
- The ``ray-project/deploy`` image is a self-contained copy of code and binaries
suitable for end users.
- The ``ray-project/examples`` image adds additional libraries for running examples.
- The ``ray-project/base-deps`` image builds from Ubuntu Xenial and includes
Anaconda and other basic dependencies and can serve as a starting point for
developers.
Review images by listing them:
.. code-block:: bash
docker images
Output should look something like the following:
.. code-block:: bash
REPOSITORY TAG IMAGE ID CREATED SIZE
ray-project/examples latest 7584bde65894 4 days ago 3.257 GB
ray-project/deploy latest 970966166c71 4 days ago 2.899 GB
ray-project/base-deps latest f45d66963151 4 days ago 2.649 GB
ubuntu xenial f49eec89601e 3 weeks ago 129.5 MB
Launch Ray in Docker
--------------------
Start out by launching the deployment container.
.. code-block:: bash
docker run --shm-size=<shm-size> -t -i ray-project/deploy
Replace ``<shm-size>`` with a limit appropriate for your system, for example
``512M`` or ``2G``. The ``-t`` and ``-i`` options here are required to support
interactive use of the container.
**Note:** Ray requires a **large** amount of shared memory because each object
store keeps all of its objects in shared memory, so the amount of shared memory
will limit the size of the object store.
You should now see a prompt that looks something like:
.. code-block:: bash
root@ebc78f68d100:/ray#
Test if the installation succeeded
----------------------------------
To test if the installation was successful, try running some tests. Within the
container shell enter the following commands:
.. code-block:: bash
python -m pytest -v test/mini_test.py # This tests some basic functionality.
You are now ready to continue with the `tutorial`_.
.. _`tutorial`: http://ray.readthedocs.io/en/latest/tutorial.html
Running examples in Docker
--------------------------
Ray includes a Docker image that includes dependencies necessary for running
some of the examples. This can be an easy way to see Ray in action on a variety
of workloads.
Launch the examples container.
.. code-block:: bash
docker run --shm-size=1024m -t -i ray-project/examples
Batch L-BFGS
~~~~~~~~~~~~
.. code-block:: bash
python /ray/examples/lbfgs/driver.py
Learning to play Pong
~~~~~~~~~~~~~~~~~~~~~
.. code-block:: bash
python /ray/examples/rl_pong/driver.py


@ -0,0 +1,179 @@
Installing Ray from Source
==========================
If you want to use the latest version of Ray, you can build it from source.
Below, we have instructions for building from source for both Linux and MacOS.
Dependencies
~~~~~~~~~~~~
To build Ray, first install the following dependencies. We recommend using
`Anaconda`_.
.. _`Anaconda`: https://www.continuum.io/downloads
For Ubuntu, run the following commands:
.. code-block:: bash
sudo apt-get update
sudo apt-get install -y build-essential curl unzip psmisc
# If you are not using Anaconda, you need the following.
sudo apt-get install python-dev # For Python 2.
sudo apt-get install python3-dev # For Python 3.
pip install cython==0.29.0
For MacOS, run the following commands:
.. code-block:: bash
brew update
brew install wget
pip install cython==0.29.0
If you are using Anaconda, you may also need to run the following.
.. code-block:: bash
conda install libgcc
Install Ray
~~~~~~~~~~~
Ray can be built from the repository as follows.
.. code-block:: bash
git clone https://github.com/ray-project/ray.git
# Install Bazel.
ray/ci/travis/install-bazel.sh
cd ray/python
pip install -e . --verbose # Add --user if you see a permission denied error.
Alternatively, Ray can be built from the repository without cloning using pip.
.. code-block:: bash
pip install git+https://github.com/ray-project/ray.git#subdirectory=python
Cleaning the source tree
~~~~~~~~~~~~~~~~~~~~~~~~
The source tree can be cleaned by running
.. code-block:: bash
git clean -f -f -x -d
in the ``ray/`` directory. Warning: this command will delete all untracked files
and directories and will reset the repository to its checked out state.
For a shallower working directory cleanup, you may want to try:
.. code-block:: bash
rm -rf ./build
under ``ray/``. Incremental builds should work as follows:
.. code-block:: bash
pushd ./build && make && popd
under ``ray/``.
Docker Source Images
--------------------
Run the script to create Docker images.
.. code-block:: bash
cd ray
./build-docker.sh
This script creates several Docker images:
- The ``ray-project/deploy`` image is a self-contained copy of code and binaries
suitable for end users.
- The ``ray-project/examples`` image adds additional libraries for running examples.
- The ``ray-project/base-deps`` image is built from Ubuntu Xenial, includes
  Anaconda and other basic dependencies, and can serve as a starting point for
  developers.
Review images by listing them:
.. code-block:: bash
docker images
Output should look something like the following:
.. code-block:: bash
REPOSITORY TAG IMAGE ID CREATED SIZE
ray-project/examples latest 7584bde65894 4 days ago 3.257 GB
ray-project/deploy latest 970966166c71 4 days ago 2.899 GB
ray-project/base-deps latest f45d66963151 4 days ago 2.649 GB
ubuntu xenial f49eec89601e 3 weeks ago 129.5 MB
Launch Ray in Docker
~~~~~~~~~~~~~~~~~~~~
Start out by launching the deployment container.
.. code-block:: bash
docker run --shm-size=<shm-size> -t -i ray-project/deploy
Replace ``<shm-size>`` with a limit appropriate for your system, for example
``512M`` or ``2G``. The ``-t`` and ``-i`` options here are required to support
interactive use of the container.
**Note:** Ray requires a **large** amount of shared memory because each object
store keeps all of its objects in shared memory, so the amount of shared memory
will limit the size of the object store.
You should now see a prompt that looks something like:
.. code-block:: bash
root@ebc78f68d100:/ray#
Test if the installation succeeded
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To test if the installation was successful, try running some tests. This assumes
that you've cloned the git repository.
.. code-block:: bash
python -m pytest -v python/ray/tests/test_mini.py
Troubleshooting installing Arrow
--------------------------------
Here are some possible causes.
You have a different version of Flatbuffers installed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Arrow pulls and builds its own copy of Flatbuffers, but if you already have
Flatbuffers installed, Arrow may find the wrong version. If a directory like
``/usr/local/include/flatbuffers`` shows up in the output, this may be the
problem. To solve it, get rid of the old version of flatbuffers.
There is some problem with Boost
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If a message like ``Unable to find the requested Boost libraries`` appears when
installing Arrow, there may be a problem with Boost. This can happen if you
installed Boost using MacPorts. This is sometimes solved by using Brew instead.
Trouble installing or running Ray
---------------------------------
One of the Ray libraries is compiled against the wrong version of Python
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If there is a segfault or a sigabort immediately upon importing Ray, one of the
components may have been compiled against the wrong Python libraries. Bazel
should normally find the right version of Python, but this process is not
completely reliable. In this case, check the Bazel output from installation and
make sure that the version of the Python libraries that were found match the
version of Python that you're using.
Note that it's common to have multiple versions of Python on your machine (for
example both Python 2 and Python 3). Ray will be compiled against whichever
version of Python is found when you run the ``python`` command from the
command line, so make sure this is the version you wish to use.
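
As a quick sanity check, you can compare the running interpreter against the
location of the Ray package it imports (a minimal sketch):

.. code-block:: python

    import sys

    import ray

    # The interpreter version and the path of the imported Ray package should
    # correspond to the same Python installation.
    print(sys.version)
    print(ray.__file__)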
Installing Ray
==============

Ray supports Python 2 and Python 3 as well as MacOS and Linux. Windows support
is planned for the future.

Latest stable version
---------------------

You can install the latest stable version of Ray as follows.

Trying snapshots from master
----------------------------

Here are links to the latest wheels (which are built for each commit on the
master branch). To install these wheels, run the following command:

.. _`MacOS Python 3.6`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev2-cp36-cp36m-macosx_10_6_intel.whl
.. _`MacOS Python 3.5`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev2-cp35-cp35m-macosx_10_6_intel.whl
.. _`MacOS Python 2.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev2-cp27-cp27m-macosx_10_6_intel.whl
An Overview of the Internals
============================

In this document, we overview the internal architecture of Ray.

Connecting to Ray
-----------------

Similarly, the processes will continue running when the script exits. In this
case, all processes except workers that correspond to actors are shared between
different driver processes.
Ray Processes
-------------
When using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object
stores. Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications
(but not data). It can also be queried directly for debugging purposes.
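
Once Ray is started, some of this state can be inspected from the driver. Here
is a minimal sketch using the inspection functions documented in the package
reference:

.. code-block:: python

    import ray

    ray.init()
    # One entry per node, including its address and resource capacities.
    print(ray.nodes())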
Defining a remote function
--------------------------

Now, consider a remote function definition as below.

.. code-block:: python

    @ray.remote
    def f(x):
        return x + 1

When the remote function is defined as above, the function is immediately
pickled, assigned a unique ID, and stored in a Redis server.

Each worker process has a separate thread running in the background that
listens for the addition of remote functions to the centralized control state.
When a driver or worker invokes a remote function, a number of things happen.

  raylet. This is done by peer-to-peer connection between raylets.
  The task table can be inspected as follows.

.. autofunction:: ray.tasks
   :noindex:

- Once a task has been scheduled to a raylet, the raylet queues
  the task for execution. A task is assigned to a worker when enough resources
  state, to reflect the fact that it contains the newly created objects. The
  object table can be viewed as follows.

.. autofunction:: ray.objects
   :noindex:

- When the task's return values are placed into the object store, they are first
  serialized into a contiguous blob of bytes using the `Apache Arrow`_ data
  layout, which is helpful for efficiently sharing data between processes using
  shared memory.

Notes and limitations
---------------------

  reconstruction of the object. The raylet will attempt to reconstruct
  the object by replaying its task lineage.

Getting an object ID
--------------------
Ray Package Reference
=====================

.. autofunction:: ray.init

Inspect the Cluster State
-------------------------

.. autofunction:: ray.nodes

.. autofunction:: ray.tasks

.. autofunction:: ray.objects

.. autofunction:: ray.timeline

.. autofunction:: ray.object_transfer_timeline

.. autofunction:: ray.cluster_resources

.. autofunction:: ray.available_resources

.. autofunction:: ray.errors

The Ray Command Line API
------------------------
The Plasma Object Store
=======================
Plasma is a high-performance shared memory object store originally developed in
Ray and now being developed in `Apache Arrow`_. See the `relevant
documentation`_.
Using Plasma with Huge Pages
----------------------------
On Linux, it is possible to increase the write throughput of the Plasma object
store by using huge pages. You first need to create a file system and activate
huge pages as follows.
.. code-block:: shell
sudo mkdir -p /mnt/hugepages
gid=`id -g`
uid=`id -u`
sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
# This typically corresponds to 20000 2MB pages (about 40GB), but this
# depends on the platform.
sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
**Note:** Once you create the huge pages, they will take up memory that will
never be freed unless you remove them. If you run into memory problems, the
huge pages may be the cause.
You need root access to create the file system, but not for running the object
store.
You can then start Ray with huge pages on a single machine as follows.
.. code-block:: python
ray.init(huge_pages=True, plasma_directory="/mnt/hugepages")
In the cluster case, you can enable huge pages by passing ``--huge-pages`` and
``--plasma-directory=/mnt/hugepages`` into ``ray start`` on any machines where
huge pages should be enabled.
See the relevant `Arrow documentation for huge pages`_.
.. _`Apache Arrow`: https://arrow.apache.org/
.. _`relevant documentation`: https://arrow.apache.org/docs/python/plasma.html#the-plasma-in-memory-object-store
.. _`Arrow documentation for huge pages`: https://arrow.apache.org/docs/python/plasma.html#using-plasma-with-huge-pages
Resources (CPUs, GPUs)
======================
This document describes how resources are managed in Ray. Each node in a Ray
cluster knows its own resource capacities, and each task specifies its resource
requirements.
CPUs and GPUs
-------------
The Ray backend includes built-in support for CPUs and GPUs.
Specifying a node's resource requirements
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To specify a node's resource requirements from the command line, pass the
``--num-cpus`` and ``--num-gpus`` flags into ``ray start``.
.. code-block:: bash
# To start a head node.
ray start --head --num-cpus=8 --num-gpus=1
# To start a non-head node.
ray start --redis-address=<redis-address> --num-cpus=4 --num-gpus=2
To specify a node's resource requirements when the Ray processes are all started
through ``ray.init``, do the following.
.. code-block:: python
ray.init(num_cpus=8, num_gpus=1)
If the number of CPUs is unspecified, Ray will automatically determine the
number by running ``multiprocessing.cpu_count()``. If the number of GPUs is
unspecified, Ray will attempt to automatically detect the number of GPUs.
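
For example, the detected capacities can be checked after startup. This is a
small sketch; ``ray.cluster_resources()`` reports the aggregate capacities of
the cluster:

.. code-block:: python

    import ray

    # Let Ray detect the number of CPUs and GPUs automatically.
    ray.init()
    print(ray.cluster_resources())  # e.g., {'CPU': 8.0, 'GPU': 1.0, ...}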
Specifying a task's CPU and GPU requirements
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To specify a task's CPU and GPU requirements, pass the ``num_cpus`` and
``num_gpus`` arguments into the remote decorator. Note that Ray supports
**fractional** resource requirements.
.. code-block:: python
@ray.remote(num_cpus=4, num_gpus=2)
def f():
return 1
@ray.remote(num_gpus=0.5)
def h():
return 1
The ``f`` tasks will be scheduled on machines that have at least 4 CPUs and 2
GPUs, and when one of the ``f`` tasks executes, 4 CPUs and 2 GPUs will be
reserved for that task. The IDs of the GPUs that are reserved for the task can
be accessed with ``ray.get_gpu_ids()``. Ray will automatically set the
environment variable ``CUDA_VISIBLE_DEVICES`` for that process. These resources
will be released when the task finishes executing.
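
As a small illustration, a task can inspect the GPUs reserved for it. This
sketch assumes the machine actually has two GPUs:

.. code-block:: python

    import os

    import ray

    ray.init(num_gpus=2)

    @ray.remote(num_gpus=2)
    def inspect_gpus():
        # Return the GPU IDs reserved for this task (e.g., [0, 1]) along with
        # the CUDA_VISIBLE_DEVICES value that Ray set for this worker process.
        return ray.get_gpu_ids(), os.environ["CUDA_VISIBLE_DEVICES"]

    print(ray.get(inspect_gpus.remote()))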
However, the situation is different if the task gets blocked in a call to
``ray.get``. For example, consider the following remote function.
.. code-block:: python
@ray.remote(num_cpus=1, num_gpus=1)
def g():
return ray.get(f.remote())
When a ``g`` task is executing, it will release its CPU resources when it gets
blocked in the call to ``ray.get``. It will reacquire the CPU resources when
``ray.get`` returns. It will retain its GPU resources throughout the lifetime of
the task because the task will most likely continue to use GPU memory.
To specify that an **actor** requires GPUs, do the following.
.. code-block:: python
@ray.remote(num_gpus=1)
class Actor(object):
pass
When an ``Actor`` instance is created, it will be placed on a node that has at
least 1 GPU, and the GPU will be reserved for the actor for the duration of the
actor's lifetime (even if the actor is not executing tasks). The GPU resources
will be released when the actor terminates. Note that currently **only GPU
resources are used for actor placement**.
Custom Resources
----------------
While Ray has built-in support for CPUs and GPUs, nodes can be started with
arbitrary custom resources. **All custom resources behave like GPUs.**
A node can be started with some custom resources as follows.
.. code-block:: bash
ray start --head --resources='{"Resource1": 4, "Resource2": 16}'
The same can be done through ``ray.init`` as follows.
.. code-block:: python
ray.init(resources={'Resource1': 4, 'Resource2': 16})
To require custom resources in a task, specify the requirements in the remote
decorator.
.. code-block:: python
@ray.remote(resources={'Resource2': 1})
def f():
return 1
Fractional Resources
--------------------
Task and actor resource requirements can be fractional. This is particularly
useful if you want multiple tasks or actors to share a single GPU.
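
For instance, two actors can share a single GPU by each requesting half of it.
This is a sketch; the class name is illustrative:

.. code-block:: python

    import ray

    ray.init(num_gpus=1)

    # Each actor requests half a GPU, so both fit on the single GPU.
    @ray.remote(num_gpus=0.5)
    class HalfGPUActor(object):
        def gpu_ids(self):
            return ray.get_gpu_ids()

    a = HalfGPUActor.remote()
    b = HalfGPUActor.remote()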
Security
========
This document describes best security practices for using Ray.
Intended Use and Threat Model
-----------------------------
Ray instances should run on a secure network without public facing ports.
The most common threat for Ray instances is unauthorized access to Redis,
which can be exploited to gain shell access and run arbitrary code.
The best fix is to run Ray instances on a secure, trusted network.
Running Ray on a secured network is not always feasible, so Ray
provides some basic security features:
Redis Port Authentication
-------------------------
To prevent exploits via unauthorized Redis access, Ray provides the option to
password-protect Redis ports. While this is not a replacement for running Ray
behind a firewall, this feature is useful for instances exposed to the internet
where configuring a firewall is not possible. Because Redis is
very fast at serving queries, the chosen password should be long.
Redis authentication is only supported on the raylet code path.
To add authentication via the Python API, start Ray using:
.. code-block:: python
ray.init(redis_password="password")
To add authentication via the CLI, or connect to an existing Ray instance with
password-protected Redis ports:
.. code-block:: bash
ray start [--head] --redis-password="password"
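
For example, a driver connecting to an existing cluster with a
password-protected Redis port might look like the following sketch (the
address is a placeholder):

.. code-block:: python

    import ray

    ray.init(redis_address="123.45.67.89:6379", redis_password="password")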
While Redis port authentication may protect against external attackers,
Ray does not encrypt traffic between nodes so man-in-the-middle attacks are
possible for clusters on untrusted networks.
Cloud Security
--------------
Launching Ray clusters on AWS or GCP using the ``ray up`` command
automatically configures security groups that prevent external Redis access.
References
----------
- The `Redis security documentation <https://redis.io/topics/security>`_
Serialization in the Object Store
=================================
This document describes what Python objects Ray can and cannot serialize into
the object store. Once an object is placed in the object store, it is immutable.
There are a number of situations in which Ray will place objects in the object
store.
1. The return values of a remote function.
2. The value ``x`` in a call to ``ray.put(x)``.
3. Arguments to remote functions (except for simple arguments like ints or
floats).
A Python object may have an arbitrary number of pointers with arbitrarily deep
nesting. To place an object in the object store or send it between processes,
it must first be converted to a contiguous string of bytes. This process is
known as serialization. The process of converting the string of bytes back into a
Python object is known as deserialization. Serialization and deserialization
are often bottlenecks in distributed computing.
Pickle is one example of a library for serialization and deserialization in
Python.
.. code-block:: python
import pickle
pickle.dumps([1, 2, 3]) # prints b'\x80\x03]q\x00(K\x01K\x02K\x03e.'
pickle.loads(b'\x80\x03]q\x00(K\x01K\x02K\x03e.') # prints [1, 2, 3]
Pickle (and the variant we use, cloudpickle) is general-purpose. It can
serialize a large variety of Python objects. However, for numerical workloads,
pickling and unpickling can be inefficient. For example, if multiple processes
want to access a Python list of numpy arrays, each process must unpickle the
list and create its own new copies of the arrays. This can lead to high memory
overheads, even when all processes are read-only and could easily share memory.
In Ray, we optimize for numpy arrays by using the `Apache Arrow`_ data format.
When we deserialize a list of numpy arrays from the object store, we still
create a Python list of numpy array objects. However, rather than copy each
numpy array, each numpy array object holds a pointer to the relevant array held
in shared memory. There are some advantages to this form of serialization.
- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.
.. _`Apache Arrow`: https://arrow.apache.org/
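
For example, here is a small sketch of the zero-copy behavior; the
deserialized array is backed by shared memory and is read-only:

.. code-block:: python

    import numpy as np

    import ray

    ray.init()

    a = np.zeros(10 ** 6)
    a_id = ray.put(a)
    # No copy of the underlying buffer is made here; the returned array
    # points into the object store's shared memory.
    b = ray.get(a_id)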
What Objects Does Ray Handle
----------------------------
Ray does not currently support serialization of arbitrary Python objects. The
set of Python objects that Ray can serialize using Arrow includes the following.
1. Primitive types: ints, floats, longs, bools, strings, unicode, and numpy
arrays.
2. Any list, dictionary, or tuple whose elements can be serialized by Ray.
For a more general object, Ray will first attempt to serialize the object by
unpacking the object as a dictionary of its fields. This behavior is not
correct in all cases. If Ray cannot serialize the object as a dictionary of its
fields, Ray will fall back to using pickle. However, using pickle will likely
be inefficient.
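
For instance, a simple custom class will usually round-trip through the object
store via this field-by-field mechanism. This is a sketch; ``Point`` is just
an illustrative class:

.. code-block:: python

    import ray

    ray.init()

    class Point(object):
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # Ray unpacks the object as a dictionary of its fields and rebuilds it
    # on the way out.
    p = ray.get(ray.put(Point(1, 2)))
    print(p.x, p.y)  # 1 2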
Notes and limitations
---------------------
- We currently handle certain patterns incorrectly, according to Python
semantics. For example, a list that contains two copies of the same list will
be serialized as if the two lists were distinct.
.. code-block:: python
l1 = [0]
l2 = [l1, l1]
l3 = ray.get(ray.put(l2))
l2[0] is l2[1] # True.
l3[0] is l3[1] # False.
- For reasons similar to the above example, we also do not currently handle
objects that recursively contain themselves (this may be common in graph-like
data structures).
.. code-block:: python
l = []
l.append(l)
# Try to put this list that recursively contains itself in the object store.
ray.put(l)
This will throw an exception with a message like the following.
.. code-block:: bash
This object exceeds the maximum recursion depth. It may contain itself recursively.
- Whenever possible, use numpy arrays for maximum performance.
Last Resort Workaround
----------------------
If you find cases where Ray serialization doesn't work or does something
unexpected, please `let us know`_ so we can fix it. In the meantime, you may
have to resort to writing custom serialization and deserialization code (e.g.,
calling pickle by hand).
.. _`let us know`: https://github.com/ray-project/ray/issues
.. code-block:: python
import pickle
@ray.remote
def f(complicated_object):
# Deserialize the object manually.
obj = pickle.loads(complicated_object)
return "Successfully passed {} into f.".format(obj)
# Define a complicated object.
l = []
l.append(l)
# Manually serialize the object and pass it in as a string.
ray.get(f.remote(pickle.dumps(l))) # prints 'Successfully passed [[...]] into f.'
**Note:** If you have trouble with pickle, you may have better luck with
cloudpickle.
Temporary Files
===============
Ray produces some temporary files while running. They are useful for logging,
debugging, and sharing the object store with other programs.
Ray session
-----------
First we introduce the concept of a Ray session.
A Ray session represents all tasks, processes, and resources managed by Ray. A
session is created by executing the ``ray start`` command or by calling
``ray.init()``, and it is terminated by executing ``ray stop`` or calling
``ray.shutdown()``.
Each Ray session will have a unique name. By default, the name is
``session_{timestamp}_{pid}``. The format of ``timestamp`` is
``%Y-%m-%d_%H-%M-%S_%f`` (see the `Python time format <http://strftime.org>`__ for details);
the pid belongs to the startup process (the process calling ``ray.init()`` or
the Ray process executed by a shell in ``ray start``).
Location of Temporary Files
---------------------------
For each session, Ray will place all its temporary files under the
*session directory*. A *session directory* is a subdirectory of the
*root temporary path* (``/tmp/ray`` by default),
so the default session directory is ``/tmp/ray/{ray_session_name}``.
You can sort by their names to find the latest session.
You can change the *root temporary directory* in one of these ways:

* Pass ``--temp-dir={your temp path}`` to ``ray start``
* Specify ``temp_dir`` when calling ``ray.init()``
You can also use ``default_worker.py --temp-dir={your temp path}`` to
start a new worker with the given *root temporary directory*.
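
For example, a sketch using the option described above:

.. code-block:: python

    import ray

    # Place this session's temporary files under a custom root directory.
    ray.init(temp_dir="/tmp/my_ray")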
Layout of Temporary Files
-------------------------
A typical layout of temporary files could look like this:
.. code-block:: text
/tmp
└── ray
└── session_{datetime}_{pid}
├── logs # for logging
│   ├── log_monitor.err
│   ├── log_monitor.out
│   ├── monitor.err
│   ├── monitor.out
│   ├── plasma_store.err # outputs of the plasma store
│   ├── plasma_store.out
│   ├── raylet.err # outputs of the raylet process
│   ├── raylet.out
│   ├── redis-shard_0.err # outputs of redis shards
│   ├── redis-shard_0.out
│   ├── redis.err # redis
│   ├── redis.out
│   ├── webui.err # ipython notebook web ui
│   ├── webui.out
│   ├── worker-{worker_id}.err # redirected output of workers
│   ├── worker-{worker_id}.out
│   └── {other workers}
└── sockets # for sockets
├── plasma_store
└── raylet # this could be deleted by Ray's shutdown cleanup.
Plasma Object Store Socket
--------------------------
Plasma object store sockets can be used to share objects with other programs using Apache Arrow.
You can specify the plasma object store socket in one of these ways:

* Pass ``--plasma-store-socket-name={your socket path}`` to ``ray start``
* Specify ``plasma_store_socket_name`` when calling ``ray.init()``

The path you specify will be used as given; it is not affected by the root
temporary directory.
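
For example, a sketch where the socket path is a placeholder:

.. code-block:: python

    import ray

    # Put the object store socket at a fixed, known location so that other
    # programs can connect to it.
    ray.init(plasma_store_socket_name="/tmp/my_plasma_socket")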
Notes
-----
Temporary file policies are defined in ``python/ray/node.py``.
Troubleshooting and FAQs
========================

This document discusses some common problems that people run into when using Ray
as well as some known problems. If you encounter other problems, please
Tutorial
========
To use Ray, you need to understand the following:
- How Ray executes tasks asynchronously to achieve parallelism.
- How Ray uses object IDs to represent immutable remote objects.
Overview
--------
Ray is a fast and simple framework for building and running distributed applications.
The same code can be run on a single machine to achieve efficient multiprocessing,
and it can be used on a cluster for large computations.
When using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object
stores. Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications
(but not data). It can also be queried directly for debugging purposes.
Starting Ray
------------
To start Ray, start Python and run the following commands.
.. code-block:: python
import ray
ray.init()
This starts Ray.
Immutable remote objects
------------------------
In Ray, we can create and compute on objects. We refer to these objects as
**remote objects**, and we use **object IDs** to refer to them. Remote objects
are stored in **object stores**, and there is one object store per node in the
cluster. In the cluster setting, we may not actually know which machine each
object lives on.
An **object ID** is essentially a unique ID that can be used to refer to a
remote object. If you're familiar with Futures, our object IDs are conceptually
similar.
We assume that remote objects are immutable. That is, their values cannot be
changed after creation. This allows remote objects to be replicated in multiple
object stores without needing to synchronize the copies.
Put and Get
~~~~~~~~~~~
The commands ``ray.get`` and ``ray.put`` can be used to convert between Python
objects and object IDs, as shown in the example below.
.. code-block:: python
x = "example"
ray.put(x) # ObjectID(b49a32d72057bdcfc4dda35584b3d838aad89f5d)
The command ``ray.put(x)`` would be run by a worker process or by the driver
process (the driver process is the one running your script). It takes a Python
object and copies it to the local object store (here *local* means *on the same
node*). Once the object has been stored in the object store, its value cannot be
changed.
In addition, ``ray.put(x)`` returns an object ID, which is essentially an ID that
can be used to refer to the newly created remote object. If we save the object
ID in a variable with ``x_id = ray.put(x)``, then we can pass ``x_id`` into remote
functions, and those remote functions will operate on the corresponding remote
object.
The command ``ray.get(x_id)`` takes an object ID and creates a Python object from
the corresponding remote object. For some objects like arrays, we can use shared
memory and avoid copying the object. For other objects, this copies the object
from the object store to the worker process's heap. If the remote object
corresponding to the object ID ``x_id`` does not live on the same node as the
worker that calls ``ray.get(x_id)``, then the remote object will first be
transferred from an object store that has it to the object store that needs it.
.. code-block:: python
x_id = ray.put("example")
ray.get(x_id) # "example"
If the remote object corresponding to the object ID ``x_id`` has not been created
yet, the command ``ray.get(x_id)`` will wait until the remote object has been
created.
A very common use case of ``ray.get`` is to get a list of object IDs. In this
case, you can call ``ray.get(object_ids)`` where ``object_ids`` is a list of object
IDs.
.. code-block:: python
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Asynchronous Computation in Ray
-------------------------------
Ray enables arbitrary Python functions to be executed asynchronously. This is
done by designating a Python function as a **remote function**.
For example, a normal Python function looks like this.
.. code-block:: python
def add1(a, b):
return a + b
A remote function looks like this.
.. code-block:: python
@ray.remote
def add2(a, b):
return a + b
Remote functions
~~~~~~~~~~~~~~~~
Whereas calling ``add1(1, 2)`` returns ``3`` and causes the Python interpreter to
block until the computation has finished, calling ``add2.remote(1, 2)``
immediately returns an object ID and creates a **task**. The task will be
scheduled by the system and executed asynchronously (potentially on a different
machine). When the task finishes executing, its return value will be stored in
the object store.
.. code-block:: python
x_id = add2.remote(1, 2)
ray.get(x_id) # 3
The following simple example demonstrates how asynchronous tasks can be used
to parallelize computation.
.. code-block:: python
import time
def f1():
time.sleep(1)
@ray.remote
def f2():
time.sleep(1)
# The following takes ten seconds.
[f1() for _ in range(10)]
# The following takes one second (assuming the system has at least ten CPUs).
ray.get([f2.remote() for _ in range(10)])
There is a sharp distinction between *submitting a task* and *executing the
task*. When a remote function is called, the task of executing that function is
submitted to a raylet, and object IDs for the outputs of the task are
immediately returned. However, the task will not be executed until the system
actually schedules the task on a worker. Task execution is **not** done lazily.
The system moves the input data to the task, and the task will execute as soon
as its input dependencies are available and there are enough resources for the
computation.
**When a task is submitted, each argument may be passed in by value or by object
ID.** For example, these lines have the same behavior.
.. code-block:: python
add2.remote(1, 2)
add2.remote(1, ray.put(2))
add2.remote(ray.put(1), ray.put(2))
Remote functions never return actual values, they always return object IDs.
When the remote function is actually executed, it operates on Python objects.
That is, if the remote function was called with any object IDs, the system will
retrieve the corresponding objects from the object store.
Note that a remote function can return multiple object IDs.
.. code-block:: python
@ray.remote(num_return_vals=3)
def return_multiple():
return 1, 2, 3
a_id, b_id, c_id = return_multiple.remote()
Expressing dependencies between tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Programmers can express dependencies between tasks by passing the object ID
output of one task as an argument to another task. For example, we can launch
three tasks as follows, each of which depends on the previous task.
.. code-block:: python
@ray.remote
def f(x):
return x + 1
x = f.remote(0)
y = f.remote(x)
z = f.remote(y)
ray.get(z) # 3
The second task above will not execute until the first has finished, and the
third will not execute until the second has finished. In this example, there are
no opportunities for parallelism.
The ability to compose tasks makes it easy to express interesting dependencies.
Consider the following implementation of a tree reduce.
.. code-block:: python
import numpy as np
@ray.remote
def generate_data():
return np.random.normal(size=1000)
@ray.remote
def aggregate_data(x, y):
return x + y
# Generate some random data. This launches 100 tasks that will be scheduled on
# various nodes. The resulting data will be distributed around the cluster.
data = [generate_data.remote() for _ in range(100)]
# Perform a tree reduce.
while len(data) > 1:
data.append(aggregate_data.remote(data.pop(0), data.pop(0)))
# Fetch the result.
ray.get(data)
Remote Functions Within Remote Functions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
So far, we have been calling remote functions only from the driver. But worker
processes can also call remote functions. To illustrate this, consider the
following example.
.. code-block:: python
@ray.remote
def sub_experiment(i, j):
# Run the jth sub-experiment for the ith experiment.
return i + j
@ray.remote
def run_experiment(i):
sub_results = []
# Launch tasks to perform 10 sub-experiments in parallel.
for j in range(10):
sub_results.append(sub_experiment.remote(i, j))
# Return the sum of the results of the sub-experiments.
return sum(ray.get(sub_results))
results = [run_experiment.remote(i) for i in range(5)]
ray.get(results) # [45, 55, 65, 75, 85]
When the remote function ``run_experiment`` is executed on a worker, it calls the
remote function ``sub_experiment`` a number of times. This is an example of how
multiple experiments, each of which takes advantage of parallelism internally,
can all be run in parallel.
How-to: Profile Ray Programs
============================

Profiling the performance of your code can be very helpful to determine
performance bottlenecks or to find out where your code may not be parallelized
properly.

Visualizing Tasks in the Ray Timeline
-------------------------------------

The most important tool is the timeline visualization tool. To visualize tasks
in the Ray timeline, you can dump the timeline as a JSON file by running ``ray
timeline`` from the command line or by using the following command.
.. code-block:: bash

    ray start --redis-address=<redis-address>

If you wish to specify that a machine has 10 CPUs and 1 GPU, you can do this
with the flags ``--num-cpus=10`` and ``--num-gpus=1``. See the
`Configuration <configure.html>`__ page for more information.

Now we've started all of the Ray processes on each node. This includes
How-to: Using Ray with GPUs
===========================

GPUs are critical for many machine learning applications. Ray enables remote
functions and actors to specify their GPU requirements in the ``ray.remote``
decorator. When starting Ray, you can specify the number of GPUs as follows.

.. code-block:: python

    ray.init(num_gpus=4)

If you don't pass in the ``num_gpus`` argument, Ray will automatically detect
the number of GPUs available.

If you are starting Ray with the ``ray start`` command, you can indicate the
number of GPUs on the machine with the ``--num-gpus`` argument.

To specify a task's GPU requirements, pass the ``num_gpus`` argument into the
remote decorator.

.. code-block:: python

    import os

    @ray.remote(num_gpus=1)
    def use_gpu():
        print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

Inside of the remote function, a call to ``ray.get_gpu_ids()`` will return a
list of integers indicating which GPUs the remote function is allowed to use.
Typically, it is not necessary to call ``ray.get_gpu_ids()`` because Ray will
automatically set the ``CUDA_VISIBLE_DEVICES`` environment variable.

**Note:** The function ``use_gpu`` defined above doesn't actually use any
GPUs. Ray will schedule it on a machine which has at least one GPU, and will
reserve one GPU for it while it is being executed; however, it is up to the
function to actually make use of the GPU. This is typically done through an
external library like TensorFlow.

.. code-block:: python

    import tensorflow as tf

    @ray.remote(num_gpus=1)
    def use_gpu():
        # Create a TensorFlow session. TensorFlow will restrict itself to use the
        # GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
        tf.Session()

**Note:** It is certainly possible for the person implementing ``use_gpu`` to
ignore ``ray.get_gpu_ids`` and to use all of the GPUs on the machine. Ray does
not prevent this from happening, and this can lead to too many workers using the
same GPU at the same time. However, Ray does automatically set the
``CUDA_VISIBLE_DEVICES`` environment variable, which will restrict the GPUs used
by most deep learning frameworks.
Fractional GPUs
---------------
If you want two tasks to share the same GPU, then the tasks can each request
half (or some other fraction) of a GPU.
.. code-block:: python
import ray
import time
ray.init(num_cpus=4, num_gpus=1)
@ray.remote(num_gpus=0.25)
def f():
time.sleep(1)
# The four tasks created here can execute concurrently.
ray.get([f.remote() for _ in range(4)])
It is the developer's responsibility to make sure that the individual tasks
don't use more than their share of the GPU memory. TensorFlow can be configured
to limit its memory usage.
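
For example, with the TensorFlow 1.x API this could look like the following
sketch, where the 0.25 fraction mirrors the ``num_gpus=0.25`` request above:

.. code-block:: python

    import tensorflow as tf

    # Cap this process's share of GPU memory so that four tasks can coexist
    # on a single GPU.
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.25)
    sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))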
Using Actors with GPUs
----------------------

To specify that an actor requires GPUs, indicate the number of GPUs the actor
instance requires in the ``ray.remote`` decorator.

When the actor is created, GPUs will be reserved for that actor for the lifetime
of the actor. If sufficient GPU resources are not available, then the actor will
not be created.

The following is an example of how to use GPUs in an actor through TensorFlow.

.. code-block:: python

    @ray.remote(num_gpus=1)
    class GPUActor(object):
        def __init__(self):
            # The call to tf.Session() will restrict TensorFlow to use the GPUs
            # specified in the CUDA_VISIBLE_DEVICES environment variable.
            self.sess = tf.Session()

Workers not Releasing GPU Resources
-----------------------------------

**Note:** Currently, when a worker executes a task that uses a GPU (e.g.,
through TensorFlow), the task may allocate memory on the GPU and may not release
it when the task finishes executing. This can lead to problems the next time a
task tries to use the same GPU. You can address this by setting ``max_calls=1``
in the remote decorator so that the worker automatically exits after executing
the task (thereby releasing the GPU resources).

.. code-block:: python

    import tensorflow as tf

    @ray.remote(num_gpus=1, max_calls=1)
    def leak_gpus():
        # This task will allocate memory on the GPU and then never release it, so
        # we include the max_calls argument to kill the worker and release the
        # resources.
        sess = tf.Session()
Using Ray with TensorFlow
=========================

This document describes best practices for using Ray with TensorFlow.

To see more involved examples using TensorFlow, take a look at
`A3C`_, `ResNet`_, and `LBFGS`_.

.. _`A3C`: http://ray.readthedocs.io/en/latest/example-a3c.html
.. _`ResNet`: http://ray.readthedocs.io/en/latest/example-resnet.html
.. _`LBFGS`: http://ray.readthedocs.io/en/latest/example-lbfgs.html

If you are training a deep network in the distributed setting, you may need to
ship your deep network between processes (or machines). However, shipping the
model is not always straightforward.

A straightforward attempt to pickle a TensorFlow graph gives mixed results.
Some examples fail, and some succeed (but produce very large strings). The
results are similar with other pickling libraries as well.

Furthermore, creating a TensorFlow graph can take tens of seconds, and so
serializing a graph and recreating it in another process will be inefficient.

The better solution is to replicate the same TensorFlow graph on each worker once
at the beginning and then to ship only the weights between the workers.

Suppose we have a simple network definition (this one is modified from the
Walkthrough
===========
This walkthrough will overview the core concepts of Ray:
1. Using remote functions (tasks) [``ray.remote``]
2. Fetching results (object IDs) [``ray.put``, ``ray.get``, ``ray.wait``]
3. Using remote classes (actors) [``ray.remote``]
With Ray, your code will work on a single machine and can be easily scaled to a
large cluster. To run this walkthrough, install Ray with ``pip install -U ray``.
.. code-block:: python
import ray
# Start Ray. If you're connecting to an existing cluster, you would use
# ray.init(redis_address=<cluster-redis-address>) instead.
ray.init()
See the `Configuration <configure.html>`__ documentation for the various ways to
configure Ray. To start a multi-node Ray cluster, see the `cluster setup page
<using-ray-on-a-cluster.html>`__. You can stop Ray by calling
``ray.shutdown()``. To check if Ray is initialized, you can call
``ray.is_initialized()``.
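
For example, a minimal start/stop cycle:

.. code-block:: python

    import ray

    ray.init()
    assert ray.is_initialized()

    ray.shutdown()
    assert not ray.is_initialized()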
Remote functions (Tasks)
------------------------
Ray enables arbitrary Python functions to be executed asynchronously. These asynchronous Ray functions are called "remote functions". The standard way to turn a Python function into a remote function is to add the ``@ray.remote`` decorator. Here is an example.
.. code:: python
# A regular Python function.
def regular_function():
return 1
# A Ray remote function.
@ray.remote
def remote_function():
return 1
This causes a few changes in behavior:
1. **Invocation:** The regular version is called with ``regular_function()``, whereas the remote version is called with ``remote_function.remote()``.
2. **Return values:** ``regular_function`` immediately executes and returns ``1``, whereas ``remote_function`` immediately returns an object ID (a future) and then creates a task that will be executed on a worker process. The result can be retrieved with ``ray.get``.
.. code:: python
>>> regular_function()
1
>>> remote_function.remote()
ObjectID(1c80d6937802cd7786ad25e50caf2f023c95e350)
>>> ray.get(remote_function.remote())
1
3. **Parallelism:** Invocations of ``regular_function`` happen
**serially**, for example
.. code:: python
# These happen serially.
for _ in range(4):
regular_function()
whereas invocations of ``remote_function`` happen in **parallel**,
for example
.. code:: python
# These happen in parallel.
for _ in range(4):
remote_function.remote()
See the `ray.remote package reference <package-ref.html>`__ page for specific documentation on how to use ``ray.remote``.
**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be retrieved as a regular Python object**.
.. code:: python
    >>> @ray.remote
    ... def f(x):
    ...     return x + 1

    >>> x1_id = f.remote(0)  # First task.
    >>> y1_id = f.remote(x1_id)  # Second task; depends on the first task's output.
    >>> ray.get(y1_id)
    2
Note the following behaviors:
- The second task will not be executed until the first task has finished
executing because the second task depends on the output of the first task.
- If the two tasks are scheduled on different machines, the output of the
first task (the value corresponding to ``x1_id``) will be sent over the
network to the machine where the second task is scheduled.
Oftentimes, you may want to specify a task's resource requirements (for example
one task may require a GPU). The ``ray.init()`` command will automatically
detect the available GPUs and CPUs on the machine. However, you can override
this default behavior by passing in specific resources, e.g.,
``ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})``.
To specify a task's CPU and GPU requirements, pass the ``num_cpus`` and
``num_gpus`` arguments into the remote decorator. The task will only run on a
machine if there are enough CPU and GPU (and other custom) resources available
to execute the task. Ray can also handle arbitrary custom resources.
.. note::
* If you do not specify any resources in the ``@ray.remote`` decorator, the
default is 1 CPU resource and no other resources.
* If specifying CPUs, Ray does not enforce isolation (i.e., your task is
  expected to honor its request).
* If specifying GPUs, Ray does provide isolation in forms of visible devices
(setting the environment variable ``CUDA_VISIBLE_DEVICES``), but it is the
task's responsibility to actually use the GPUs (e.g., through a deep
learning framework like TensorFlow or PyTorch).
.. code-block:: python
@ray.remote(num_cpus=4, num_gpus=2)
def f():
return 1
The resource requirements of a task have implications for the Ray's scheduling
concurrency. In particular, the sum of the resource requirements of all of the
concurrently executing tasks on a given node cannot exceed the node's total
resources.
Below are more examples of resource specifications:
.. code-block:: python
# Ray also supports fractional resource requirements
@ray.remote(num_gpus=0.5)
def h():
return 1
# Ray supports custom resources too.
@ray.remote(resources={'Custom': 1})
def f():
return 1
Further, remote functions can return multiple object IDs.
.. code-block:: python
@ray.remote(num_return_vals=3)
def return_multiple():
return 1, 2, 3
a_id, b_id, c_id = return_multiple.remote()
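Each returned object ID can then be fetched individually or all at once:

.. code-block:: python

    assert ray.get(a_id) == 1
    assert ray.get([a_id, b_id, c_id]) == [1, 2, 3]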
Objects in Ray
--------------
In Ray, we can create and compute on objects. We refer to these objects as **remote objects**, and we use **object IDs** to refer to them. Remote objects are stored in **object stores**, and there is one object store per node in the cluster. In the cluster setting, we may not actually know which machine each object lives on.
An **object ID** is essentially a unique ID that can be used to refer to a
remote object. If you're familiar with futures, our object IDs are conceptually
similar.
Object IDs can be created in multiple ways.
1. They are returned by remote function calls.
2. They are returned by ``ray.put``.
.. code-block:: python
>>> y = 1
>>> y_id = ray.put(y)
>>> print(y_id)
ObjectID(0369a14bc595e08cfbd508dfaa162cb7feffffff)
Here is the docstring for ``ray.put``:
.. autofunction:: ray.put
:noindex:
.. important::
Remote objects are immutable. That is, their values cannot be changed after
creation. This allows remote objects to be replicated in multiple object
stores without needing to synchronize the copies.
Fetching Results
----------------
The command ``ray.get(x_id)`` takes an object ID and creates a Python object
from the corresponding remote object. For some objects like arrays, we can use
shared memory and avoid copying the object.
.. code-block:: python
>>> y = 1
>>> obj_id = ray.put(y)
>>> print(obj_id)
ObjectID(0369a14bc595e08cfbd508dfaa162cb7feffffff)
>>> ray.get(obj_id)
1
Here is the docstring for ``ray.get``:
.. autofunction:: ray.get
:noindex:
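One consequence of immutability combined with shared memory is that numpy arrays retrieved with ``ray.get`` may come back as read-only views into the object store. A sketch of what this can look like:

.. code-block:: python

    import numpy as np

    array_id = ray.put(np.zeros(4))
    array = ray.get(array_id)
    # The retrieved array may be backed by shared memory and read-only,
    # in which case in-place mutation raises an error.
    try:
        array[0] = 1.0
    except ValueError:
        print("remote objects are immutable")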
After launching a number of tasks, you may want to know which ones have
finished executing. This can be done with ``ray.wait``. The function
works as follows.
.. code:: python
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)
Here is the docstring for ``ray.wait``:
.. autofunction:: ray.wait
:noindex:
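A common pattern is to loop over ``ray.wait``, processing each result as soon as it is ready instead of blocking on all of them at once. A minimal sketch (the sleeping task is illustrative):

.. code-block:: python

    import time

    import ray

    ray.init()

    @ray.remote
    def slow_function(i):
        time.sleep(i)
        return i

    remaining_ids = [slow_function.remote(i) for i in range(4)]
    while remaining_ids:
        # Block until at least one task finishes, then fetch its result.
        ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
        print(ray.get(ready_ids[0]))  # Prints 0, 1, 2, 3 in completion order.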
Remote Classes (Actors)
-----------------------
Actors extend the Ray API from functions (tasks) to classes. The ``ray.remote``
decorator indicates that instances of the ``Counter`` class will be actors. An
actor is essentially a stateful worker. Each actor runs in its own Python
process.
.. code-block:: python
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
To create a couple of actors, we can instantiate this class as follows:
.. code-block:: python
a1 = Counter.remote()
a2 = Counter.remote()
When an actor is instantiated, the following events happen.
1. A worker Python process is started on a node of the cluster.
2. A ``Counter`` object is instantiated on that worker.
You can specify resource requirements for actors too (see the `Actors section
<actors.html>`__ for more details).
.. code-block:: python
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
pass
We can interact with the actor by calling its methods with the ``.remote``
operator. We can then call ``ray.get`` on the object ID to retrieve the actual
value.
.. code-block:: python
obj_id = a1.increment.remote()
    assert ray.get(obj_id) == 1
Methods called on different actors can execute in parallel, and methods called on the same actor are executed serially in the order that they are called. Methods on the same actor will share state with one another, as shown below.
.. code-block:: python
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]
# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results) # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results) # prints [2, 3, 4, 5, 6]
To learn more about Ray Actors, see the `Actors section <actors.html>`__.
rllib_doc/index.rst Normal file
@ -0,0 +1,17 @@
Ray
===
.. toctree::
:maxdepth: 1
:caption: RLlib
rllib.rst
rllib-training.rst
rllib-env.rst
rllib-models.rst
rllib-algorithms.rst
rllib-offline.rst
rllib-concepts.rst
rllib-examples.rst
rllib-dev.rst
rllib-package-ref.rst