mirror of https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00

update tutorial (#318)

This commit is contained in:
parent f79494c8c6
commit 8d5e61d3c0

14 changed files with 357 additions and 339 deletions

41 README.md
@ -2,8 +2,8 @@

[](https://travis-ci.org/amplab/ray)

Ray is an experimental distributed execution framework with a Python-like
programming model. It is under development and not ready for general use.
Ray is an experimental distributed extension of Python. It is under development
and not ready for general use.

The goal of Ray is to make it easy to write machine learning applications that
run on a cluster while providing the development and debugging experience of

@ -12,35 +12,37 @@ working on a single machine.

Before jumping into the details, here's a simple Python example for doing a
Monte Carlo estimation of pi (using multiple cores or potentially multiple
machines).

```python
import numpy as np

import ray
import functions  # See definition below

results = []
for _ in range(10):
    results.append(functions.estimate_pi(100))
estimate = np.mean([ray.get(ref) for ref in results])
print "Pi is approximately {}.".format(estimate)
```

This assumes that we've defined the file `functions.py` as below.

```python
import ray
import numpy as np

# Start a scheduler, an object store, and some workers.
ray.services.start_ray_local(num_workers=10)

# Define a remote function for estimating pi.
@ray.remote([int], [float])
def estimate_pi(n):
    x = np.random.uniform(size=n)
    y = np.random.uniform(size=n)
    return 4 * np.mean(x ** 2 + y ** 2 < 1)

# Launch 10 tasks, each of which estimates pi.
results = []
for _ in range(10):
    results.append(estimate_pi(100))

# Fetch the results of the tasks and print their average.
estimate = np.mean([ray.get(ref) for ref in results])
print "Pi is approximately {}.".format(estimate)
```

Within the for loop, each call to `functions.estimate_pi(100)` sends a message
to the scheduler asking it to schedule the task of running
`functions.estimate_pi` with the argument `100`. This call returns right away
without waiting for the actual estimation of pi to take place. Instead of
returning a float, it returns an **object reference**, which represents the
eventual output of the computation.
Within the for loop, each call to `estimate_pi(100)` sends a message to the
scheduler asking it to schedule the task of running `estimate_pi` with the
argument `100`. This call returns right away without waiting for the actual
estimation of pi to take place. Instead of returning a float, it returns an
**object reference**, which represents the eventual output of the computation
(this is similar to a Future).

The call to `ray.get(ref)` takes an object reference and returns the actual
estimate of pi (waiting until the computation has finished if necessary).
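The submit-then-block-on-get pattern above is essentially the Future pattern. As a rough illustration only (Ray's scheduler and object store are not involved, and threads stand in for Ray's workers), the same shape can be sketched with Python's standard `concurrent.futures`, where `submit` returns immediately and `result()` blocks:

```python
import concurrent.futures

import numpy as np

def estimate_pi(n):
    # The fraction of random points in the unit square that fall inside
    # the unit quarter-circle, times 4, approximates pi.
    x = np.random.uniform(size=n)
    y = np.random.uniform(size=n)
    return 4 * np.mean(x ** 2 + y ** 2 < 1)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    # submit() returns a future right away, like a Ray object reference.
    futures = [pool.submit(estimate_pi, 10000) for _ in range(10)]
    # result() blocks until the value is ready, like ray.get.
    estimate = np.mean([f.result() for f in futures])
```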

@ -48,7 +50,6 @@ estimate of pi (waiting until the computation has finished if necessary).

## Next Steps

- Installation on [Ubuntu](doc/install-on-ubuntu.md), [Mac OS X](doc/install-on-macosx.md), [Windows](doc/install-on-windows.md)
- [Basic Usage](doc/basic-usage.md)
- [Tutorial](doc/tutorial.md)
- [About the System](doc/about-the-system.md)
- [Using Ray on a Cluster](doc/using-ray-on-a-cluster.md)

@ -1,9 +1,9 @@

## About the System
# About the System

This document describes the current architecture of Ray. However, some of these
decisions are likely to change.

### Components
## Components

A Ray cluster consists of several components.

@ -12,20 +12,20 @@ A Ray cluster consists of several components.

- One object store per node
- One (or more) drivers

#### The scheduler
### The scheduler

The scheduler assigns tasks to the workers.

#### The workers
### The workers

The workers execute tasks and submit tasks to the scheduler.

#### The object store
### The object store

The object store shares objects between the worker processes on the same node so
that the workers don't need to each have their own copies of the objects.

#### The driver
### The driver

The driver submits tasks to the scheduler. If you use Ray in a script, the
Python process running the script is the driver. If you use Ray interactively

@ -1,207 +0,0 @@

## Basic Usage

To use Ray, you need to understand the following:

- How Ray uses object references to represent immutable remote objects.
- How Ray constructs computation graphs using remote functions.

### Immutable remote objects

In Ray, we can create and manipulate objects. We refer to these objects as
**remote objects**, and we use **object references** to refer to them. Remote
objects are stored in **object stores**, and there is one object store per node
in the cluster. In the cluster setting, we may not actually know which machine
each object lives on.

An **object reference** is essentially a unique ID that can be used to refer to
a remote object. If you're familiar with Futures, our object references are
conceptually similar.

We assume that remote objects are immutable. That is, their values cannot be
changed after creation. This allows remote objects to be replicated in multiple
object stores without needing to synchronize the copies.

#### Put and Get

The commands `ray.get` and `ray.put` can be used to convert between Python
objects and object references, as shown in the example below.

```python
>>> x = [1, 2, 3]
>>> ray.put(x)
<ray.ObjRef at 0x1031baef0>
```

The command `ray.put(x)` would be run by a worker process or by the driver
process (the driver process is the one running your script). It takes a Python
object and copies it to the local object store (here *local* means *on the same
node*). Once the object has been stored in the object store, its value cannot be
changed.

In addition, `ray.put(x)` returns an object reference, which is essentially an
ID that can be used to refer to the newly created remote object. If we save the
object reference in a variable with `ref = ray.put(x)`, then we can pass `ref`
into remote functions, and those remote functions will operate on the
corresponding remote object.

The command `ray.get(ref)` takes an object reference and creates a Python object
from the corresponding remote object. For some objects like arrays, we can use
shared memory and avoid copying the object. For other objects, this currently
copies the object from the object store into the memory of the worker process.
If the remote object corresponding to the object reference `ref` does not live
on the same node as the worker that calls `ray.get(ref)`, then the remote object
will first be copied from an object store that has it to the object store that
needs it.

```python
>>> ref = ray.put([1, 2, 3])
>>> ray.get(ref)
[1, 2, 3]
```

If the remote object corresponding to the object reference `ref` has not been
created yet, the command `ray.get(ref)` will wait until the remote object has
been created.
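The put/get contract can be modeled with a toy in-memory store. This is purely illustrative; the `ObjectStore` class below is made up for this sketch and is not part of Ray's API (and a real store would block in `get` until the object exists):

```python
import uuid

class ObjectStore:
    """Toy model of a per-node object store mapping references to values."""

    def __init__(self):
        self._objects = {}

    def put(self, value):
        # Return a unique ID, playing the role of an object reference.
        # The stored value is never mutated afterwards, mirroring the
        # immutability assumption above.
        ref = uuid.uuid4().hex
        self._objects[ref] = value
        return ref

    def get(self, ref):
        # Look up the value behind a reference.
        return self._objects[ref]

store = ObjectStore()
ref = store.put([1, 2, 3])
value = store.get(ref)  # [1, 2, 3]
```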

### Computation graphs in Ray

Ray represents computation with a directed acyclic graph of tasks. Tasks are
added to this graph by calling **remote functions**.

For example, a normal Python function looks like this.
```python
def add(a, b):
    return a + b
```
A remote function in Ray looks like this.
```python
@ray.remote([int, int], [int])
def add(a, b):
    return a + b
```

The information passed to the `@ray.remote` decorator includes type information
for the arguments and for the return values of the function. Because of the
distinction that we make between *submitting a task* and *executing the task*,
we require type information so that we can catch type errors when the remote
function is called instead of catching them when the task is actually executed.

However, the only piece of information that is fundamentally required by the
system is the number of return values (because the system must assign the
correct number of object references to the outputs before the function has
actually executed and produced any outputs).
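A call-time type check of this kind can be sketched with a plain decorator. The `remote` function below is hypothetical and much simpler than Ray's actual decorator (it executes the function directly rather than submitting a task); it only illustrates catching type errors at call time:

```python
def remote(arg_types, return_types):
    """Hypothetical decorator: validate argument types when the function
    is *called*, before any task would be shipped to a worker."""
    def decorator(func):
        def wrapper(*args):
            if len(args) != len(arg_types):
                raise TypeError("expected {} arguments, got {}".format(
                    len(arg_types), len(args)))
            for value, expected in zip(args, arg_types):
                if not isinstance(value, expected):
                    raise TypeError("argument {!r} is not of type {}".format(
                        value, expected))
            # A real implementation would submit a task here instead.
            return func(*args)
        return wrapper
    return decorator

@remote([int, int], [int])
def add(a, b):
    return a + b

total = add(1, 2)   # passes the check
# add(1, "two")     # would raise TypeError at call time, not at execution
```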

#### Remote functions

Whereas in regular Python, calling `add(1, 2)` would return `3`, in Ray, calling
`add(1, 2)` does not actually execute the task. Instead, it adds a task to the
computation graph and immediately returns an object reference to the output of
the computation.

```python
>>> ref = add(1, 2)
>>> ray.get(ref)
3
```

There is a sharp distinction between *submitting a task* and *executing the
task*. When a remote function is called, the task of executing that function is
submitted to the scheduler, and the scheduler immediately returns object
references for the outputs of the task. However, the task will not be executed
until the scheduler actually schedules the task on a worker.

When a task is submitted, each argument may be passed in by value or by object
reference. For example, these lines have the same behavior.

```python
>>> add(1, 2)
>>> add(1, ray.put(2))
>>> add(ray.put(1), ray.put(2))
```

Remote functions never return actual values; they always return object
references.

When the remote function is actually executed, it operates on Python objects.
That is, if the remote function was called with any object references, the
Python objects corresponding to those object references will be retrieved and
passed into the actual execution of the remote function.

#### Blocking computation

In a regular Python script, the specification of a computation is intimately
linked to the actual execution of the code. For example, consider the following
code.
```python
result = []
for i in range(10):
    result.append(np.zeros([100, 100]))
```

At the core of the above script, there are 10 separate tasks, each of which
generates a 100x100 matrix of zeros. These tasks do not depend on each other, so
in principle, they could be executed in parallel. However, in the above
implementation, they will be executed serially.

Ray gets around this by representing computation as a graph of tasks, where some
tasks depend on the outputs of other tasks and where tasks can be executed once
their dependencies have been executed.

For example, suppose we define the remote function `zeros` to be a wrapper
around `np.zeros`.
```python
from typing import List
import numpy as np

@ray.remote([List[int]], [np.ndarray])
def zeros(shape):
    return np.zeros(shape)
```
Then we can write
```python
result_refs = []
for i in range(10):
    result_refs.append(zeros([100, 100]))
```
This adds 10 tasks to the computation graph, with no dependencies between the
tasks.

The computation graph looks like this.

<p align="center">
<img src="figures/compgraph2.png">
</p>

In this figure, boxes are tasks and ovals are objects.

The box that says `op-root` in it just refers to the overall script itself. The
dotted lines indicate that the script launched 10 tasks (tasks are denoted by
rectangular boxes). The solid lines indicate that each task produces one output
(represented by an oval).

It is clear from the computation graph that these ten tasks can be executed in
parallel.

Computation graphs encode dependencies. For example, suppose we define
```python
@ray.remote([np.ndarray, np.ndarray], [np.ndarray])
def dot(a, b):
    return np.dot(a, b)
```
Then we run
```python
aref = zeros([10, 10])
bref = zeros([10, 10])
cref = dot(aref, bref)
```
The corresponding computation graph looks like this.

<p align="center">
<img src="figures/compgraph3.png" width="300">
</p>

The three dashed lines indicate that the script launched three tasks (the two
`zeros` tasks and the one `dot` task). Each task produces a single output, and
the `dot` task depends on the outputs of the two `zeros` tasks.

This makes it clear that the two `zeros` tasks can execute in parallel but that
the `dot` task must wait until the two `zeros` tasks have finished.
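The dependency structure of this graph can be sketched as a tiny task table executed recursively, so that each task runs only after its dependencies have produced outputs. This is illustrative only; `tasks`, `run`, and `results` are made-up names, not Ray APIs:

```python
import numpy as np

# Each task: (function, names of the tasks whose outputs it consumes).
tasks = {
    "a": (lambda: np.zeros([10, 10]), []),
    "b": (lambda: np.zeros([10, 10]), []),
    "c": (lambda x, y: np.dot(x, y), ["a", "b"]),
}

results = {}

def run(name):
    # Execute a task after recursively executing its dependencies,
    # mirroring "a task runs once its dependencies have finished".
    if name not in results:
        func, deps = tasks[name]
        results[name] = func(*[run(dep) for dep in deps])
    return results[name]

c = run("c")  # forces "a" and "b" to run first
```

The two `zeros`-style tasks (`"a"` and `"b"`) have no dependencies, so a real scheduler could run them in parallel; `"c"` cannot start until both are done.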

Binary file not shown. (Before: 2.8 KiB | After: 4.8 KiB)
Binary file not shown. (Before: 4.5 KiB | After: 2.6 KiB)
Binary file not shown. (Before: 2.6 KiB)

@ -1,14 +1,14 @@

## Installation on Mac OS X
# Installation on Mac OS X

Ray must currently be built from source.

### Clone the Ray repository
## Clone the Ray repository

```
git clone https://github.com/amplab/ray.git
```

### Dependencies
## Dependencies

First install the dependencies using brew. We currently do not support Python 3.
If you have trouble installing the Python packages, you may find it easier to

@ -22,7 +22,7 @@ sudo pip install ipython --user

sudo pip install numpy typing funcsigs subprocess32 protobuf==3.0.0a2 colorama graphviz cloudpickle --ignore-installed six
```

### Build
## Build

Then run the setup scripts.

@ -37,7 +37,7 @@ For convenience, you may also want to add the line `source

"$RAY_ROOT/setup-env.sh"` to the bottom of your `~/.bashrc` file manually, where
`$RAY_ROOT` is the Ray directory (e.g., `/home/ubuntu/ray`).

### Test if the installation succeeded
## Test if the installation succeeded

To test if the installation was successful, try running some tests.

@ -1,14 +1,14 @@

## Installation on Ubuntu
# Installation on Ubuntu

Ray must currently be built from source.

### Clone the Ray repository
## Clone the Ray repository

```
git clone https://github.com/amplab/ray.git
```

### Dependencies
## Dependencies

First install the dependencies. We currently do not support Python 3.

@ -18,7 +18,7 @@ sudo apt-get install -y git cmake build-essential autoconf curl libtool python-d

sudo pip install ipython typing funcsigs subprocess32 protobuf==3.0.0a2 colorama graphviz cloudpickle
```

### Build
## Build

Then run the setup scripts.

@ -33,7 +33,7 @@ For convenience, you may also want to add the line `source

"$RAY_ROOT/setup-env.sh"` to the bottom of your `~/.bashrc` file manually, where
`$RAY_ROOT` is the Ray directory (e.g., `/home/ubuntu/ray`).

### Test if the installation succeeded
## Test if the installation succeeded

To test if the installation was successful, try running some tests.

@ -1,4 +1,4 @@

## Installation on Windows
# Installation on Windows

Ray currently does not run on Windows. However, it can be compiled with the
following instructions.

@ -16,7 +16,7 @@ re-running the batch file.

3. `git clone https://github.com/amplab/ray.git`
4. `ray\thirdparty\download_thirdparty.bat`

### Test if the installation succeeded
## Test if the installation succeeded

To test if the installation was successful, try running some tests.

79 doc/remote-functions.md Normal file

@ -0,0 +1,79 @@

# Remote Functions

This document elaborates a bit on what can and cannot be done with remote
functions. Remote functions are written like regular Python functions, but with
the `@ray.remote` decorator on top.

```python
@ray.remote([int], [int])
def increment(n):
    return n + 1
```

## Invocation and Execution

Normally, when a Python function is invoked, it executes immediately and
returns once the execution has completed. For remote functions, we distinguish
between the invocation of the function (that is, when the user calls the
function) and the execution of the function (that is, when the function's
implementation is run).

**Invocation:** To invoke a remote function, the user calls the remote function.

```python
ref = increment(1)
```

This line sends a message to the scheduler asking it to schedule the task of
executing the function `increment` with the argument `1`. It then returns an
object reference for the eventual output of the task. This takes place almost
instantaneously and does not wait for the actual task to be executed.

When calling a remote function, the return value always consists of one or more
object references. If you want the actual value, call `ray.get(ref)`, which will
wait for the task to execute and will return the resulting value.

**Execution:** Eventually, the scheduler will schedule the task on a worker. The
worker will then run the `increment` function and store the function's output in
the worker's local object store.

At that point, the scheduler will be notified that the outputs of the task are
ready, and tasks that depend on those outputs can be scheduled.

## Serializable Types

It is not reasonable to store arbitrary Python objects in the object store or to
ship arbitrary Python objects between machines (for example, a file handle on one
machine may not be meaningful on another). Currently, we serialize only specific
types in the object store. **The serializable types are:**

1. Primitive data types (for example, `1`, `1.0`, `"hello"`, `True`)
2. Numpy arrays
3. Object references
4. Lists, tuples, and dictionaries of other serializable types, but excluding
custom classes (for example, `[1, 1.0, "hello"]`, `{True: "hi", 1: ["hi"]}`)
5. Custom classes where the user has provided `serialize` and `deserialize`
methods

If you wish to define a custom class and to allow it to be serialized in the
object store, you must implement `serialize` and `deserialize` methods which
convert the object to and from primitive data types. A simple example is shown
below.

```python
BLOCK_SIZE = 1000

class ExampleClass(object):
    def __init__(self, field1, field2):
        # This example assumes that field1 and field2 are serializable types.
        self.field1 = field1
        self.field2 = field2

    def deserialize(self, primitives):
        (field1, field2) = primitives
        self.field1 = field1
        self.field2 = field2

    def serialize(self):
        return (self.field1, self.field2)
```
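As a usage sketch, a class of this shape round-trips through its primitive representation. The class is repeated here (trimmed) so the snippet is self-contained; how Ray actually invokes these methods internally is not shown:

```python
class ExampleClass(object):
    def __init__(self, field1, field2):
        # Assumes field1 and field2 are themselves serializable types.
        self.field1 = field1
        self.field2 = field2

    def deserialize(self, primitives):
        (field1, field2) = primitives
        self.field1 = field1
        self.field2 = field2

    def serialize(self):
        return (self.field1, self.field2)

original = ExampleClass(1, "hello")
primitives = original.serialize()     # a tuple of primitive types only
restored = ExampleClass(None, None)
restored.deserialize(primitives)      # rebuild the object from primitives
```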

307 doc/tutorial.md

@ -1,118 +1,265 @@

## Tutorial
# Tutorial

This section assumes that Ray has been built. See the [instructions for
installing Ray](download-and-setup.md).
To use Ray, you need to understand the following:

### Trying it out
- How Ray uses object references to represent immutable remote objects.
- How Ray constructs a computation graph using remote functions.

Start a shell by running this command.
```
python scripts/shell.py
```
By default, this will start up several things
## Overview

- 1 scheduler (for assigning tasks to workers)
- 1 object store (for sharing objects between worker processes)
- 10 workers (for executing tasks)
- 1 driver (for submitting tasks to the scheduler)
Ray is a distributed extension of Python. When using Ray, several processes are
involved.

Each of the above items, and each worker, is its own process.
- A **scheduler**: The scheduler assigns tasks to workers. It is its own
process.
- Multiple **workers**: Workers execute tasks and store the results in object
stores. Each worker is a separate process.
- One **object store** per node: The object store enables the sharing of Python
objects between worker processes so that each worker does not need a separate
copy.
- A **driver**: The driver is the Python process that the user controls and
which submits tasks to the scheduler. For example, if the user is running a
script or using a Python shell, then the driver is the process that runs the
script or the shell.

The shell that you just started is the driver process.
## Starting Ray

You can take a Python object and store it in the object store using `ray.put`.
This turns it into a **remote object** (we are currently on a single machine,
but the terminology makes sense if we are on a cluster), and allows it to be
shared among the worker processes. The function `ray.put` returns an object
reference that is used to identify this remote object.
To start Ray, start Python, and run the following commands.

```python
>>> xref = ray.put([1, 2, 3])
import ray
ray.services.start_ray_local(num_workers=10)
```

We can use `ray.get` to retrieve the object corresponding to an object
reference.
That command starts a scheduler, one object store, and ten workers. Each of
these is a distinct process. They will be killed when you exit the Python
interpreter. They can also be killed manually with the following command.

```
killall scheduler objstore python
```

## Immutable remote objects

In Ray, we can create and manipulate objects. We refer to these objects as
**remote objects**, and we use **object references** to refer to them. Remote
objects are stored in **object stores**, and there is one object store per node
in the cluster. In the cluster setting, we may not actually know which machine
each object lives on.

An **object reference** is essentially a unique ID that can be used to refer to
a remote object. If you're familiar with Futures, our object references are
conceptually similar.

We assume that remote objects are immutable. That is, their values cannot be
changed after creation. This allows remote objects to be replicated in multiple
object stores without needing to synchronize the copies.

### Put and Get

The commands `ray.get` and `ray.put` can be used to convert between Python
objects and object references, as shown in the example below.

```python
>>> ray.get(xref)
[1, 2, 3]
```
We can call a remote function.
```python
>>> ref = example_functions.increment(1)
>>> ray.get(ref)
2
x = [1, 2, 3]
ray.put(x)  # prints <ray.ObjRef at 0x1031baef0>
```

Note that `example_functions.increment` is defined in
[`scripts/example_functions.py`](../scripts/example_functions.py) as
The command `ray.put(x)` would be run by a worker process or by the driver
process (the driver process is the one running your script). It takes a Python
object and copies it to the local object store (here *local* means *on the same
node*). Once the object has been stored in the object store, its value cannot be
changed.

In addition, `ray.put(x)` returns an object reference, which is essentially an
ID that can be used to refer to the newly created remote object. If we save the
object reference in a variable with `ref = ray.put(x)`, then we can pass `ref`
into remote functions, and those remote functions will operate on the
corresponding remote object.

The command `ray.get(ref)` takes an object reference and creates a Python object
from the corresponding remote object. For some objects like arrays, we can use
shared memory and avoid copying the object. For other objects, this currently
copies the object from the object store into the memory of the worker process.
If the remote object corresponding to the object reference `ref` does not live
on the same node as the worker that calls `ray.get(ref)`, then the remote object
will first be copied from an object store that has it to the object store that
needs it.

```python
>>> ref = ray.put([1, 2, 3])
>>> ray.get(ref)  # prints [1, 2, 3]
```

If the remote object corresponding to the object reference `ref` has not been
created yet, *the command `ray.get(ref)` will wait until the remote object has
been created.*

## Computation graphs in Ray

Ray represents computation with a directed acyclic graph of tasks. Tasks are
added to this graph by calling **remote functions**.

For example, a normal Python function looks like this.
```python
def add(a, b):
    return a + b
```
A remote function in Ray looks like this.
```python
@ray.remote([int, int], [int])
def add(a, b):
    return a + b
```

The information passed to the `@ray.remote` decorator includes type information
for the arguments and for the return values of the function. Because of the
distinction that we make between *submitting a task* and *executing the task*,
we require type information so that we can catch type errors when the remote
function is called instead of catching them when the task is actually executed
(which could be much later and could be on a different machine).

### Remote functions

Whereas in regular Python, calling `add(1, 2)` would return `3`, in Ray, calling
`add(1, 2)` does not actually execute the task. Instead, it adds a task to the
computation graph and immediately returns an object reference to the output of
the computation.

```python
>>> ref = add(1, 2)
>>> ray.get(ref)  # prints 3
```

There is a sharp distinction between *submitting a task* and *executing the
task*. When a remote function is called, the task of executing that function is
submitted to the scheduler, and the scheduler immediately returns object
references for the outputs of the task. However, the task will not be executed
until the scheduler actually schedules the task on a worker.

When a task is submitted, each argument may be passed in by value or by object
reference. For example, these lines have the same behavior.

```python
>>> add(1, 2)
>>> add(1, ray.put(2))
>>> add(ray.put(1), ray.put(2))
```

Remote functions never return actual values; they always return object
references.

When the remote function is actually executed, it operates on Python objects.
That is, if the remote function was called with any object references, the
Python objects corresponding to those object references will be retrieved and
passed into the actual execution of the remote function.

### Blocking computation

In a regular Python script, the specification of a computation is intimately
linked to the actual execution of the code. For example, consider the following
code.

```python
import time

# This takes 50 seconds.
for i in range(10):
    time.sleep(5)
```

At the core of the above script, there are ten separate tasks, each of which
sleeps for five seconds (this is a toy example, but you could imagine replacing
the call to `sleep` with some computationally intensive task). These tasks do
not depend on each other, so in principle, they could be executed in parallel.
However, in the above implementation, they will be executed serially, which will
take fifty seconds.

Ray gets around this by representing computation as a graph of tasks, where some
tasks depend on the outputs of other tasks and where tasks can be executed once
their dependencies are done.

For example, suppose we define the remote function `sleep` to be a wrapper
around `time.sleep`.

```python
@ray.remote([int], [int])
def increment(x):
    return x + 1

def sleep(n):
    time.sleep(n)
    return 0
```

Note that we can pass arguments into remote functions either by value or by
object reference. That is, these two lines have the same behavior.
Then we can write

```python
>>> ray.get(example_functions.increment(1))
2
>>> ray.get(example_functions.increment(ray.put(1)))
2
```
This is convenient for chaining remote functions together, for example,
```python
>>> ref = example_functions.increment(1)
>>> ref = example_functions.increment(ref)
>>> ref = example_functions.increment(ref)
>>> ray.get(ref)
4
# Submit ten tasks to the scheduler. This finishes almost immediately.
result_refs = []
for i in range(10):
    result_refs.append(sleep(5))

# Wait for the results. If we have at least ten workers, this takes 5 seconds.
[ray.get(ref) for ref in result_refs]  # prints [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

### Visualize the computation graph
The for loop simply adds ten tasks to the computation graph, with no
dependencies between the tasks. It executes almost instantaneously. Afterwards,
we use `ray.get` to wait for the tasks to finish. If we have at least ten
workers, then all ten tasks can be executed in parallel, and so the overall
script should take five seconds.
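The five-versus-fifty-second claim can be checked without Ray, using threads as stand-in workers (illustrative only; sleeps are shortened so the sketch runs quickly):

```python
import time
import concurrent.futures

def sleep(n):
    time.sleep(n)
    return 0

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    # All ten tasks are submitted immediately and overlap in time.
    futures = [pool.submit(sleep, 0.2) for _ in range(10)]
    results = [f.result() for f in futures]
elapsed = time.time() - start
# elapsed is roughly 0.2 seconds rather than the 2 seconds a serial
# loop of ten 0.2-second sleeps would take.
```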
|
||||
|
||||
### Visualizing the Computation Graph

At any point, we can visualize the computation graph by running

```python
ray.visualize_computation_graph(view=True)
```

This will display an image like the following one.

<p align="center">
  <img src="figures/compgraph1.png">
</p>
In this figure, boxes are tasks and ovals are objects.

The box that says `op-root` in it just refers to the overall script itself. The
dotted lines indicate that the script launched 10 tasks (tasks are denoted by
rectangular boxes). The solid lines indicate that each task produces one output
(represented by an oval).

It is clear from the computation graph that these ten tasks can be executed in
parallel.

Computation graphs encode dependencies. For example, suppose we define

```python
import numpy as np

@ray.remote([list], [np.ndarray])
def zeros(shape):
    return np.zeros(shape)

@ray.remote([np.ndarray, np.ndarray], [np.ndarray])
def dot(a, b):
    return np.dot(a, b)
```

Then we run

```python
aref = zeros([10, 10])
bref = zeros([10, 10])
cref = dot(aref, bref)
```

The corresponding computation graph looks like this.

<p align="center">
  <img src="figures/compgraph2.png" width="300">
</p>

The three dashed lines indicate that the script launched three tasks (the two
`zeros` tasks and the one `dot` task). Each task produces a single output, and
the `dot` task depends on the outputs of the two `zeros` tasks.

This makes it clear that the two `zeros` tasks can execute in parallel but that
the `dot` task must wait until the two `zeros` tasks have finished.

### Restart workers

During development, suppose that you want to change the implementation of
`example_functions.increment`, but you've already done a bunch of work in the
shell loading and preprocessing data, and you don't want to have to recompute
all of that work. We can simply restart the workers.

First, change the code. For example, modify the function
`example_functions.increment` in
[`scripts/example_functions.py`](../scripts/example_functions.py) to add 10
instead of 1.

```python
@ray.remote([int], [int])
def increment(x):
    return x + 10
```

Then, from the shell, restart the workers like this.

```python
>>> ray.restart_workers("scripts/example_worker.py")  # This should be the correct relative path to the example_worker.py code.
```

We can check that the code has been updated by running

```python
>>> ray.get(example_functions.increment(1))
11
```

Note that it is not as simple as running `reload(example_functions)`, because
we need to reload the Python module on all of the workers as well, and the
workers are separate Python processes. Calling `reload(example_functions)`
would only reload the module on the driver.
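To make "a task runs once its dependencies are done" concrete, here is a minimal dependency-driven scheduler sketch using standard-library futures (an illustration of the idea only, not Ray's scheduler; `submit`, `zeros`, and `dot` here are local stand-ins):

```python
import concurrent.futures as cf

pool = cf.ThreadPoolExecutor(max_workers=4)

def submit(fn, *args):
    """Launch fn as a task. Arguments may be plain values or futures
    returned by earlier tasks; futures are resolved before fn runs."""
    def run():
        resolved = [a.result() if isinstance(a, cf.Future) else a for a in args]
        return fn(*resolved)
    return pool.submit(run)

def zeros(n):
    return [0.0] * n

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

# The two zeros tasks have no dependencies and can run in parallel;
# the dot task blocks until both of its inputs are available.
aref = submit(zeros, 3)
bref = submit(zeros, 3)
cref = submit(dot, aref, bref)
print(cref.result())  # 0.0

pool.shutdown()
```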
# Using Ray on a cluster

Running Ray on a cluster is still experimental.

Ray can be used in several ways. In addition to running on a single machine,
Ray is designed to run on a cluster of machines. This document is about how to
use Ray on a cluster.
## Launching a cluster on EC2

This section describes how to start a cluster on EC2. These instructions are
copied and adapted from https://github.com/amplab/spark-ec2.
### Before you start

- Create an Amazon EC2 key pair for yourself. This can be done by logging into
  your Amazon Web Services account through the [AWS […]

[…] and secret access key. These can be obtained from the [AWS
homepage](http://aws.amazon.com/) by clicking Account > Security Credentials >
Access Credentials.
### Launching a Cluster

- Go into the `ray/scripts` directory.
- Run `python ec2.py -k <keypair> -i <key-file> -s <num-slaves> launch` […]

[…] capacity in one zone, and you should try to launch in another.

[…] [Spot Instances](http://aws.amazon.com/ec2/spot-instances/), bidding for
the given maximum price (in dollars).
## Getting started with Ray on a cluster

These instructions work on EC2, but they may require some modifications to run
on your own cluster. In particular, on EC2, running `sudo` does not require a
password. […]

[…] For example,

- `cluster.update_ray()` - This pulls the latest Ray source code and builds
  it.
### Running Ray on a cluster

Once you've started a Ray cluster using the above instructions, to actually use
Ray, ssh to the head node (the first node listed in the `nodes.txt` file) and
attach a shell to the already running cluster.
# Hyperparameter Optimization

This document provides a walkthrough of the hyperparameter optimization
example. To run the application, first install this dependency. […]

[…] Choosing these parameters can be challenging, and so a common practice is
to search over the space of hyperparameters. One approach that works
surprisingly well is to randomly sample different options.
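Random hyperparameter sampling takes only a few lines. The sketch below uses a hypothetical `accuracy` function as a stand-in for actually training a model:

```python
import random

def accuracy(params):
    # Hypothetical stand-in for training a model and measuring its accuracy.
    return 1.0 - (params["learning_rate"] - 0.01) ** 2

random.seed(0)
trials = []
for _ in range(20):
    params = {
        "learning_rate": 10 ** random.uniform(-4, -1),  # log-uniform sampling
        "batch_size": random.choice([32, 64, 128]),
    }
    trials.append((accuracy(params), params))

best_accuracy, best_params = max(trials, key=lambda t: t[0])
print(best_accuracy <= 1.0)  # True
```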
## The serial version

Suppose that we want to train a convolutional network, but we aren't sure how
to choose the following hyperparameters: […]

[…] Of course, as there are no dependencies between the different invocations
of `train_cnn_and_compute_accuracy`, this computation could easily be
parallelized over multiple cores or multiple machines. Let's do that now.
## The distributed version

First, let's turn `train_cnn_and_compute_accuracy` into a remote function in
Ray by writing it as follows. In this example application, a slightly more […]

[…] their values with `ray.get`.

```python
results = [(params, ray.get(ref)) for (params, ref) in result_refs]
```
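The gather step above follows a general submit-then-collect pattern, sketched here with standard-library futures and a hypothetical stand-in for `train_cnn_and_compute_accuracy` (this is an analogy for the workflow, not Ray's API):

```python
import concurrent.futures
import random

def train_cnn_and_compute_accuracy(params):
    # Hypothetical stand-in: pretend to train and return an accuracy.
    return random.random()

random.seed(0)
param_sets = [{"learning_rate": 10 ** random.uniform(-4, -1)} for _ in range(4)]

with concurrent.futures.ThreadPoolExecutor() as pool:
    # Launch every trial before collecting any results.
    result_refs = [(params, pool.submit(train_cnn_and_compute_accuracy, params))
                   for params in param_sets]
    # The analogue of ray.get: block until each value is ready.
    results = [(params, ref.result()) for (params, ref) in result_refs]

print(len(results))  # 4
```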
## Additional notes

**Early Stopping:** Sometimes when running an optimization, it is clear early
on that the hyperparameters being used are bad (for example, the loss function
may […]
# Batch L-BFGS

This document provides a walkthrough of the L-BFGS example. To run the
application, first install these dependencies. […]
[…] one such algorithm. It is a quasi-Newton method that uses gradient
information to approximate the inverse Hessian of the loss function in a
computationally efficient manner.
## The serial version

First we load the data in batches. Here, each element in `batches` is a tuple
whose first component is a batch of `100` images and whose second component is
a […]
```python
theta_init = 1e-2 * np.random.normal(size=dim)
result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, fprime=full_grad)
```
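For reference, here is a self-contained toy run of `scipy.optimize.fmin_l_bfgs_b` on a simple quadratic loss. The `full_loss` and `full_grad` below are stand-ins for the example's real MNIST loss and gradient:

```python
import numpy as np
import scipy.optimize

dim = 5
target = np.arange(dim, dtype=float)

def full_loss(theta):
    # Quadratic stand-in for the real loss.
    return 0.5 * np.sum((theta - target) ** 2)

def full_grad(theta):
    # Exact gradient of the quadratic loss.
    return theta - target

theta_init = 1e-2 * np.random.normal(size=dim)
theta_opt, final_loss, info = scipy.optimize.fmin_l_bfgs_b(
    full_loss, theta_init, fprime=full_grad)
print(np.allclose(theta_opt, target, atol=1e-4))  # True
```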
## The distributed version

In this example, the computation of the gradient itself can be done in
parallel on a number of workers or machines.
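The per-batch gradient pattern can be sketched with threads, where synthetic data and a simple quadratic loss stand in for the example's model (an illustration of the structure, not the tutorial's actual code):

```python
import concurrent.futures
import numpy as np

dim = 4
np.random.seed(0)
batches = [np.random.normal(size=(100, dim)) for _ in range(8)]
theta = np.zeros(dim)

def batch_grad(theta, xs):
    # Gradient of the mean loss 0.5 * ||theta - x||^2 over one batch.
    return np.mean(theta - xs, axis=0)

with concurrent.futures.ThreadPoolExecutor() as pool:
    # Each worker computes the gradient on its own batch in parallel.
    grad_refs = [pool.submit(batch_grad, theta, xs) for xs in batches]
    # The driver averages the per-batch gradients (equal batch sizes).
    full_grad = np.mean([ref.result() for ref in grad_refs], axis=0)

print(full_grad.shape)  # (4,)
```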