Rewrite the async api documentation (#6936)

* Rewrite the async api documentation

* Apply suggestions from code review

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Clarify comment

* Add quickstart

* Add reference for async in ray.get ray.wait docstring

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Simon Mo 2020-01-30 09:34:09 -08:00 committed by GitHub
parent 11d90d6d0c
commit 1e3a34b223
2 changed files with 107 additions and 59 deletions


@ -1,78 +1,119 @@
Async API (Experimental)
========================
Since Python 3.5, it is possible to write concurrent code using the ``async/await`` `syntax <https://docs.python.org/3/library/asyncio.html>`__.
Since Python 3.5, it is possible to write concurrent code using the
``async/await`` `syntax <https://docs.python.org/3/library/asyncio.html>`__.
Ray natively integrates with asyncio. You can use Ray alongside popular
async frameworks such as aiohttp and aioredis.
This document describes Ray's support for asyncio, which enables integration with popular async frameworks (e.g., aiohttp, aioredis) for high-performance web and prediction serving.
Converting Ray objects into asyncio futures
-------------------------------------------
Ray object IDs can be converted into asyncio futures with ``ray.experimental.async_api``.
You can try it out by running the following snippet in ``ipython`` or a shell
that supports top-level ``await``:
.. code-block:: python
import asyncio
import time
import ray
from ray.experimental import async_api
import ray
import asyncio
ray.init()
@ray.remote
def f():
    time.sleep(1)
    return {'key1': ['value']}
@ray.remote
class AsyncActor:
    # Multiple invocations of this method can run in
    # the event loop at the same time.
    async def run_concurrent(self):
        print("started")
        await asyncio.sleep(2)  # concurrent workload here
        print("finished")
ray.init()
future = async_api.as_future(f.remote())
asyncio.get_event_loop().run_until_complete(future) # {'key1': ['value']}
actor = AsyncActor.remote()
# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])
# async ray.get
await actor.run_concurrent.remote()
ObjectIDs as asyncio.Futures
----------------------------
ObjectIDs can be converted to ``asyncio.Future`` objects. This feature
makes it possible to ``await`` Ray futures in existing concurrent
applications.
Instead of:
.. code-block:: python
@ray.remote
def some_task():
    return 1
ray.get(some_task.remote())
ray.wait([some_task.remote()])
you can do:
.. code-block:: python
@ray.remote
def some_task():
    return 1
await some_task.remote()
await asyncio.wait([some_task.remote()])
Please refer to the `asyncio documentation <https://docs.python.org/3/library/asyncio-task.html>`__
for more ``asyncio`` patterns, including timeouts and ``asyncio.gather``.
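The two patterns named above can be sketched with plain ``asyncio``. This is a minimal illustration, not Ray code: the coroutine ``some_task`` below is a hypothetical stand-in for an awaitable such as ``some_task.remote()``, and the delays are arbitrary.

```python
import asyncio


async def some_task(delay, value):
    # Hypothetical stand-in for awaiting a Ray task; it only sleeps.
    await asyncio.sleep(delay)
    return value


async def main():
    # gather awaits several awaitables at once and returns the results
    # in the order the awaitables were passed in.
    results = await asyncio.gather(some_task(0.02, "a"), some_task(0.01, "b"))

    # wait_for bounds the wait; asyncio.TimeoutError is raised on expiry.
    try:
        await asyncio.wait_for(some_task(1.0, "slow"), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return results, timed_out


results, timed_out = asyncio.run(main())
```

Note that ``asyncio.gather`` takes the awaitables as separate positional arguments, so a list must be unpacked with ``*``.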
.. autofunction:: ray.experimental.async_api.as_future
Async Actor
-----------
Ray also supports concurrent multitasking by executing many actor tasks at once.
To do so, you can define an actor with async methods:
.. code-block:: python
Example Usage
-------------
import asyncio
+----------------------------------------+-----------------------------------------------------+
| **Basic Python**                       | **Distributed with Ray**                            |
+----------------------------------------+-----------------------------------------------------+
| .. code-block:: python                 | .. code-block:: python                              |
|                                        |                                                     |
|   # Execute f serially.                |   # Execute f in parallel.                          |
|                                        |                                                     |
|                                        |                                                     |
|   def f():                             |   @ray.remote                                       |
|       time.sleep(1)                    |   def f():                                          |
|       return 1                         |       time.sleep(1)                                 |
|                                        |       return 1                                      |
|                                        |                                                     |
|                                        |   ray.init()                                        |
|   results = [f() for i in range(4)]    |   results = ray.get([f.remote() for i in range(4)]) |
+----------------------------------------+-----------------------------------------------------+
| **Async Python**                       | **Async Ray**                                       |
+----------------------------------------+-----------------------------------------------------+
| .. code-block:: python                 | .. code-block:: python                              |
|                                        |                                                     |
|   # Execute f asynchronously.          |   # Execute f asynchronously with Ray/asyncio.      |
|                                        |                                                     |
|                                        |   from ray.experimental import async_api            |
|                                        |                                                     |
|                                        |   @ray.remote                                       |
|   async def f():                       |   def f():                                          |
|       await asyncio.sleep(1)           |       time.sleep(1)                                 |
|       return 1                         |       return 1                                      |
|                                        |                                                     |
|                                        |   ray.init()                                        |
|   loop = asyncio.get_event_loop()      |   loop = asyncio.get_event_loop()                   |
|   tasks = [f() for i in range(4)]      |   tasks = [async_api.as_future(f.remote())          |
|                                        |            for i in range(4)]                       |
|   results = loop.run_until_complete(   |   results = loop.run_until_complete(                |
|       asyncio.gather(*tasks))          |       asyncio.gather(*tasks))                       |
+----------------------------------------+-----------------------------------------------------+
@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1)  # Network, I/O task here
        print("ended")
actor = AsyncActor.remote()
# All 50 tasks should start at once and finish together after about 1 second.
ray.get([actor.run_task.remote() for _ in range(50)])
Under the hood, Ray runs all of the methods inside a single Python event loop.
Note that calling blocking ``ray.get`` or ``ray.wait`` inside an async
actor method is not allowed, because these calls block the execution
of the event loop.
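Why a blocking call hurts can be seen without Ray at all. The sketch below uses plain ``asyncio`` and assumes ``time.sleep`` as a stand-in for any blocking call; the function names are hypothetical. Four coroutines that block run one after another, while four that ``await`` overlap on the same event loop.

```python
import asyncio
import time


async def blocking_method():
    # A blocking call holds the event loop, so no other coroutine can run.
    time.sleep(0.05)


async def cooperative_method():
    # Awaiting yields control, so other coroutines run in the meantime.
    await asyncio.sleep(0.05)


async def timed(make_coro, n=4):
    # Run n coroutines on one event loop and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(make_coro() for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_method))
cooperative_elapsed = asyncio.run(timed(cooperative_method))
```

The blocking variant takes roughly ``n`` times the sleep duration, because each call must finish before the next coroutine gets a turn.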
You can limit the number of concurrent tasks running at once using the
``max_concurrency`` flag. By default, 1000 tasks can be running concurrently.
.. code-block:: python
import asyncio
@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1)  # Network, I/O task here
        print("ended")
actor = AsyncActor.options(max_concurrency=10).remote()
# Only 10 tasks will be running concurrently. Once 10 finish, the next 10 should run.
ray.get([actor.run_task.remote() for _ in range(50)])
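The effect of a concurrency cap can be approximated in plain ``asyncio`` with a semaphore. This is only an analogy under stated assumptions, not a description of how Ray implements ``max_concurrency``; ``run_all`` and its parameters are hypothetical names chosen for the illustration.

```python
import asyncio


async def run_all(num_tasks, max_concurrency):
    # The semaphore admits at most max_concurrency coroutines at a time.
    semaphore = asyncio.Semaphore(max_concurrency)
    running = 0
    peak = 0

    async def run_task():
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)  # record the highest overlap seen
            await asyncio.sleep(0.01)  # simulated network or I/O work
            running -= 1

    await asyncio.gather(*(run_task() for _ in range(num_tasks)))
    return peak


peak = asyncio.run(run_all(num_tasks=50, max_concurrency=10))
```

Here ``peak`` records the largest number of tasks that were inside the guarded section at the same moment, which never exceeds the cap.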
Known Issues
------------
Async API support is experimental, and we are working to improve its performance. Please `let us know <https://github.com/ray-project/ray/issues>`__ about any issues you encounter.
Async API support is experimental, and we are working to improve it.
Please `let us know <https://github.com/ray-project/ray/issues>`__
about any issues you encounter.


@ -1459,6 +1459,10 @@ def get(object_ids, timeout=None):
object has been created). If object_ids is a list, then the objects
corresponding to each object in the list will be returned.
This method will raise an error if it is running inside an async context.
Instead, you can use ``await object_id`` in place of ``ray.get(object_id)``.
For a list of object IDs, you can use ``await asyncio.gather(*object_ids)``.
Args:
object_ids: Object ID of the object to get or a list of object IDs to
get.
@ -1573,6 +1577,9 @@ def wait(object_ids, num_returns=1, timeout=None):
precede B in the ready list. This also holds true if A and B are both in
the remaining list.
This method will error if it's running inside an async context. Instead of
``ray.wait(object_ids)``, you can use ``await asyncio.wait(object_ids)``.
Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
may not be ready. Note that these IDs must be unique.