[Queue] Add options and shutdown to Queue (#12932)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Amog Kamsetty 2020-12-20 00:42:21 -08:00 committed by GitHub
parent 51139ed37c
commit 4c63917439
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 5 deletions

View file

@ -1,7 +1,9 @@
import time
import pytest
import ray
from ray.exceptions import GetTimeoutError
from ray.exceptions import GetTimeoutError, RayActorError
from ray.util.queue import Queue, Empty, Full
@ -184,6 +186,31 @@ def test_qsize(ray_start_regular_shared):
assert q.qsize() == size
def test_shutdown(ray_start_regular_shared):
q = Queue()
actor = q.actor
q.shutdown()
assert q.actor is None
with pytest.raises(RayActorError):
ray.get(actor.empty.remote())
def test_custom_resources(ray_start_regular_shared):
current_resources = ray.available_resources()
assert current_resources["CPU"] == 1.0
# By default an actor should not reserve any resources.
Queue()
current_resources = ray.available_resources()
assert current_resources["CPU"] == 1.0
# Specify resource requirement. The queue should now reserve 1 CPU.
Queue(actor_options={"num_cpus": 1})
time.sleep(1)
current_resources = ray.available_resources()
assert "CPU" not in current_resources, current_resources
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -1,5 +1,5 @@
import asyncio
from typing import Optional, Any, List
from typing import Optional, Any, List, Dict
from collections.abc import Iterable
import ray
@ -28,6 +28,10 @@ class Queue:
Args:
maxsize (optional, int): maximum size of the queue. If zero, size is
unbounded.
actor_options (optional, Dict): Dictionary of options to pass into
the QueueActor during creation. These are directly passed into
QueueActor.options(...). This could be useful if you
need to pass in custom resource requirements, for example.
Examples:
>>> q = Queue()
@ -36,11 +40,16 @@ class Queue:
>>> q.put(item)
>>> for item in items:
>>> assert item == q.get()
>>> # Create Queue with the underlying actor reserving 1 CPU.
>>> q = Queue(actor_options={"num_cpus": 1})
"""
def __init__(self, maxsize: int = 0) -> None:
def __init__(self, maxsize: int = 0,
actor_options: Optional[Dict] = None) -> None:
actor_options = actor_options or {}
self.maxsize = maxsize
self.actor = _QueueActor.remote(self.maxsize)
self.actor = ray.remote(_QueueActor).options(**actor_options).remote(
self.maxsize)
def __len__(self) -> int:
return self.size()
@ -212,8 +221,31 @@ class Queue:
return ray.get(self.actor.get_nowait_batch.remote(num_items))
def shutdown(self, force: bool = False, grace_period_s: int = 5) -> None:
"""Terminates the underlying QueueActor.
All of the resources reserved by the queue will be released.
Args:
force (bool): If True, forcefully kill the actor, causing an
immediate failure. If False, graceful
actor termination will be attempted first, before falling back
to a forceful kill.
grace_period_s (int): If force is False, how long in seconds to
wait for graceful termination before falling back to
forceful kill.
"""
if self.actor:
if force:
ray.kill(self.actor, no_restart=True)
else:
done_ref = self.actor.__ray_terminate__.remote()
done, not_done = ray.wait([done_ref], timeout=grace_period_s)
if not_done:
ray.kill(self.actor, no_restart=True)
self.actor = None
@ray.remote
class _QueueActor:
def __init__(self, maxsize):
self.maxsize = maxsize