mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[CI][rfc] Fix flaky test_multi_node:test_cleanup_on_driver_exit
This commit is contained in:
parent
8136d2912b
commit
feeb20e920
1 changed files with 67 additions and 64 deletions
|
@ -12,6 +12,73 @@ from ray._private.test_utils import (
|
|||
object_memory_usage, wait_for_condition)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"call_ray_start", [
|
||||
"ray start --head --num-cpus=1 --min-worker-port=0 "
|
||||
"--max-worker-port=0 --port 0",
|
||||
],
|
||||
indirect=True)
|
||||
def test_cleanup_on_driver_exit(call_ray_start):
|
||||
# This test will create a driver that creates a bunch of objects and then
|
||||
# exits. The entries in the object table should be cleaned up.
|
||||
address = call_ray_start
|
||||
|
||||
ray.init(address=address)
|
||||
|
||||
# Define a driver that creates a bunch of objects and exits.
|
||||
driver_script = """
|
||||
import time
|
||||
import ray
|
||||
import numpy as np
|
||||
from ray._private.test_utils import object_memory_usage
|
||||
import os
|
||||
|
||||
|
||||
ray.init(address="{}")
|
||||
object_refs = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
|
||||
for i in range(1000)]
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
if object_memory_usage() > 0:
|
||||
break
|
||||
else:
|
||||
raise Exception("Objects did not appear in object table.")
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
time.sleep(1)
|
||||
|
||||
print("success")
|
||||
# Submit some tasks without waiting for them to finish. Their workers should
|
||||
# still get cleaned up eventually, even if they get started after the driver
|
||||
# exits.
|
||||
[f.remote() for _ in range(10)]
|
||||
""".format(address)
|
||||
|
||||
out = run_string_as_driver(driver_script)
|
||||
assert "success" in out
|
||||
|
||||
# Make sure the objects are removed from the object table.
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
if object_memory_usage() == 0:
|
||||
break
|
||||
else:
|
||||
raise Exception("Objects were not all removed from object table.")
|
||||
|
||||
def all_workers_exited():
|
||||
result = True
|
||||
print("list of idle workers:")
|
||||
for proc in psutil.process_iter():
|
||||
if ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER in proc.name():
|
||||
print(f"{proc}")
|
||||
result = False
|
||||
return result
|
||||
|
||||
# Check that workers are eventually cleaned up.
|
||||
wait_for_condition(all_workers_exited, timeout=15, retry_interval_ms=1000)
|
||||
|
||||
|
||||
def test_error_isolation(call_ray_start):
|
||||
address = call_ray_start
|
||||
# Connect a driver to the Ray cluster.
|
||||
|
@ -166,70 +233,6 @@ print("success")
|
|||
assert "success" in out
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"call_ray_start", [
|
||||
"ray start --head --num-cpus=1 --min-worker-port=0 "
|
||||
"--max-worker-port=0 --port 0",
|
||||
],
|
||||
indirect=True)
|
||||
def test_cleanup_on_driver_exit(call_ray_start):
|
||||
# This test will create a driver that creates a bunch of objects and then
|
||||
# exits. The entries in the object table should be cleaned up.
|
||||
address = call_ray_start
|
||||
|
||||
ray.init(address=address)
|
||||
|
||||
# Define a driver that creates a bunch of objects and exits.
|
||||
driver_script = """
|
||||
import time
|
||||
import ray
|
||||
import numpy as np
|
||||
from ray._private.test_utils import object_memory_usage
|
||||
import os
|
||||
|
||||
|
||||
ray.init(address="{}")
|
||||
object_refs = [ray.put(np.zeros(200 * 1024, dtype=np.uint8))
|
||||
for i in range(1000)]
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
if object_memory_usage() > 0:
|
||||
break
|
||||
else:
|
||||
raise Exception("Objects did not appear in object table.")
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
time.sleep(1)
|
||||
|
||||
print("success")
|
||||
# Submit some tasks without waiting for them to finish. Their workers should
|
||||
# still get cleaned up eventually, even if they get started after the driver
|
||||
# exits.
|
||||
[f.remote() for _ in range(10)]
|
||||
""".format(address)
|
||||
|
||||
out = run_string_as_driver(driver_script)
|
||||
assert "success" in out
|
||||
|
||||
# Make sure the objects are removed from the object table.
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
if object_memory_usage() == 0:
|
||||
break
|
||||
else:
|
||||
raise Exception("Objects were not all removed from object table.")
|
||||
|
||||
def all_workers_exited():
|
||||
for proc in psutil.process_iter():
|
||||
if ray_constants.WORKER_PROCESS_TYPE_IDLE_WORKER in proc.name():
|
||||
return False
|
||||
return True
|
||||
|
||||
# Check that workers are eventually cleaned up.
|
||||
wait_for_condition(all_workers_exited)
|
||||
|
||||
|
||||
def test_drivers_named_actors(call_ray_start):
|
||||
# This test will create some drivers that submit some tasks to the same
|
||||
# named actor.
|
||||
|
|
Loading…
Add table
Reference in a new issue