Add overview of internals to documentation, improve serialization doc… (#390)

* Add overview of internals to documentation, improve serialization documentation.

* Doc edits

* Small fixes.
Robert Nishihara 2017-03-27 21:52:17 -07:00 committed by Stephanie Wang
parent 78e1167a42
commit 8245758ccb
5 changed files with 397 additions and 229 deletions

View file

@ -36,8 +36,8 @@ Ray
:maxdepth: 1
:caption: Design
remote-functions.md
serialization.md
internals-overview.rst
serialization.rst
.. toctree::
:maxdepth: 1

View file

@ -0,0 +1,192 @@
An Overview of the Internals
============================
In this document, we trace through in more detail what happens at the system
level when certain API calls are made.
Connecting to Ray
-----------------
There are two ways that a Ray script can be initiated. It can either be run in
a standalone fashion, or it can connect to an existing Ray cluster.
Running Ray standalone
~~~~~~~~~~~~~~~~~~~~~~
Ray can be used standalone by calling ``ray.init()`` within a script. When the
call to ``ray.init()`` happens, all of the relevant processes are started.
These include a local scheduler, a global scheduler, an object store and
manager, a Redis server, and a number of worker processes.
When the script exits, these processes will be killed.
**Note:** This approach is limited to a single machine.
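For example, here is a minimal sketch of standalone usage.

.. code-block:: python

    import ray

    # Start a local scheduler, a global scheduler, an object store and
    # manager, a Redis server, and some worker processes on this machine.
    ray.init()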
Connecting to an existing Ray cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To connect to an existing Ray cluster, pass the address of the cluster's Redis
server as the ``redis_address=`` keyword argument to ``ray.init``. In
this case, no new processes will be started when ``ray.init`` is called, and
similarly the processes will continue running when the script exits. In this
case, all processes except workers that correspond to actors are shared between
different driver processes.
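For example, the following sketch connects to an existing cluster. The address
shown is a placeholder; substitute the address and port of your own Redis
server.

.. code-block:: python

    import ray

    # Connect to an existing Ray cluster. No new processes are started.
    ray.init(redis_address="12.345.67.89:6379")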
Defining a remote function
--------------------------
A central component of this system is the **centralized control plane**. This is
implemented using one or more Redis servers. `Redis`_ is an in-memory key-value
store.
.. _`Redis`: https://github.com/antirez/redis
We use the centralized control plane in two ways. First, as a persistent store
of the system's control state. Second, as a message bus for communication between
processes (using Redis's publish-subscribe functionality).
Now, consider a remote function definition as below.
.. code-block:: python
@ray.remote
def f(x):
return x + 1
When the remote function is defined as above, the function is immediately
pickled, assigned a unique ID, and stored in a Redis server. You can view the
remote functions in the centralized control plane as below.
.. code-block:: python
TODO: Fill this in.
Each worker process has a separate thread running in the background that
listens for the addition of remote functions to the centralized control state.
When a new remote function is added, the thread fetches the pickled remote
function, unpickles it, and can then execute that function.
Notes and limitations
~~~~~~~~~~~~~~~~~~~~~
- Because we export remote functions as soon as they are defined, remote
functions can't close over variables that are defined after the remote
function is defined. For example, the following code gives an error.
.. code-block:: python
@ray.remote
def f(x):
return helper(x)
def helper(x):
return x + 1
If you call ``f.remote(0)``, it will give an error of the form.
.. code-block:: python
Traceback (most recent call last):
File "<ipython-input-3-12a5beeb2306>", line 3, in f
NameError: name 'helper' is not defined
On the other hand, if ``helper`` is defined before ``f``, then it will work.
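For example, the following ordering works because ``helper`` already exists
when ``f`` is defined and exported.

.. code-block:: python

    def helper(x):
        return x + 1

    @ray.remote
    def f(x):
        return helper(x)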
Calling a remote function
-------------------------
When a driver or worker invokes a remote function, a number of things happen
(a short usage sketch follows this list).
- First, a task object is created. The task object includes the following.
- The ID of the function being called.
- The IDs or values of the arguments to the function. Python primitives like
integers or short strings will be pickled and included as part of the task
object. Larger or more complex objects will be put into the object store
with an internal call to ``ray.put``, and the resulting IDs are included in
the task object. Object IDs that are passed directly as arguments are also
included in the task object.
- The ID of the task. This is generated uniquely from the above content.
- The IDs for the return values of the task. These are generated uniquely
from the above content.
- The task object is then sent to the local scheduler on the same node as the
driver or worker.
- The local scheduler makes a decision to either schedule the task locally or to
pass the task on to a global scheduler.
- If all of the task's object dependencies are present in the local object
store and there are enough CPU and GPU resources available to execute the
task, then the local scheduler will assign the task to one of its
available workers.
- If those conditions are not met, the task will be passed on to a global
scheduler. This is done by adding the task to the **task table**, which is
part of the centralized control state.
The task table can be inspected as follows.
.. code-block:: python
TODO: Fill this in.
A global scheduler will be notified of the update and will assign the task
to a local scheduler by updating the task's state in the task table. The
local scheduler will be notified and pull the task object.
- Once a task has been scheduled to a local scheduler, whether by itself or by
a global scheduler, the local scheduler queues the task for execution. Tasks
are assigned to workers in first-in, first-out order, as enough resources
become available and their object dependencies become available locally.
- When the task has been assigned to a worker, the worker executes the task and
puts the task's return values into the object store. The object store will
then update the **object table**, which is part of the centralized control
state, to reflect the fact that it contains the newly created objects. The
object table can be viewed as follows.
.. code-block:: python
TODO: Fill this in.
When the task's return values are placed into the object store, they are first
serialized into a contiguous blob of bytes using the `Apache Arrow`_ data
layout, which is helpful for efficiently sharing data between processes using
shared memory.
.. _`Apache Arrow`: https://arrow.apache.org/
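As a minimal sketch of this flow, using the remote function ``f`` defined
above:

.. code-block:: python

    # Submitting the task returns an object ID immediately. Behind the
    # scenes, a task object is created and sent to the local scheduler.
    x_id = f.remote(1)

    # ray.get blocks until the task has executed and its return value has
    # been placed in the local object store.
    ray.get(x_id)  # 2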
Notes and limitations
~~~~~~~~~~~~~~~~~~~~~
- When an object store on a particular node fills up, it will begin evicting
objects in a least-recently-used manner. If an object that is needed later is
evicted, then the call to ``ray.get`` for that object will initiate the
reconstruction of the object. The local scheduler will attempt to reconstruct
the object by replaying its task lineage.
TODO: Limitations on reconstruction.
Getting an object ID
--------------------
Several things happen when a driver or worker calls ``ray.get`` on an object ID.
.. code-block:: python
ray.get(x_id)
- The driver or worker goes to the object store on the same node and requests
the relevant object. Each object store consists of two components: a
shared-memory key-value store of immutable objects, and a manager that
coordinates the transfer of objects between nodes.
- If the object is not present in the object store, the manager checks the
object table to see which other object stores, if any, have the object. It
then requests the object directly from one of those object stores, via its
manager. If the object doesn't exist anywhere, then the centralized control
state will notify the requesting manager when the object is created. If the
object doesn't exist anywhere because it has been evicted from all object
stores, the worker will also request reconstruction of the object from the
local scheduler. These checks repeat periodically until the object is
available in the local object store, whether through reconstruction or
through object transfer.
- Once the object is available in the local object store, the driver or worker
will map the relevant region of memory into its own address space (to avoid
copying the object), and will deserialize the bytes into a Python object.
Note that any numpy arrays that are part of the object will not be copied.

View file

@ -1,62 +0,0 @@
# Remote Functions
This document elaborates a bit on what can and cannot be done with remote
functions. Remote functions are written like regular Python functions, but with
the `@ray.remote` decorator on top.
```python
@ray.remote
def increment(n):
return n + 1
```
## Invocation and Execution
Normally, when a Python function is invoked, it executes immediately and
returns once the execution has completed. For remote functions, we distinguish
between the invocation of the function (that is, when the user calls the
function) and the execution of the function (that is, when the function's
implementation is run).
**Invocation:** To invoke a remote function, the user calls the remote function.
```python
x_id = increment(1)
```
This line sends a message to the scheduler asking it to schedule the task of
executing the function `increment` with the argument `1`. It then returns an
object id for the eventual output of the task. This takes place almost
instantaneously and does not wait for the actual task to be executed.
When calling a remote function, the return value always consists of one or more
object ids. If you want the actual value, call `ray.get(x_id)`, which will
wait for the task to execute and will return the resulting value.
**Execution:** Eventually, the scheduler will schedule the task on a worker. The
worker will then run the increment function and store the function's output in
the worker's local object store.
At that point, the scheduler will be notified that the outputs of the task are
ready, and tasks that depend on those outputs can be scheduled.
## Serializable Types
It is not reasonable to store arbitrary Python objects in the object store or to
ship arbitrary Python objects between machines (for example a file handle on one
machine may not be meaningful on another). Currently, we serialize only specific
types in the object store. **The serializable types are:**
1. Primitive data types (for example, `1`, `1.0`, `"hello"`, `True`)
2. Numpy arrays
3. Object IDs
4. Lists, tuples, and dictionaries of other serializable types, but excluding
custom classes (for example, `[1, 1.0, "hello"]`, `{True: "hi", 1: ["hi"]}`)
5. Custom classes in many cases. You must explicitly register the class.
```python
class Foo(object):
pass
ray.register_class(Foo)
```

View file

@ -1,165 +0,0 @@
# Serialization in the Object Store
This document describes what Python objects Ray can and cannot serialize into
the object store. Once an object is placed in the object store, it is immutable.
There are a number of situations in which Ray will place objects in the object
store.
1. When a remote function returns, its return values are stored in the object
store.
2. A call to `ray.put(x)` places `x` in the object store.
3. When large objects or objects other than simple primitive types are passed as
arguments into remote functions, they will be placed in the object store.
A normal Python object may have pointers all over the place, so to place an
object in the object store or send it between processes, it must first be
converted to a contiguous string of bytes. This process is known as
serialization. The process of turning the string of bytes back into a Python
object is known as deserialization. Serialization and deserialization are often
bottlenecks in distributed computing.
Pickle is one example of a library for serialization and deserialization in
Python.
```python
import pickle
pickle.dumps([1, 2, 3]) # prints b'\x80\x03]q\x00(K\x01K\x02K\x03e.'
pickle.loads(b'\x80\x03]q\x00(K\x01K\x02K\x03e.') # prints [1, 2, 3]
```
Pickle (and its variants) is quite general. It can successfully serialize a
large variety of Python objects. However, for numerical workloads, pickling and
unpickling can be inefficient. For example, when unpickling a list of numpy
arrays, pickle will create completely new arrays in memory. In Ray, when we
deserialize a list of numpy arrays from the object store, we will create a list
of numpy array objects in Python, but each numpy array object is essentially
just a pointer to the relevant location in shared memory. There are some
advantages to this form of serialization.
- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.
## What Objects Does Ray Handle
However, Ray is not currently capable of serializing arbitrary Python objects.
The set of Python objects that Ray can serialize includes the following.
1. Primitive types: ints, floats, longs, bools, strings, unicode, and numpy
arrays.
2. Any list, dictionary, or tuple whose elements can be serialized by Ray.
3. Objects whose classes can be registered with `ray.register_class`. This point
is described below.
## Registering Custom Classes
We currently support serializing a limited subset of custom classes. For
example, suppose you define a new class `Foo` as follows.
```python
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
```
Simply calling `ray.put(Foo(1, 2))` will fail with a message like
```
Ray does not know how to serialize the object <__main__.Foo object at 0x1077d7c50>.
```
This can be addressed by calling `ray.register_class(Foo)`.
```python
import ray
ray.init(num_workers=10)
# Define a custom class.
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
# Calling ray.register_class(Foo) ships the class definition to all of the
# workers so that workers know how to construct new Foo objects.
ray.register_class(Foo)
# Create a Foo object, place it in the object store, and retrieve it.
f = Foo(1, 2)
f_id = ray.put(f)
ray.get(f_id) # prints <__main__.Foo at 0x1078128d0>
```
Under the hood, `ray.put` essentially replaces `f` with `f.__dict__`, which is
just the dictionary `{"a": 1, "b": 2}`. Then during deserialization, `ray.get`
constructs a new `Foo` object from the dictionary of fields.
This naive substitution won't work in all cases. For example, if we want to
serialize Python objects of type `function` (for example `f = lambda x: x + 1`),
this simple scheme doesn't quite work, and `ray.register_class(type(f))` will
give an error message. In these cases, we can fall back to pickle (actually we
use cloudpickle).
```python
# This call tells Ray to fall back to using pickle when it encounters objects of
# type function.
f = lambda x: x + 1
ray.register_class(type(f), pickle=True)
f_new = ray.get(ray.put(f))
f_new(0) # prints 1
```
However, it's best to avoid using pickle for efficiency reasons. If you find
yourself needing to pickle certain objects, consider trying to use more
efficient data structures like arrays.
**Note:** Another setting where the naive replacement of an object with its
`__dict__` attribute fails is where an object recursively contains itself (or
multiple objects recursively contain each other). For example, consider the code
below.
```python
l = []
l.append(l)
# Try to put this list that recursively contains itself in the object store.
ray.put(l)
```
It will throw an exception with a message like the following.
```
This object exceeds the maximum recursion depth. It may contain itself recursively.
```
## Last Resort Workaround
If you find cases where Ray doesn't work or does the wrong thing, please let us
know so we can fix it. In the meantime, you can do your own custom serialization
and deserialization (for example, by calling pickle by hand) or write your own
custom serializer and deserializer.
```python
import pickle
@ray.remote
def f(complicated_object):
# Deserialize the object manually.
obj = pickle.loads(complicated_object)
return "Successfully passed {} into f.".format(obj)
# Define a complicated object.
l = []
l.append(l)
# Manually serialize the object and pass it in as a string.
ray.get(f.remote(pickle.dumps(l))) # prints 'Successfully passed [[...]] into f.'
```
**Note:** If you have trouble with pickle, you may have better luck with
cloudpickle.

View file

@ -0,0 +1,203 @@
Serialization in the Object Store
=================================
This document describes what Python objects Ray can and cannot serialize into
the object store. Once an object is placed in the object store, it is immutable.
There are a number of situations in which Ray will place objects in the object
store.
1. The return values of a remote function.
2. The value ``x`` in a call to ``ray.put(x)``.
3. Large objects or objects other than simple primitive types that are passed
as arguments into remote functions.
A Python object may have an arbitrary number of pointers with arbitrarily deep
nesting. To place an object in the object store or send it between processes,
it must first be converted to a contiguous string of bytes. This process is
known as serialization. The process of converting the string of bytes back into a
Python object is known as deserialization. Serialization and deserialization
are often bottlenecks in distributed computing if the time needed to compute
on the data is relatively low.
Pickle is one example of a library for serialization and deserialization in
Python.
.. code-block:: python
import pickle
pickle.dumps([1, 2, 3]) # prints b'\x80\x03]q\x00(K\x01K\x02K\x03e.'
pickle.loads(b'\x80\x03]q\x00(K\x01K\x02K\x03e.') # prints [1, 2, 3]
Pickle (and the variant we use, cloudpickle) is general-purpose. It can
serialize a large variety of Python objects. However, for numerical workloads,
pickling and unpickling can be inefficient. For example, if multiple processes
want to access a Python list of numpy arrays, each process must unpickle the
list and create its own new copies of the arrays. This can lead to high memory
overheads, even when all processes are read-only and could easily share memory.
In Ray, we optimize for numpy arrays by using the `Apache Arrow`_ data format.
When we deserialize a list of numpy arrays from the object store, we still
create a Python list of numpy array objects. However, rather than copy each
numpy array over again, each numpy array object holds a pointer to the relevant
array held in shared memory. There are some advantages to this form of
serialization.
- Deserialization can be very fast.
- Memory is shared between processes so worker processes can all read the same
data without having to copy it.
.. _`Apache Arrow`: https://arrow.apache.org/
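As a minimal illustration of the zero-copy behavior described above, consider
retrieving a large numpy array from the object store.

.. code-block:: python

    import numpy as np

    # Place a large array in the object store.
    a_id = ray.put(np.zeros(10 ** 6))

    # Retrieving it does not copy the underlying data; the resulting numpy
    # array points into the array stored in shared memory.
    a = ray.get(a_id)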
What Objects Does Ray Handle
----------------------------
Ray does not currently support serialization of arbitrary Python objects. The
set of Python objects that Ray can serialize includes the following.
1. Primitive types: ints, floats, longs, bools, strings, unicode, and numpy
arrays.
2. Any list, dictionary, or tuple whose elements can be serialized by Ray.
3. Objects whose classes can be registered with ``ray.register_class``. This
point is described below.
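A short sketch of values from each of these categories being placed in and
retrieved from the object store:

.. code-block:: python

    import numpy as np

    ray.get(ray.put(1))                # A primitive type.
    ray.get(ray.put("hello"))          # A string.
    ray.get(ray.put([1, 1.0, "hi"]))   # A list of serializable values.
    ray.get(ray.put(np.arange(3)))     # A numpy array.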
Registering Custom Classes
--------------------------
We currently support serializing a limited subset of custom classes. For
example, suppose you define a new class ``Foo`` as follows.
.. code-block:: python
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
Simply calling ``ray.put(Foo(1, 2))`` will fail with a message like
.. code-block:: python
Ray does not know how to serialize the object <__main__.Foo object at 0x1077d7c50>.
This can be addressed by calling ``ray.register_class(Foo)``.
.. code-block:: python
import ray
ray.init(num_workers=10)
# Define a custom class.
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
# Calling ray.register_class(Foo) ships the class definition to all of the
# workers so that workers know how to construct new Foo objects.
ray.register_class(Foo)
# Create a Foo object, place it in the object store, and retrieve it.
f = Foo(1, 2)
f_id = ray.put(f)
ray.get(f_id) # prints <__main__.Foo at 0x1078128d0>
Under the hood, ``ray.put`` places ``f.__dict__``, the dictionary of attributes
of ``f``, into the object store instead of ``f`` itself. In this case, the
dictionary is ``{"a": 1, "b": 2}``. Then during deserialization, ``ray.get``
constructs a new ``Foo`` object from the dictionary of fields.
This naive substitution won't work in all cases. For example, this scheme does
not support Python objects of type ``function`` (e.g., ``f = lambda x: x +
1``). In these cases, the call to ``ray.register_class`` will give an error
message, and you should fall back to pickle.
.. code-block:: python
# This call tells Ray to fall back to using pickle when it encounters objects
# of type function.
f = lambda x: x + 1
ray.register_class(type(f), pickle=True)
f_new = ray.get(ray.put(f))
f_new(0) # prints 1
However, it's best to avoid using pickle for the efficiency reasons described
above. If you find yourself needing to pickle certain objects, consider trying
to use more efficient data structures like arrays.
**Note:** Another setting where the naive replacement of an object with its
``__dict__`` attribute fails is recursion, e.g., when an object contains itself
or multiple objects contain each other. For more examples, see the section
`Notes and Limitations`_.
Notes and limitations
---------------------
- We currently handle certain patterns incorrectly, according to Python
semantics. For example, a list that contains two copies of the same list will
be serialized as if the two lists were distinct.
.. code-block:: python
l1 = [0]
l2 = [l1, l1]
l3 = ray.get(ray.put(l2))
l2[0] is l2[1] # True.
l3[0] is l3[1] # False.
- For reasons similar to the above example, we also do not currently handle
objects that recursively contain themselves (this may be common in graph-like
data structures).
.. code-block:: python
l = []
l.append(l)
# Try to put this list that recursively contains itself in the object store.
ray.put(l)
This will throw an exception with a message like the following.
.. code-block:: bash
This object exceeds the maximum recursion depth. It may contain itself recursively.
- If you need to pass a custom class into a remote function, you should call
``ray.register_class`` on the class **before defining the remote function**
(see the sketch after this list).
- Whenever possible, use numpy arrays for maximum performance.
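For example, here is a sketch of the ordering described above. The class
``Point`` is purely illustrative.

.. code-block:: python

    class Point(object):
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # Register the class before defining the remote function that uses it.
    ray.register_class(Point)

    @ray.remote
    def norm_squared(p):
        return p.x ** 2 + p.y ** 2

    ray.get(norm_squared.remote(Point(3, 4)))  # 25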
Last Resort Workaround
----------------------
If you find cases where Ray serialization doesn't work or does something
unexpected, please `let us know`_ so we can fix it. In the meantime, you may
have to resort to writing custom serialization and deserialization code (e.g.,
calling pickle by hand).
.. _`let us know`: https://github.com/ray-project/ray/issues
.. code-block:: python
import pickle
@ray.remote
def f(complicated_object):
# Deserialize the object manually.
obj = pickle.loads(complicated_object)
return "Successfully passed {} into f.".format(obj)
# Define a complicated object.
l = []
l.append(l)
# Manually serialize the object and pass it in as a string.
ray.get(f.remote(pickle.dumps(l))) # prints 'Successfully passed [[...]] into f.'
**Note:** If you have trouble with pickle, you may have better luck with
cloudpickle.