Fix some flaky tests (#4535)

This commit is contained in:
Hao Chen 2019-04-03 08:57:11 +08:00 committed by Robert Nishihara
parent db4cf24636
commit 23404f7bcf
3 changed files with 18 additions and 114 deletions

View file

@ -1295,43 +1295,6 @@ def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
assert set(gpu_ids) == set(range(10))
@pytest.mark.skipif(
sys.version_info < (3, 0), reason="This test requires Python 3.")
def test_actors_and_task_resource_bookkeeping(ray_start_regular):
@ray.remote
class Foo(object):
def __init__(self):
start = time.monotonic()
time.sleep(0.1)
end = time.monotonic()
self.interval = (start, end)
def get_interval(self):
return self.interval
def sleep(self):
start = time.monotonic()
time.sleep(0.01)
end = time.monotonic()
return start, end
# First make sure that we do not have more actor methods running at a
# time than we have CPUs.
actors = [Foo.remote() for _ in range(4)]
interval_ids = []
interval_ids += [actor.get_interval.remote() for actor in actors]
for _ in range(4):
interval_ids += [actor.sleep.remote() for actor in actors]
# Make sure that the intervals don't overlap.
intervals = ray.get(interval_ids)
intervals.sort(key=lambda x: x[0])
for interval1, interval2 in zip(intervals[:-1], intervals[1:]):
assert interval1[0] < interval1[1]
assert interval1[1] < interval2[0]
assert interval2[0] < interval2[1]
def test_blocking_actor_task(shutdown_only):
ray.init(num_cpus=1, num_gpus=1)
@ -1452,7 +1415,7 @@ def test_actor_init_fails(ray_start_cluster_head):
def test_reconstruction_suppression(ray_start_cluster_head):
cluster = ray_start_cluster_head
num_nodes = 10
num_nodes = 5
worker_nodes = [cluster.add_node() for _ in range(num_nodes)]
@ray.remote(max_reconstructions=1)
@ -1469,7 +1432,7 @@ def test_reconstruction_suppression(ray_start_cluster_head):
return ray.get(actor_handle.inc.remote())
# Make sure all of the actors have started.
actors = [Counter.remote() for _ in range(20)]
actors = [Counter.remote() for _ in range(10)]
ray.get([actor.inc.remote() for actor in actors])
# Kill a node.

View file

@ -1590,7 +1590,7 @@ def test_resource_constraints(shutdown_only):
]))) == num_workers:
break
time_buffer = 0.3
time_buffer = 0.5
# At most 10 copies of this can run at once.
@ray.remote(num_cpus=1)
@ -1674,7 +1674,7 @@ def test_multi_resource_constraints(shutdown_only):
def g(n):
time.sleep(n)
time_buffer = 0.3
time_buffer = 0.5
start_time = time.time()
ray.get([f.remote(0.5), g.remote(0.5)])
@ -1705,71 +1705,21 @@ def test_gpu_ids(shutdown_only):
num_gpus = 10
ray.init(num_cpus=10, num_gpus=num_gpus)
@ray.remote(num_gpus=0)
def f0():
def get_gpu_ids(num_gpus_per_worker):
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 0
assert len(gpu_ids) == num_gpus_per_worker
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
@ray.remote(num_gpus=1)
def f1():
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
@ray.remote(num_gpus=2)
def f2():
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 2
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
@ray.remote(num_gpus=3)
def f3():
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 3
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
@ray.remote(num_gpus=4)
def f4():
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 4
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
@ray.remote(num_gpus=5)
def f5():
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 5
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0))
f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1))
f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2))
f4 = ray.remote(num_gpus=4)(lambda: get_gpu_ids(4))
f5 = ray.remote(num_gpus=5)(lambda: get_gpu_ids(5))
# Wait for all workers to start up.
@ray.remote
@ -1796,20 +1746,11 @@ def test_gpu_ids(shutdown_only):
all_ids = [gpu_id for gpu_ids in list_of_ids for gpu_id in gpu_ids]
assert set(all_ids) == set(range(10))
remaining = [f5.remote() for _ in range(20)]
for _ in range(10):
t1 = time.time()
ready, remaining = ray.wait(remaining, num_returns=2)
t2 = time.time()
# There are only 10 GPUs, and each task uses 2 GPUs, so there
# should only be 2 tasks scheduled at a given time, so if we wait
# for 2 tasks to finish, then it should take at least 0.1 seconds
# for each pair of tasks to finish.
assert t2 - t1 > 0.09
list_of_ids = ray.get(ready)
all_ids = [gpu_id for gpu_ids in list_of_ids for gpu_id in gpu_ids]
# Commenting out the below assert because it seems to fail a lot.
# assert set(all_ids) == set(range(10))
# There are only 10 GPUs, and each task uses 5 GPUs, so there should only
# be 2 tasks scheduled at a given time.
t1 = time.time()
ray.get([f5.remote() for _ in range(20)])
assert time.time() - t1 >= 10 * 0.1
# Test that actors have CUDA_VISIBLE_DEVICES set properly.

View file

@ -143,7 +143,7 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
args=[],
kwargs={},
num_cpus=0.01,
resources={str(i % num_nodes): 1}) for i in range(100)
resources={str(i % num_nodes): 1}) for i in range(30)
]
# Wait for the actors to start up.
@ -152,7 +152,7 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
object_ids = []
# Broadcast a large object to all actors.
for _ in range(10):
for _ in range(5):
x_id = ray.put(np.zeros(10**7, dtype=np.uint8))
object_ids.append(x_id)
# Pass the object into a method for every actor.