Change timeout from milliseconds to seconds in ray.wait. (#3706)

* Change timeout from milliseconds to seconds in ray.wait.

* Suppress warning.

* Add prominent warning in API documentation.
Robert Nishihara authored 2019-01-08 21:32:08 -08:00; committed by Philipp Moritz
parent 59d861281e
commit d1e21b702e
11 changed files with 58 additions and 37 deletions
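
For users upgrading across this change, a minimal before/after sketch (the remote function f here is hypothetical, not part of the commit):

import ray

ray.init()

@ray.remote
def f():
    return 1

# Before this commit (ray <= 0.6.1): timeout was in milliseconds.
ready, remaining = ray.wait([f.remote()], timeout=500)

# After this commit: timeout is in seconds. Pass a float to make the
# unit explicit and to avoid the deprecation warning introduced below.
ready, remaining = ray.wait([f.remote()], timeout=0.5)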

View file

@@ -19,7 +19,7 @@ def sleep(x):


 class WaitSuite(object):
-    timeout = 10
+    timeout = 0.01
     timer = time.time

     def time_wait_task(self):
@@ -35,5 +35,5 @@ class WaitSuite(object):
     def time_wait_timeout(self, timeout):
         ray.wait([sleep.remote(0.5)], timeout=timeout)

-    time_wait_timeout.params = [200, 800]
-    time_wait_timeout.param_names = ["timeout_ms"]
+    time_wait_timeout.params = [0.2, 0.8]
+    time_wait_timeout.param_names = ["timeout"]

View file

@@ -52,8 +52,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None):
            List like of object IDs for objects that may or may not be ready.
            Note that these IDs must be unique.
        num_returns (int): The number of object IDs that should be returned.
-       timeout (int): The maximum amount of time in milliseconds to wait
-           before returning.
+       timeout (float): The maximum amount of time in seconds to wait before
+           returning.

    Returns:
        A list of object IDs that are ready and a list of the remaining object
@@ -61,6 +61,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None):
    """
    worker = ray.worker.global_worker if worker is None else worker

    if isinstance(object_ids, (tuple, np.ndarray)):
-       return ray.wait(list(object_ids), num_returns, timeout, worker)
+       return ray.wait(
+           list(object_ids),
+           num_returns=num_returns,
+           timeout=timeout,
+           worker=worker)
-   return ray.wait(object_ids, num_returns, timeout, worker)
+   return ray.wait(
+       object_ids, num_returns=num_returns, timeout=timeout, worker=worker)

View file

@@ -32,7 +32,7 @@ def collect_episodes(local_evaluator,
        for a in remote_evaluators
    ]
    collected, _ = ray.wait(
-       pending, num_returns=len(pending), timeout=timeout_seconds * 1000)
+       pending, num_returns=len(pending), timeout=timeout_seconds * 1.0)
    num_metric_batches_dropped = len(pending) - len(collected)
    metric_lists = ray.get(collected)
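
A note on the change above: timeout_seconds * 1.0 keeps the value in seconds while coercing it to a float, which avoids the integer-based unit warning that ray.wait now logs (see the worker.py hunk below). A minimal illustration, with a hypothetical value:

timeout_seconds = 180            # hypothetical collector setting (an int)
timeout = timeout_seconds * 1.0  # 180.0 is a float, so no warning is logged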

View file

@@ -28,7 +28,8 @@ class TaskPool(object):
    def completed(self):
        pending = list(self._tasks)
        if pending:
-           ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10)
+           ready, _ = ray.wait(
+               pending, num_returns=len(pending), timeout=0.01)
            for obj_id in ready:
                yield (self._tasks.pop(obj_id), self._objects.pop(obj_id))
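
The 0.01-second timeout above is a quick poll: ray.wait returns almost immediately with whatever has already finished. A standalone sketch of the same pattern (the remote function g is hypothetical):

import ray

ray.init()

@ray.remote
def g(i):
    return i * i

# Harvest whatever is done without blocking the caller for long.
pending = [g.remote(i) for i in range(4)]
ready, pending = ray.wait(pending, num_returns=len(pending), timeout=0.01)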

View file

@@ -20,7 +20,8 @@ class Filter(object):
        """Creates a new object with same state as self.

        Returns:
-           copy (Filter): Copy of self"""
+           A copy of self.
+       """
        raise NotImplementedError

    def sync(self, other):

View file

@@ -111,7 +111,7 @@ class RayTrialExecutor(TrialExecutor):
            stop_tasks.append(trial.runner.__ray_terminate__.remote())
            # TODO(ekl) seems like wait hangs when killing actors
            _, unfinished = ray.wait(
-               stop_tasks, num_returns=2, timeout=250)
+               stop_tasks, num_returns=2, timeout=0.25)
        except Exception:
            logger.exception("Error stopping runner.")
            self.set_status(trial, Trial.ERROR)

View file

@@ -2259,6 +2259,11 @@ def put(value, worker=global_worker):
 def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
    """Return a list of IDs that are ready and a list of IDs that are not.

+   .. warning::
+
+       The **timeout** argument used to be in **milliseconds** (up through
+       ``ray==0.6.1``) and now it is in **seconds**.
+
    If timeout is set, the function returns either when the requested number of
    IDs are ready or when the timeout is reached, whichever occurs first. If it
    is not set, the function simply waits until that number of objects is ready
@@ -2278,8 +2283,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
        object_ids (List[ObjectID]): List of object IDs for objects that may or
            may not be ready. Note that these IDs must be unique.
        num_returns (int): The number of object IDs that should be returned.
-       timeout (int): The maximum amount of time in milliseconds to wait
-           before returning.
+       timeout (float): The maximum amount of time in seconds to wait before
+           returning.

    Returns:
        A list of object IDs that are ready and a list of the remaining object
@@ -2294,6 +2299,15 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
        raise TypeError("wait() expected a list of ObjectID, got {}".format(
            type(object_ids)))

+   if isinstance(timeout, int) and timeout != 0:
+       logger.warning("The 'timeout' argument now requires seconds instead "
+                      "of milliseconds. This message can be suppressed by "
+                      "passing in a float.")
+
+   if timeout is not None and timeout < 0:
+       raise ValueError("The 'timeout' argument must be nonnegative. "
+                        "Received {}".format(timeout))
+
    if worker.mode != LOCAL_MODE:
        for object_id in object_ids:
            if not isinstance(object_id, ray.ObjectID):
@@ -2328,9 +2342,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
    with worker.state_lock:
        current_task_id = worker.get_current_thread_task_id()

-       timeout = timeout if timeout is not None else 2**30
+       timeout = timeout if timeout is not None else 10**6
+       timeout_milliseconds = int(timeout * 1000)
        ready_ids, remaining_ids = worker.raylet_client.wait(
-           object_ids, num_returns, timeout, False, current_task_id)
+           object_ids, num_returns, timeout_milliseconds, False,
+           current_task_id)

    return ready_ids, remaining_ids
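
From the caller's side, the new checks behave as follows; a short sketch (the remote function f is hypothetical):

import ray

ray.init()

@ray.remote
def f():
    return 1

x = f.remote()

# A nonzero int logs the "seconds instead of milliseconds" warning but is
# still interpreted as seconds (here: wait up to 2 seconds).
ready, not_ready = ray.wait([x], timeout=2)

# A float of the same value is accepted silently.
ready, not_ready = ray.wait([x], timeout=2.0)

# A negative timeout now raises ValueError:
# ray.wait([x], timeout=-1)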

View file

@@ -759,7 +759,7 @@ def test_actors_on_nodes_with_no_cpus(ray_start_regular):
            pass

    f = Foo.remote()
-   ready_ids, _ = ray.wait([f.method.remote()], timeout=100)
+   ready_ids, _ = ray.wait([f.method.remote()], timeout=0.1)
    assert ready_ids == []
@@ -843,7 +843,7 @@ def test_actor_gpus(ray_start_cluster):
    # Creating a new actor should fail because all of the GPUs are being
    # used.
    a = Actor1.remote()
-   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
+   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01)
    assert ready_ids == []
@@ -884,7 +884,7 @@ def test_actor_multiple_gpus(ray_start_cluster):
    # Creating a new actor should fail because all of the GPUs are being
    # used.
    a = Actor1.remote()
-   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
+   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01)
    assert ready_ids == []

    # We should be able to create more actors that use only a single GPU.
@@ -913,7 +913,7 @@ def test_actor_multiple_gpus(ray_start_cluster):
    # Creating a new actor should fail because all of the GPUs are being
    # used.
    a = Actor2.remote()
-   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
+   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01)
    assert ready_ids == []
@@ -953,7 +953,7 @@ def test_actor_different_numbers_of_gpus(ray_start_cluster):
    # Creating a new actor should fail because all of the GPUs are being
    # used.
    a = Actor1.remote()
-   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
+   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01)
    assert ready_ids == []
@@ -1030,7 +1030,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
    # All the GPUs should be used up now.
    a = Actor.remote()
-   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
+   ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01)
    assert ready_ids == []
@@ -1165,7 +1165,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
    # Now if we run some GPU tasks, they should not be scheduled.
    results = [f1.remote() for _ in range(30)]
-   ready_ids, remaining_ids = ray.wait(results, timeout=1000)
+   ready_ids, remaining_ids = ray.wait(results, timeout=1.0)
    assert len(ready_ids) == 0
@@ -1274,7 +1274,7 @@ def test_blocking_actor_task(shutdown_only):
    # block.
    actor = CPUFoo.remote()
    x_id = actor.blocking_method.remote()
-   ready_ids, remaining_ids = ray.wait([x_id], timeout=1000)
+   ready_ids, remaining_ids = ray.wait([x_id], timeout=1.0)
    assert ready_ids == []
    assert remaining_ids == [x_id]
@@ -1289,7 +1289,7 @@ def test_blocking_actor_task(shutdown_only):
    # Make sure that GPU resources are not released when actors block.
    actor = GPUFoo.remote()
    x_id = actor.blocking_method.remote()
-   ready_ids, remaining_ids = ray.wait([x_id], timeout=1000)
+   ready_ids, remaining_ids = ray.wait([x_id], timeout=1.0)
    assert ready_ids == []
    assert remaining_ids == [x_id]
@@ -2010,7 +2010,7 @@ def test_lifetime_and_transient_resources(ray_start_regular):
    actor2s = [Actor2.remote() for _ in range(2)]
    results = [a.method.remote() for a in actor2s]
    ready_ids, remaining_ids = ray.wait(
-       results, num_returns=len(results), timeout=1000)
+       results, num_returns=len(results), timeout=1.0)
    assert len(ready_ids) == 1
@@ -2066,7 +2066,7 @@ def test_creating_more_actors_than_resources(shutdown_only):
    ray.wait([result2])

    actor3 = ResourceActor1.remote()
    result3 = actor3.method.remote()
-   ready_ids, _ = ray.wait([result3], timeout=200)
+   ready_ids, _ = ray.wait([result3], timeout=0.2)
    assert len(ready_ids) == 0

    # By deleting actor1, we free up resources to create actor3.

View file

@@ -389,9 +389,7 @@ def test_actor_creation_node_failure(ray_start_cluster):
        # reconstruction for any actor creation tasks that were forwarded
        # to nodes that then failed.
        ready, _ = ray.wait(
-           children_out,
-           num_returns=len(children_out),
-           timeout=5 * 60 * 1000)
+           children_out, num_returns=len(children_out), timeout=5 * 60.0)
        assert len(ready) == len(children_out)

        # Replace any actors that died.

View file

@@ -337,7 +337,7 @@ def test_actor_worker_dying(ray_start_regular):
            pass

    a = Actor.remote()
-   [obj], _ = ray.wait([a.kill.remote()], timeout=5000)
+   [obj], _ = ray.wait([a.kill.remote()], timeout=5.0)
    with pytest.raises(Exception):
        ray.get(obj)
    with pytest.raises(Exception):

View file

@@ -753,7 +753,7 @@ def test_defining_remote_functions(shutdown_only):
        args=[], num_cpus=1, num_gpus=1,
        resources={"Custom": 1})) == [0]
    infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1})
-   ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=50)
+   ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=0.05)
    assert len(ready_ids) == 0
    assert len(remaining_ids) == 1
@@ -828,14 +828,14 @@ def test_wait(shutdown_only):
    objectids = [f.remote(0.5), f.remote(0.5), f.remote(0.5), f.remote(0.5)]
    start_time = time.time()
-   ready_ids, remaining_ids = ray.wait(objectids, timeout=1750, num_returns=4)
+   ready_ids, remaining_ids = ray.wait(objectids, timeout=1.75, num_returns=4)
    assert time.time() - start_time < 2
    assert len(ready_ids) == 3
    assert len(remaining_ids) == 1
    ray.wait(objectids)

    objectids = [f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)]
    start_time = time.time()
-   ready_ids, remaining_ids = ray.wait(objectids, timeout=5000)
+   ready_ids, remaining_ids = ray.wait(objectids, timeout=5.0)
    assert time.time() - start_time < 5
    assert len(ready_ids) == 1
    assert len(remaining_ids) == 3
@@ -1302,13 +1302,13 @@ def test_free_objects_multi_node(ray_start_cluster):
    ]

    # Case 1: run this local_only=False. All 3 objects will be deleted.
    (a, b, c) = run_one_test(actors, False)
-   (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=1)
+   (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=1)
    # All the objects are deleted.
    assert len(l1) == 0
    assert len(l2) == 3
    # Case 2: run this local_only=True. Only 1 object will be deleted.
    (a, b, c) = run_one_test(actors, True)
-   (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=3)
+   (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=3)
    # One object is deleted and 2 objects are not.
    assert len(l1) == 2
    assert len(l2) == 1
@@ -1740,7 +1740,7 @@ def test_fractional_resources(shutdown_only):
    # custom resource. TODO(rkn): Re-enable this once ray.wait is
    # implemented.
    f2 = Foo2._remote([], {}, resources={"Custom": 0.7})
-   ready, _ = ray.wait([f2.method.remote()], timeout=500)
+   ready, _ = ray.wait([f2.method.remote()], timeout=0.5)
    assert len(ready) == 0

    # Make sure we can start an actor that requires only 0.3 of the custom
    # resource.
@@ -1977,7 +1977,7 @@ def test_two_custom_resources(ray_start_cluster):
    # Make sure that tasks with unsatisfied custom resource requirements do
    # not get scheduled.
-   ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=500)
+   ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=0.5)
    assert ready_ids == []