Mirror of https://github.com/vale981/ray, synced 2025-03-09 21:06:39 -04:00
Make object_manager_test::test_object_transfer_retry less flaky. (#4057)
* Make object_manager_test::test_object_transfer_retry less flaky.
* Make the test pass.
parent e9ee38ace2
commit 5fe7b1c618
1 changed file with 32 additions and 17 deletions
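Before reading the diff, here is an annotated restatement of the configuration the updated test builds. Everything below is taken from the diff itself; the per-knob glosses follow the test's own comments and are this sketch's reading, not official documentation:

import json

repeated_push_delay = 10  # seconds

# Knobs are passed to each raylet as a JSON string via _internal_config.
config = json.dumps({
    # How long the sending object manager waits before allowing a
    # duplicate push of the same object to the same node.
    "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000,
    # How long the receiving object manager waits before retrying a Pull.
    "object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4,
    # A small chunk size, so that the test objects span multiple chunks.
    "object_manager_default_chunk_size": 1000
})

# Fixing the store size makes eviction predictable: repeatedly putting
# objects of size object_store_memory // 10 flushes earlier objects.
object_store_memory = 10**8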
@@ -213,63 +213,78 @@ def test_object_transfer_retry(ray_start_empty_cluster):
     repeated_push_delay = 10
 
     # Force the sending object manager to allow duplicate pushes again sooner.
-    # Also, force the receiving object manager to retry the Pull sooner.
+    # Also, force the receiving object manager to retry the Pull sooner. We
+    # make the chunk size smaller in order to make it easier to test objects
+    # with multiple chunks.
     config = json.dumps({
         "object_manager_repeated_push_delay_ms": repeated_push_delay * 1000,
         "object_manager_pull_timeout_ms": repeated_push_delay * 1000 / 4,
+        "object_manager_default_chunk_size": 1000
     })
-    cluster.add_node(_internal_config=config)
-    cluster.add_node(num_gpus=1, _internal_config=config)
+    object_store_memory = 10**8
+    cluster.add_node(
+        object_store_memory=object_store_memory, _internal_config=config)
+    cluster.add_node(
+        num_gpus=1,
+        object_store_memory=object_store_memory,
+        _internal_config=config)
     ray.init(redis_address=cluster.redis_address)
 
     @ray.remote(num_gpus=1)
     def f(size):
         return np.zeros(size, dtype=np.uint8)
 
-    x_ids = [f.remote(10**i) for i in [1, 2, 3, 4, 5, 6, 7]]
+    # Transfer an object to warm up the object manager.
+    ray.get(f.remote(10**6))
+
+    x_ids = [f.remote(10**i) for i in [1, 2, 3, 4]]
     assert not any(
         ray.worker.global_worker.plasma_client.contains(
             ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
 
+    # Get the objects locally to cause them to be transferred. This is the
+    # first time the objects are getting transferred, so it should happen
+    # quickly.
     start_time = time.time()
-
-    # Get the objects locally to cause them to be transferred.
     xs = ray.get(x_ids)
+    end_time = time.time()
+    if end_time - start_time > repeated_push_delay:
+        warnings.warn("The initial transfer took longer than the repeated "
+                      "push delay, so this test may not be testing the thing "
+                      "it's supposed to test.")
 
     # Cause all objects to be flushed.
     del xs
-    x = np.zeros(10**7, dtype=np.uint8)
-    for _ in range(10):
+    x = np.zeros(object_store_memory // 10, dtype=np.uint8)
+    for _ in range(15):
         ray.put(x)
     assert not any(
         ray.worker.global_worker.plasma_client.contains(
             ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
 
-    end_time = time.time()
-
-    print(end_time - start_time)
-    # Make sure that the first time the objects get transferred, it happens
-    # quickly.
-    assert end_time - start_time < repeated_push_delay
-
     # Get the objects again and make sure they get transferred.
     xs = ray.get(x_ids)
     end_transfer_time = time.time()
 
     # We should have had to wait for the repeated push delay.
     assert end_transfer_time - start_time >= repeated_push_delay
 
     # Flush the objects again and wait longer than the repeated push delay and
     # make sure that the objects are transferred again.
     del xs
-    for _ in range(10):
+    for _ in range(15):
         ray.put(x)
     assert not any(
         ray.worker.global_worker.plasma_client.contains(
             ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
 
     time.sleep(repeated_push_delay)
 
+    # Get the objects locally to cause them to be transferred. This should
+    # happen quickly.
+    start_time = time.time()
     ray.get(x_ids)
+    end_time = time.time()
+    assert end_time - start_time < repeated_push_delay
 
 
 # The purpose of this test is to make sure we can transfer many objects. In the
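The de-flaking pattern in this commit generalizes beyond this test: run one untimed warm-up pass so that one-time setup costs do not pollute the measurement, then warn rather than hard-fail when a timing precondition is violated on a slow machine. A minimal self-contained sketch of that pattern; the names do_transfer and DELAY_S are illustrative, not from the Ray codebase:

import time
import warnings

DELAY_S = 10  # mirrors repeated_push_delay in the test


def do_transfer():
    # Placeholder for the operation under test (e.g. an object transfer).
    time.sleep(0.01)


do_transfer()  # warm-up: the first call pays one-time setup costs

start = time.time()
do_transfer()
elapsed = time.time() - start
if elapsed > DELAY_S:
    # Don't assert here: a slow machine would make the test flaky. Warn
    # instead, since the later timing assertions would no longer be
    # meaningful.
    warnings.warn("initial transfer took longer than the push delay; "
                  "the test may not be exercising the retry path")

Note that the test still hard-asserts on the properties it actually guarantees (the repeated-push delay bounds) and only softens the one check that depends on machine speed.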