diff --git a/doc/source/async_api.rst b/doc/source/async_api.rst index 76b8b68ff..0c49a438f 100644 --- a/doc/source/async_api.rst +++ b/doc/source/async_api.rst @@ -1,78 +1,119 @@ Async API (Experimental) ======================== -Since Python 3.5, it is possible to write concurrent code using the ``async/await`` `syntax `__. +Since Python 3.5, it is possible to write concurrent code using the +``async/await`` `syntax `__. +Ray natively integrates with asyncio. You can use ray alongside popular +async frameworks like aiohttp, aioredis, etc. -This document describes Ray's support for asyncio, which enables integration with popular async frameworks (e.g., aiohttp, aioredis, etc.) for high performance web and prediction serving. - - -Converting Ray objects into asyncio futures -------------------------------------------- - -Ray object IDs can be converted into asyncio futures with ``ray.experimental.async_api``. +You can try it out by running the following snippet in ``ipython`` or a shell +that supports top level ``await``: .. code-block:: python - import asyncio - import time - import ray - from ray.experimental import async_api + import ray + import asyncio + ray.init() - @ray.remote - def f(): - time.sleep(1) - return {'key1': ['value']} + @ray.remote + class AsyncActor: + # multiple invocations of this method can be running in + # the event loop at the same time + async def run_concurrent(self): + print("started") + await asyncio.sleep(2) # concurrent workload here + print("finished") - ray.init() - future = async_api.as_future(f.remote()) - asyncio.get_event_loop().run_until_complete(future) # {'key1': ['value']} + actor = AsyncActor.remote() + + # regular ray.get + ray.get([actor.run_concurrent.remote() for _ in range(4)]) + + # async ray.get + await actor.run_concurrent.remote() + + +ObjectIDs as asyncio.Futures +---------------------------- +ObjectIDs can be translated to asyncio.Future. 
This feature +makes it possible to ``await`` on ray futures in existing concurrent +applications. + +Instead of: + +.. code-block:: python + + @ray.remote + def some_task(): + return 1 + + ray.get(some_task.remote()) + ray.wait([some_task.remote()]) + +you can do: + +.. code-block:: python + + @ray.remote + def some_task(): + return 1 + + await some_task.remote() + await asyncio.wait([some_task.remote()]) + +Please refer to `asyncio doc `__ +for more ``asyncio`` patterns including timeouts and ``asyncio.gather``. -.. autofunction:: ray.experimental.async_api.as_future +Async Actor +----------- +Ray also supports concurrent multitasking by executing many actor tasks at once. +To do so, you can define an actor with async methods: +.. code-block:: python -Example Usage -------------- + import asyncio -+----------------------------------------+-----------------------------------------------------+ -| **Basic Python** | **Distributed with Ray** | -+----------------------------------------+-----------------------------------------------------+ -| .. code-block:: python | .. code-block:: python | -| | | -| # Execute f serially. | # Execute f in parallel. | -| | | -| | | -| def f(): | @ray.remote | -| time.sleep(1) | def f(): | -| return 1 | time.sleep(1) | -| | return 1 | -| | | -| | ray.init() | -| results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) | -+----------------------------------------+-----------------------------------------------------+ -| **Async Python** | **Async Ray** | -+----------------------------------------+-----------------------------------------------------+ -| .. code-block:: python | .. code-block:: python | -| | | -| # Execute f asynchronously. | # Execute f asynchronously with Ray/asyncio. 
| -| | | -| | from ray.experimental import async_api | -| | | -| | @ray.remote | -| async def f(): | def f(): | -| await asyncio.sleep(1) | time.sleep(1) | -| return 1 | return 1 | -| | | -| | ray.init() | -| loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | -| tasks = [f() for i in range(4)] | tasks = [async_api.as_future(f.remote()) | -| | for i in range(4)] | -| results = loop.run_until_complete( | results = loop.run_until_complete( | -| asyncio.gather(tasks)) | asyncio.gather(tasks)) | -+----------------------------------------+-----------------------------------------------------+ + @ray.remote + class AsyncActor: + async def run_task(self): + print("started") + await asyncio.sleep(1) # Network, I/O task here + print("ended") + + actor = AsyncActor.remote() + # All 50 tasks should start at once. After 1 second they should all finish. + # they should finish at the same time + ray.get([actor.run_task.remote() for _ in range(50)]) + +Under the hood, Ray runs all of the methods inside a single Python event loop. +Please note that running blocking ``ray.get`` or ``ray.wait`` inside an async +actor method is not allowed, because ``ray.get`` will block the execution +of the event loop. + +You can limit the number of concurrent tasks running at once using the +``max_concurrency`` flag. By default, 1000 tasks can be running concurrently. + +.. code-block:: python + + import asyncio + + @ray.remote + class AsyncActor: + async def run_task(self): + print("started") + await asyncio.sleep(1) # Network, I/O task here + print("ended") + + actor = AsyncActor.options(max_concurrency=10).remote() + + # Only 10 tasks will be running concurrently. Once 10 finish, the next 10 should run. + ray.get([actor.run_task.remote() for _ in range(50)]) Known Issues ------------ -Async API support is experimental, and we are working to improve its performance. Please `let us know `__ any issues you encounter. +Async API support is experimental, and we are working to improve it. 
+Please `let us know `__ +about any issues you encounter. diff --git a/python/ray/worker.py b/python/ray/worker.py index 22e5a1e69..bcaee234b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1459,6 +1459,10 @@ def get(object_ids, timeout=None): object has been created). If object_ids is a list, then the objects corresponding to each object in the list will be returned. + This method will error if it's running inside an async context; + you can use ``await object_id`` instead of ``ray.get(object_id)``. For + a list of object ids, you can use ``await asyncio.gather(*object_ids)``. + Args: object_ids: Object ID of the object to get or a list of object IDs to get. @@ -1573,6 +1577,9 @@ def wait(object_ids, num_returns=1, timeout=None): precede B in the ready list. This also holds true if A and B are both in the remaining list. + This method will error if it's running inside an async context. Instead of + ``ray.wait(object_ids)``, you can use ``await asyncio.wait(object_ids)``. + Args: object_ids (List[ObjectID]): List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.