AsyncIO / Concurrency for Actors ================================ Since Python 3.5, it is possible to write concurrent code using the ``async/await`` `syntax `__. Ray natively integrates with asyncio. You can use ray alongside with popular async frameworks like aiohttp, aioredis, etc. You can try it about by running the following snippet in ``ipython`` or a shell that supports top level ``await``: .. code-block:: python import ray import asyncio ray.init() @ray.remote class AsyncActor: # multiple invocation of this method can be running in # the event loop at the same time async def run_concurrent(self): print("started") await asyncio.sleep(2) # concurrent workload here print("finished") actor = AsyncActor.remote() # regular ray.get ray.get([actor.run_concurrent.remote() for _ in range(4)]) # async ray.get await actor.run_concurrent.remote() ObjectIDs as asyncio.Futures ---------------------------- ObjectIDs can be translated to asyncio.Future. This feature make it possible to ``await`` on ray futures in existing concurrent applications. Instead of: .. code-block:: python @ray.remote def some_task(): return 1 ray.get(some_task.remote()) ray.wait([some_task.remote()]) you can do: .. code-block:: python @ray.remote def some_task(): return 1 await some_task.remote() await asyncio.wait([some_task.remote()]) Please refer to `asyncio doc `__ for more `asyncio` patterns including timeouts and ``asyncio.gather``. Async Actor ----------- Ray also supports concurrent multitasking by executing many actor tasks at once. To do so, you can define an actor with async methods: .. code-block:: python import asyncio @ray.remote class AsyncActor: async def run_task(self): print("started") await asyncio.sleep(1) # Network, I/O task here print("ended") actor = AsyncActor.remote() # All 50 tasks should start at once. After 1 second they should all finish. # they should finish at the same time ray.get([actor.run_task.remote() for _ in range(50)]) Under the hood, Ray runs all of the methods inside a single python event loop. Please note that running blocking ``ray.get`` or ``ray.wait`` inside async actor method is not allowed, because ``ray.get`` will block the execution of the event loop. You can limit the number of concurrent task running at once using the ``max_concurrency`` flag. By default, 1000 tasks can be running concurrently. .. code-block:: python import asyncio @ray.remote class AsyncActor: async def run_task(self): print("started") await asyncio.sleep(1) # Network, I/O task here print("ended") actor = AsyncActor.options(max_concurrency=10).remote() # Only 10 tasks will be running concurrently. Once 10 finish, the next 10 should run. ray.get([actor.run_task.remote() for _ in range(50)]) Known Issues ------------ Async API support is experimental, and we are working to improve it. Please `let us know `__ any issues you encounter.