From 3fd3cb96ed41cea13881a8a1a04b38a12d9183e0 Mon Sep 17 00:00:00 2001 From: architkulkarni Date: Thu, 10 Dec 2020 08:49:18 -0800 Subject: [PATCH] [Utils] Add Queue async and batch methods (#12578) --- python/ray/tests/test_queue.py | 66 ++++++++++++- python/ray/util/queue.py | 168 ++++++++++++++++++++++++++++----- 2 files changed, 210 insertions(+), 24 deletions(-) diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index a3f39ec4b..11b2a9a4c 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -5,6 +5,7 @@ from ray.exceptions import GetTimeoutError from ray.util.queue import Queue, Empty, Full +# Remote helper functions for testing concurrency @ray.remote def async_get(queue): return queue.get(block=True) @@ -50,6 +51,29 @@ def test_get(ray_start_regular_shared): q.get(timeout=0.2) +@pytest.mark.asyncio +async def test_get_async(ray_start_regular_shared): + + q = Queue() + + item = 0 + await q.put_async(item) + assert await q.get_async(block=False) == item + + item = 1 + await q.put_async(item) + assert await q.get_async(timeout=0.2) == item + + with pytest.raises(ValueError): + await q.get_async(timeout=-1) + + with pytest.raises(Empty): + await q.get_async(block=False) + + with pytest.raises(Empty): + await q.get_async(timeout=0.2) + + def test_put(ray_start_regular_shared): q = Queue(1) @@ -73,7 +97,31 @@ def test_put(ray_start_regular_shared): q.put(1, timeout=0.2) -def test_async_get(ray_start_regular_shared): +@pytest.mark.asyncio +async def test_put_async(ray_start_regular_shared): + + q = Queue(1) + + item = 0 + await q.put_async(item, block=False) + assert await q.get_async() == item + + item = 1 + await q.put_async(item, timeout=0.2) + assert await q.get_async() == item + + with pytest.raises(ValueError): + await q.put_async(0, timeout=-1) + + await q.put_async(0) + with pytest.raises(Full): + await q.put_async(1, block=False) + + with pytest.raises(Full): + await q.put_async(1, timeout=0.2) + + +def test_concurrent_get(ray_start_regular_shared): q = Queue() future = async_get.remote(q) @@ -87,7 +135,7 @@ def test_async_get(ray_start_regular_shared): assert ray.get(future) == 1 -def test_async_put(ray_start_regular_shared): +def test_concurrent_put(ray_start_regular_shared): q = Queue(1) q.put(1) future = async_put.remote(q, 2) @@ -102,6 +150,20 @@ def test_async_put(ray_start_regular_shared): assert q.get() == 2 +def test_batch(ray_start_regular_shared): + q = Queue(1) + + with pytest.raises(Full): + q.put_nowait_batch([1, 2]) + + with pytest.raises(Empty): + q.get_nowait_batch(1) + + big_q = Queue(100) + big_q.put_nowait_batch(list(range(100))) + assert big_q.get_nowait_batch(100) == list(range(100)) + + def test_qsize(ray_start_regular_shared): q = Queue() diff --git a/python/ray/util/queue.py b/python/ray/util/queue.py index cc6f6bd05..59be761de 100644 --- a/python/ray/util/queue.py +++ b/python/ray/util/queue.py @@ -1,4 +1,6 @@ import asyncio +from typing import Optional, Any, List +from collections.abc import Iterable import ray @@ -12,44 +14,69 @@ class Full(Exception): class Queue: - """Queue implementation on Ray. + """A first-in, first-out queue implementation on Ray. + + The behavior and use cases are similar to those of the asyncio.Queue class. + + Features both sync and async put and get methods. Provides the option to + block until space is available when calling put on a full queue, + or to block until items are available when calling get on an empty queue. + + Optionally supports batched put and get operations to minimize + serialization overhead. Args: - maxsize (int): maximum size of the queue. If zero, size is unbounded. + maxsize (optional, int): maximum size of the queue. If zero, size is + unbounded. + + Examples: + >>> q = Queue() + >>> items = list(range(10)) + >>> for item in items: + >>> q.put(item) + >>> for item in items: + >>> assert item == q.get() """ - def __init__(self, maxsize=0): - self.actor = _QueueActor.remote(maxsize) + def __init__(self, maxsize: int = 0) -> None: + self.maxsize = maxsize + self.actor = _QueueActor.remote(self.maxsize) - def __len__(self): + def __len__(self) -> int: return self.size() - def size(self): + def size(self) -> int: """The size of the queue.""" return ray.get(self.actor.qsize.remote()) - def qsize(self): + def qsize(self) -> int: """The size of the queue.""" return self.size() - def empty(self): + def empty(self) -> bool: """Whether the queue is empty.""" return ray.get(self.actor.empty.remote()) - def full(self): + def full(self) -> bool: """Whether the queue is full.""" return ray.get(self.actor.full.remote()) - def put(self, item, block=True, timeout=None): + def put(self, + item: Any, + block: bool = True, + timeout: Optional[float] = None) -> None: """Adds an item to the queue. + If block is True and the queue is full, blocks until the queue is no + longer full or until timeout. + There is no guarantee of order if multiple producers put to the same full queue. Raises: - Full if the queue is full and blocking is False. - Full if the queue is full, blocking is True, and it timed out. - ValueError if timeout is negative. + Full: if the queue is full and blocking is False. + Full: if the queue is full, blocking is True, and it timed out. + ValueError: if timeout is negative. """ if not block: try: @@ -62,9 +89,40 @@ class Queue: else: ray.get(self.actor.put.remote(item, timeout)) - def get(self, block=True, timeout=None): + async def put_async(self, + item: Any, + block: bool = True, + timeout: Optional[float] = None) -> None: + """Adds an item to the queue. + + If block is True and the queue is full, + blocks until the queue is no longer full or until timeout. + + There is no guarantee of order if multiple producers put to the same + full queue. + + Raises: + Full: if the queue is full and blocking is False. + Full: if the queue is full, blocking is True, and it timed out. + ValueError: if timeout is negative. + """ + if not block: + try: + await self.actor.put_nowait.remote(item) + except asyncio.QueueFull: + raise Full + else: + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + await self.actor.put.remote(item, timeout) + + def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: """Gets an item from the queue. + If block is True and the queue is empty, blocks until the queue is no + longer empty or until timeout. + There is no guarantee of order if multiple consumers get from the same empty queue. @@ -72,9 +130,9 @@ class Queue: The next item in the queue. Raises: - Empty if the queue is empty and blocking is False. - Empty if the queue is empty, blocking is True, and it timed out. - ValueError if timeout is negative. + Empty: if the queue is empty and blocking is False. + Empty: if the queue is empty, blocking is True, and it timed out. + ValueError: if timeout is negative. """ if not block: try: @@ -87,27 +145,79 @@ class Queue: else: return ray.get(self.actor.get.remote(timeout)) - def put_nowait(self, item): + async def get_async(self, + block: bool = True, + timeout: Optional[float] = None) -> Any: + """Gets an item from the queue. + + There is no guarantee of order if multiple consumers get from the + same empty queue. + + Returns: + The next item in the queue. + Raises: + Empty: if the queue is empty and blocking is False. + Empty: if the queue is empty, blocking is True, and it timed out. + ValueError: if timeout is negative. + """ + if not block: + try: + return await self.actor.get_nowait.remote() + except asyncio.QueueEmpty: + raise Empty + else: + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + return await self.actor.get.remote(timeout) + + def put_nowait(self, item: Any) -> None: """Equivalent to put(item, block=False). Raises: - Full if the queue is full. + Full: if the queue is full. """ return self.put(item, block=False) - def get_nowait(self): + def put_nowait_batch(self, items: Iterable) -> None: + """Takes in a list of items and puts them into the queue in order. + + Raises: + Full: if the items will not fit in the queue + """ + if not isinstance(items, Iterable): + raise TypeError("Argument 'items' must be an Iterable") + + ray.get(self.actor.put_nowait_batch.remote(items)) + + def get_nowait(self) -> Any: """Equivalent to get(block=False). Raises: - Empty if the queue is empty. + Empty: if the queue is empty. """ return self.get(block=False) + def get_nowait_batch(self, num_items: int) -> List[Any]: + """Gets items from the queue and returns them in a + list in order. + + Raises: + Empty: if the queue does not contain the desired number of items + """ + if not isinstance(num_items, int): + raise TypeError("Argument 'num_items' must be an int") + if num_items < 0: + raise ValueError("'num_items' must be nonnegative") + + return ray.get(self.actor.get_nowait_batch.remote(num_items)) + @ray.remote class _QueueActor: def __init__(self, maxsize): - self.queue = asyncio.Queue(maxsize) + self.maxsize = maxsize + self.queue = asyncio.Queue(self.maxsize) def qsize(self): return self.queue.qsize() @@ -133,5 +243,19 @@ class _QueueActor: def put_nowait(self, item): self.queue.put_nowait(item) + def put_nowait_batch(self, items): + # If maxsize is 0, queue is unbounded, so no need to check size. + if self.maxsize > 0 and len(items) + self.qsize() > self.maxsize: + raise Full(f"Cannot add {len(items)} items to queue of size " + f"{self.qsize()} and maxsize {self.maxsize}.") + for item in items: + self.queue.put_nowait(item) + def get_nowait(self): return self.queue.get_nowait() + + def get_nowait_batch(self, num_items): + if num_items > self.qsize(): + raise Empty(f"Cannot get {num_items} items from queue of size " + f"{self.qsize()}.") + return [self.queue.get_nowait() for _ in range(num_items)]