diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index 11b2a9a4c..df87a93f8 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -1,7 +1,9 @@ +import time + import pytest import ray -from ray.exceptions import GetTimeoutError +from ray.exceptions import GetTimeoutError, RayActorError from ray.util.queue import Queue, Empty, Full @@ -184,6 +186,31 @@ def test_qsize(ray_start_regular_shared): assert q.qsize() == size +def test_shutdown(ray_start_regular_shared): + q = Queue() + actor = q.actor + q.shutdown() + assert q.actor is None + with pytest.raises(RayActorError): + ray.get(actor.empty.remote()) + + +def test_custom_resources(ray_start_regular_shared): + current_resources = ray.available_resources() + assert current_resources["CPU"] == 1.0 + + # By default an actor should not reserve any resources. + Queue() + current_resources = ray.available_resources() + assert current_resources["CPU"] == 1.0 + + # Specify resource requirement. The queue should now reserve 1 CPU. + Queue(actor_options={"num_cpus": 1}) + time.sleep(1) + current_resources = ray.available_resources() + assert "CPU" not in current_resources, current_resources + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/queue.py b/python/ray/util/queue.py index 59be761de..627d64a7a 100644 --- a/python/ray/util/queue.py +++ b/python/ray/util/queue.py @@ -1,5 +1,5 @@ import asyncio -from typing import Optional, Any, List +from typing import Optional, Any, List, Dict from collections.abc import Iterable import ray @@ -28,6 +28,10 @@ class Queue: Args: maxsize (optional, int): maximum size of the queue. If zero, size is unbounded. + actor_options (optional, Dict): Dictionary of options to pass into + the QueueActor during creation. These are directly passed into + QueueActor.options(...). This could be useful if you + need to pass in custom resource requirements, for example. Examples: >>> q = Queue() @@ -36,11 +40,16 @@ class Queue: >>> q.put(item) >>> for item in items: >>> assert item == q.get() + >>> # Create Queue with the underlying actor reserving 1 CPU. + >>> q = Queue(actor_options={"num_cpus": 1}) """ - def __init__(self, maxsize: int = 0) -> None: + def __init__(self, maxsize: int = 0, + actor_options: Optional[Dict] = None) -> None: + actor_options = actor_options or {} self.maxsize = maxsize - self.actor = _QueueActor.remote(self.maxsize) + self.actor = ray.remote(_QueueActor).options(**actor_options).remote( + self.maxsize) def __len__(self) -> int: return self.size() @@ -212,8 +221,31 @@ class Queue: return ray.get(self.actor.get_nowait_batch.remote(num_items)) + def shutdown(self, force: bool = False, grace_period_s: int = 5) -> None: + """Terminates the underlying QueueActor. + + All of the resources reserved by the queue will be released. + + Args: + force (bool): If True, forcefully kill the actor, causing an + immediate failure. If False, graceful + actor termination will be attempted first, before falling back + to a forceful kill. + grace_period_s (int): If force is False, how long in seconds to + wait for graceful termination before falling back to + forceful kill. + """ + if self.actor: + if force: + ray.kill(self.actor, no_restart=True) + else: + done_ref = self.actor.__ray_terminate__.remote() + done, not_done = ray.wait([done_ref], timeout=grace_period_s) + if not_done: + ray.kill(self.actor, no_restart=True) + self.actor = None + -@ray.remote class _QueueActor: def __init__(self, maxsize): self.maxsize = maxsize