From 5fe7b1c618e47b2bce78824461d43d8bd91653ad Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 19 Feb 2019 20:03:11 -0800 Subject: [PATCH] Make object_manager_test::test_object_transfer_retry less flaky. (#4057) * Make object_manager_test::test_object_transfer_retry less flaky. * Make the test pass. --- test/object_manager_test.py | 49 ++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/test/object_manager_test.py b/test/object_manager_test.py index afab02921..b51f54d1a 100644 --- a/test/object_manager_test.py +++ b/test/object_manager_test.py @@ -213,63 +213,78 @@ def test_object_transfer_retry(ray_start_empty_cluster): repeated_push_delay = 10 # Force the sending object manager to allow duplicate pushes again sooner. - # Also, force the receiving object manager to retry the Pull sooner. + # Also, force the receiving object manager to retry the Pull sooner. We + # make the chunk size smaller in order to make it easier to test objects + # with multiple chunks. config = json.dumps({ "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000, "object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4, + "object_manager_default_chunk_size": 1000 }) - cluster.add_node(_internal_config=config) - cluster.add_node(num_gpus=1, _internal_config=config) + object_store_memory = 10**8 + cluster.add_node( + object_store_memory=object_store_memory, _internal_config=config) + cluster.add_node( + num_gpus=1, + object_store_memory=object_store_memory, + _internal_config=config) ray.init(redis_address=cluster.redis_address) @ray.remote(num_gpus=1) def f(size): return np.zeros(size, dtype=np.uint8) - x_ids = [f.remote(10**i) for i in [1, 2, 3, 4, 5, 6, 7]] + # Transfer an object to warm up the object manager. + ray.get(f.remote(10**6)) + + x_ids = [f.remote(10**i) for i in [1, 2, 3, 4]] assert not any( ray.worker.global_worker.plasma_client.contains( ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) + # Get the objects locally to cause them to be transferred. This is the + # first time the objects are getting transferred, so it should happen + # quickly. start_time = time.time() - - # Get the objects locally to cause them to be transferred. xs = ray.get(x_ids) + end_time = time.time() + if end_time - start_time > repeated_push_delay: + warnings.warn("The initial transfer took longer than the repeated " + "push delay, so this test may not be testing the thing " + "it's supposed to test.") # Cause all objects to be flushed. del xs - x = np.zeros(10**7, dtype=np.uint8) - for _ in range(10): + x = np.zeros(object_store_memory // 10, dtype=np.uint8) + for _ in range(15): ray.put(x) assert not any( ray.worker.global_worker.plasma_client.contains( ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) - end_time = time.time() - print(end_time - start_time) - # Make sure that the first time the objects get transferred, it happens - # quickly. - assert end_time - start_time < repeated_push_delay - # Get the objects again and make sure they get transferred. xs = ray.get(x_ids) - end_transfer_time = time.time() - # We should have had to wait for the repeated push delay. assert end_transfer_time - start_time >= repeated_push_delay # Flush the objects again and wait longer than the repeated push delay and # make sure that the objects are transferred again. del xs - for _ in range(10): + for _ in range(15): ray.put(x) assert not any( ray.worker.global_worker.plasma_client.contains( ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) time.sleep(repeated_push_delay) + + # Get the objects locally to cause them to be transferred. This should + # happen quickly. + start_time = time.time() ray.get(x_ids) + end_time = time.time() + assert end_time - start_time < repeated_push_delay # The purpose of this test is to make sure we can transfer many objects. In the