# ray/test/component_failures_test.py
# From commit 3ce8eb2d4c by Robert Nishihara:
#
# Test dying_worker_get and dying_worker_wait for xray (#2997). This tests
# the case in which a worker is blocked in a call to ray.get or ray.wait,
# and then the worker dies. Then later, the object that the worker was
# waiting for becomes available. We need to make sure not to try to send a
# message to the dead worker and then die. Related to #2790.
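"""Tests that the Ray system tolerates dying workers, dying drivers, and
killed components (raylets, local schedulers, plasma stores and managers)."""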

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import signal
import time

import pytest

import ray
from ray.test.test_utils import run_string_as_driver_nonblocking

import pyarrow as pa


@pytest.fixture
def ray_start_workers_separate():
    # Start the Ray processes.
    ray.worker._init(
        num_cpus=1,
        start_workers_from_local_scheduler=False,
        start_ray_local=True,
        redirect_output=True)
    yield None
    # The code after the yield will run as teardown code.
    ray.shutdown()


@pytest.fixture
def shutdown_only():
    yield None
    # The code after the yield will run as teardown code.
    ray.shutdown()


# This test checks that when a worker dies in the middle of a get, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") != "1",
    reason="This test only works with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_get_raylet(shutdown_only):
    # Start the Ray processes.
    ray.init(num_cpus=2)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    @ray.remote
    def get_worker_pid():
        return os.getpid()

    x_id = sleep_forever.remote()
    time.sleep(0.01)  # Try to wait for the sleep task to get scheduled.
    # Get the PID of the other worker.
    worker_pid = ray.get(get_worker_pid.remote())

    @ray.remote
    def f(id_in_a_list):
        ray.get(id_in_a_list[0])

    # Have the worker wait in a get call.
    result_id = f.remote([x_id])
    time.sleep(1)
    # Make sure the task hasn't finished.
    ready_ids, _ = ray.wait([result_id], timeout=0)
    assert len(ready_ids) == 0

    # Kill the worker.
    os.kill(worker_pid, signal.SIGKILL)
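    # SIGKILL gives the worker no chance to clean up, so the store must not
    # try to send a message to the dead worker once the object appears.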
    time.sleep(0.1)

    # Make sure the sleep task hasn't finished.
    ready_ids, _ = ray.wait([x_id], timeout=0)
    assert len(ready_ids) == 0

    # Seal the object so the store attempts to notify the worker that the
    # get has been fulfilled.
    ray.worker.global_worker.put_object(x_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive()


# This test checks that when a driver dies in the middle of a get, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") != "1",
    reason="This test only works with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_driver_get(shutdown_only):
    # Start the Ray processes.
    address_info = ray.init(num_cpus=1)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    x_id = sleep_forever.remote()

    driver = """
import ray
ray.init("{}")
ray.get(ray.ObjectID({}))
""".format(address_info["redis_address"], x_id.id())
    p = run_string_as_driver_nonblocking(driver)
    # Make sure the driver is running.
    time.sleep(1)
    assert p.poll() is None

    # Kill the driver process.
    p.kill()
    p.wait()
    time.sleep(0.1)

    # Make sure the original task hasn't finished.
    ready_ids, _ = ray.wait([x_id], timeout=0)
    assert len(ready_ids) == 0

    # Seal the object so the store attempts to notify the worker that the
    # get has been fulfilled.
    ray.worker.global_worker.put_object(x_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive()


# This test checks that when a worker dies in the middle of a get, the
# plasma store and manager will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") == "1",
    reason="This test does not work with xray yet.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_get(ray_start_workers_separate):
    obj_id = 20 * b"a"
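    # Plasma object IDs are 20 bytes long.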

    @ray.remote
    def f():
        ray.worker.global_worker.plasma_client.get(ray.ObjectID(obj_id))

    # Have the worker wait in a get call.
    f.remote()

    # Kill the worker.
    time.sleep(1)
    (ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0]
     .terminate())
    time.sleep(0.1)

    # Seal the object so the store attempts to notify the worker that the
    # get has been fulfilled.
    ray.worker.global_worker.plasma_client.create(
        pa.plasma.ObjectID(obj_id), 100)
    ray.worker.global_worker.plasma_client.seal(pa.plasma.ObjectID(obj_id))
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive(
        exclude=[ray.services.PROCESS_TYPE_WORKER])


# This test checks that when a worker dies in the middle of a wait, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") != "1",
    reason="This test only works with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_wait_raylet(shutdown_only):
    ray.init(num_cpus=2)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    @ray.remote
    def get_pid():
        return os.getpid()

    x_id = sleep_forever.remote()
    # Get the PID of the worker that block_in_wait will run on (sleep a little
    # to make sure that sleep_forever has already started).
    time.sleep(0.1)
    worker_pid = ray.get(get_pid.remote())

    @ray.remote
    def block_in_wait(object_id_in_list):
        ray.wait(object_id_in_list)

    # Have the worker wait in a wait call.
    block_in_wait.remote([x_id])
    time.sleep(0.1)

    # Kill the worker.
    os.kill(worker_pid, signal.SIGKILL)
    time.sleep(0.1)

    # Create the object.
    ray.worker.global_worker.put_object(x_id, 1)
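    # This is the object the dead worker was blocked on in ray.wait; the
    # raylet must not try to notify the dead worker that it is ready.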
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive()


# This test checks that when a driver dies in the middle of a wait, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") != "1",
    reason="This test only works with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_driver_wait(shutdown_only):
    # Start the Ray processes.
    address_info = ray.init(num_cpus=1)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    x_id = sleep_forever.remote()

    driver = """
import ray
ray.init("{}")
ray.wait([ray.ObjectID({})])
""".format(address_info["redis_address"], x_id.id())

    p = run_string_as_driver_nonblocking(driver)
    # Make sure the driver is running.
    time.sleep(1)
    assert p.poll() is None

    # Kill the driver process.
    p.kill()
    p.wait()
    time.sleep(0.1)

    # Make sure the original task hasn't finished.
    ready_ids, _ = ray.wait([x_id], timeout=0)
    assert len(ready_ids) == 0

    # Seal the object so the store attempts to notify the worker that the
    # wait can return.
    ray.worker.global_worker.put_object(x_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive()


# This test checks that when a worker dies in the middle of a wait, the
# plasma store and manager will not die.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") == "1",
    reason="This test does not work with xray yet.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_wait(ray_start_workers_separate):
    obj_id = 20 * b"a"

    @ray.remote
    def f():
        ray.worker.global_worker.plasma_client.wait([ray.ObjectID(obj_id)])

    # Have the worker block in a wait call.
    f.remote()

    # Kill the worker.
    time.sleep(1)
    (ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0]
     .terminate())
    time.sleep(0.1)

    # Seal the object so the store attempts to notify the worker that the
    # wait can return.
    ray.worker.global_worker.plasma_client.create(
        pa.plasma.ObjectID(obj_id), 100)
    ray.worker.global_worker.plasma_client.seal(pa.plasma.ObjectID(obj_id))
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.all_processes_alive(
        exclude=[ray.services.PROCESS_TYPE_WORKER])


@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
    num_local_schedulers = request.param[0]
    num_initial_workers = request.param[1]
    # Start the Ray processes.
    ray.worker._init(
        num_workers=(num_initial_workers * num_local_schedulers),
        num_local_schedulers=num_local_schedulers,
        start_workers_from_local_scheduler=False,
        start_ray_local=True,
        num_cpus=[num_initial_workers] * num_local_schedulers,
        redirect_output=True)
    yield num_local_schedulers, num_initial_workers
    # The code after the yield will run as teardown code.
    ray.shutdown()


def test_worker_failed(ray_start_workers_separate_multinode):
    num_local_schedulers, num_initial_workers = (
        ray_start_workers_separate_multinode)

    @ray.remote
    def f(x):
        time.sleep(0.5)
        return x

    # Submit more tasks than there are workers so that all workers and
    # cores are utilized.
    object_ids = [
        f.remote(i) for i in range(num_initial_workers * num_local_schedulers)
    ]
    object_ids += [f.remote(object_id) for object_id in object_ids]
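    # The second round of tasks depends on the first, so any results lost
    # when workers are killed below must be recomputed for the final get.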
    # Allow the tasks some time to begin executing.
    time.sleep(0.1)

    # Kill the workers as the tasks execute.
    for worker in (
            ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER]):
        worker.terminate()
        time.sleep(0.1)

    # Make sure that we can still get the objects after the executing tasks
    # died.
    ray.get(object_ids)


def _test_component_failed(component_type):
    """Kill a component on all worker nodes and check workload succeeds."""
    # Raylet is able to pass a harder failure test than legacy ray.
    use_raylet = os.environ.get("RAY_USE_XRAY") == "1"

    # Start with 4 local schedulers and 8 workers per scheduler.
    num_local_schedulers = 4
    num_workers_per_scheduler = 8
    ray.worker._init(
        num_workers=num_workers_per_scheduler,
        num_local_schedulers=num_local_schedulers,
        start_ray_local=True,
        num_cpus=[num_workers_per_scheduler] * num_local_schedulers,
        redirect_output=True)

    if use_raylet:
        # Submit many tasks with many dependencies.
        @ray.remote
        def f(x):
            return x

        @ray.remote
        def g(*xs):
            return 1

        # Kill the component on all nodes except the head node as the tasks
        # execute. Do this in a loop while submitting tasks between each
        # component failure.
        # NOTE(swang): Legacy ray hangs on this test if the plasma manager
        # is killed.
        time.sleep(0.1)
        components = ray.services.all_processes[component_type]
        for process in components[1:]:
            # Submit a round of tasks with many dependencies.
            x = 1
            for _ in range(1000):
                x = f.remote(x)

            xs = [g.remote(1)]
            for _ in range(100):
                xs.append(g.remote(*xs))
                xs.append(g.remote(1))
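            # x is now the tail of a 1000-task chain, and each element of xs
            # depends on all of the g tasks submitted before it.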

            # Kill a component on one of the nodes.
            process.terminate()
            time.sleep(1)
            process.kill()
            process.wait()
            assert process.poll() is not None

            # Make sure that we can still get the objects after the
            # executing tasks died.
            ray.get(x)
            ray.get(xs)
    else:

        @ray.remote
        def f(x, j):
            time.sleep(0.2)
            return x

        # Submit more tasks than there are workers so that all workers and
        # cores are utilized.
        object_ids = [
            f.remote(i, 0)
            for i in range(num_workers_per_scheduler * num_local_schedulers)
        ]
        object_ids += [f.remote(object_id, 1) for object_id in object_ids]
        object_ids += [f.remote(object_id, 2) for object_id in object_ids]

        # Kill the component on all nodes except the head node as the tasks
        # execute.
        time.sleep(0.1)
        components = ray.services.all_processes[component_type]
        for process in components[1:]:
            process.terminate()
            # Sleep while the local scheduler is still fetching object IDs.
            # The fetch may trigger fetch_object_timeout_handler, which can
            # discover that the plasma manager or plasma store socket is
            # broken and cause the local scheduler to fail.
            time.sleep(1)

        for process in components[1:]:
            process.kill()
            process.wait()
            assert process.poll() is not None

        # Make sure that we can still get the objects after the executing
        # tasks died.
        results = ray.get(object_ids)
        expected_results = 4 * list(
            range(num_workers_per_scheduler * num_local_schedulers))
        assert results == expected_results


def check_components_alive(component_type, check_component_alive):
    """Check that a given component type is alive on all worker nodes."""
    components = ray.services.all_processes[component_type][1:]
    for component in components:
        if check_component_alive:
            assert component.poll() is None
        else:
            print("waiting for " + component_type + " with PID " +
                  str(component.pid) + " to terminate")
            component.wait()
            print("done waiting for " + component_type + " with PID " +
                  str(component.pid) + " to terminate")
            assert component.poll() is not None


@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") != "1",
    reason="This test only makes sense with xray.")
def test_raylet_failed():
    # Kill all raylets on worker nodes.
    _test_component_failed(ray.services.PROCESS_TYPE_RAYLET)

    # The plasma stores should still be alive on the worker nodes.
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, True)

    ray.shutdown()


@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") == "1",
    reason="This test does not make sense with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_local_scheduler_failed():
    # Kill all local schedulers on worker nodes.
    _test_component_failed(ray.services.PROCESS_TYPE_LOCAL_SCHEDULER)

    # The plasma stores and plasma managers should still be alive on the
    # worker nodes.
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, True)
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_MANAGER, True)
    check_components_alive(ray.services.PROCESS_TYPE_LOCAL_SCHEDULER, False)

    ray.shutdown()


@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") == "1",
    reason="This test does not make sense with xray.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_plasma_manager_failed():
    # Kill all plasma managers on worker nodes.
    _test_component_failed(ray.services.PROCESS_TYPE_PLASMA_MANAGER)

    # The plasma stores should still be alive (but unreachable) on the
    # worker nodes.
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, True)
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_MANAGER, False)
    check_components_alive(ray.services.PROCESS_TYPE_LOCAL_SCHEDULER, False)

    ray.shutdown()


@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_plasma_store_failed():
    # Kill all plasma stores on worker nodes.
    _test_component_failed(ray.services.PROCESS_TYPE_PLASMA_STORE)

    # No processes should be left alive on the worker nodes.
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_STORE, False)
    check_components_alive(ray.services.PROCESS_TYPE_PLASMA_MANAGER, False)
    check_components_alive(ray.services.PROCESS_TYPE_LOCAL_SCHEDULER, False)
    check_components_alive(ray.services.PROCESS_TYPE_RAYLET, False)

    ray.shutdown()


@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_driver_lives_sequential():
    ray.worker.init()
    all_processes = ray.services.all_processes
    processes = (all_processes[ray.services.PROCESS_TYPE_PLASMA_STORE] +
                 all_processes[ray.services.PROCESS_TYPE_PLASMA_MANAGER] +
                 all_processes[ray.services.PROCESS_TYPE_LOCAL_SCHEDULER] +
                 all_processes[ray.services.PROCESS_TYPE_GLOBAL_SCHEDULER] +
                 all_processes[ray.services.PROCESS_TYPE_RAYLET])

    # Kill all the components sequentially.
    for process in processes:
        process.terminate()
        time.sleep(0.1)
        process.kill()
        process.wait()

    ray.shutdown()
    # If the driver can reach this line, then it is still alive.


@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_driver_lives_parallel():
    ray.worker.init()
    all_processes = ray.services.all_processes
    processes = (all_processes[ray.services.PROCESS_TYPE_PLASMA_STORE] +
                 all_processes[ray.services.PROCESS_TYPE_PLASMA_MANAGER] +
                 all_processes[ray.services.PROCESS_TYPE_LOCAL_SCHEDULER] +
                 all_processes[ray.services.PROCESS_TYPE_GLOBAL_SCHEDULER] +
                 all_processes[ray.services.PROCESS_TYPE_RAYLET])

    # Kill all the components in parallel.
    for process in processes:
        process.terminate()

    time.sleep(0.1)
    for process in processes:
        process.kill()

    for process in processes:
        process.wait()

    # If the driver can reach this line, then it is still alive.
    ray.shutdown()