[Core][cli][usability] ray stop prints errors during graceful shutdown (#25686)

Why are these changes needed?
This addresses false alarms raised when subprocesses exit after being killed by `ray stop` with SIGTERM.

What has been changed?
Added signal handlers for some of the subprocesses:
dashboard (head)
log monitor
ray client server
Changed the --block semantics and prompt messages.

Related issue number
Closes #25518
This commit is contained in:
Ricky Xu 2022-06-27 08:14:59 -07:00 committed by GitHub
parent c4b9e9ffa5
commit 3d8ca6cf0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 265 additions and 27 deletions

View file

@ -4,6 +4,9 @@ import logging
import logging.handlers
import platform
import traceback
import signal
import os
import sys
import ray._private.ray_constants as ray_constants
import ray._private.services
@ -175,6 +178,20 @@ if __name__ == "__main__":
minimal=args.minimal,
)
loop = asyncio.get_event_loop()
def sigterm_handler():
    """Exit the dashboard process immediately upon receiving SIGTERM.

    There is deliberately no graceful async teardown here; the process
    hard-exits via os._exit so pending event-loop tasks are not awaited.
    """
    # `Logger.warn` is a deprecated alias; `warning` is the canonical API.
    logger.warning("Exiting with SIGTERM immediately...")
    # Exit with SIGTERM's number (15) so the parent (`ray start --block`)
    # can recognize this as a signal-triggered, expected shutdown.
    os._exit(signal.SIGTERM)
if sys.platform != "win32":
# TODO(rickyyx): we currently do not have any logic for actual
# graceful termination in the dashboard. Most of the underlying
# async tasks run by the dashboard head doesn't handle CancelledError.
# So a truly graceful shutdown is not trivial w/o much refactoring.
# Re-open the issue: https://github.com/ray-project/ray/issues/25518
# if a truly graceful shutdown is required.
loop.add_signal_handler(signal.SIGTERM, sigterm_handler)
loop.run_until_complete(dashboard.run())
except Exception as e:
traceback_str = ray._private.utils.format_error_message(traceback.format_exc())

View file

@ -46,7 +46,7 @@ class GCSHealthCheckThread(threading.Thread):
future.set_result(check_result)
async def check_once(self) -> bool:
"""Ask the thread to perform a healthcheck."""
"""Ask the thread to perform a health check."""
assert (
threading.current_thread != self
), "caller shouldn't be from the same thread as GCSHealthCheckThread."

View file

@ -1,14 +1,13 @@
import asyncio
import errno
import ipaddress
import logging
import os
import sys
import logging
import ipaddress
from distutils.version import LooseVersion
import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
# All third-party dependencies that are not included in the minimal Ray
# installation must be included in this file. This allows us to determine if

View file

@ -86,7 +86,7 @@ class LogFileInfo:
class LogMonitor:
"""A monitor process for monitoring Ray log files.
This class mantains a list of open files and a list of closed log files. We
This class maintains a list of open files and a list of closed log files. We
can't simply leave all files open because we'll run out of file
descriptors.

View file

@ -2,6 +2,7 @@ import copy
import json
import logging
import os
import signal
import subprocess
import sys
import time
@ -911,33 +912,55 @@ def start(
cli_logger.newline()
with cli_logger.group(cf.bold("--block")):
cli_logger.print(
"This command will now block until terminated by a signal."
"This command will now block forever until terminated by a signal."
)
cli_logger.print(
"Running subprocesses are monitored and a message will be "
"printed if any of them terminate unexpectedly."
"printed if any of them terminate unexpectedly. Subprocesses "
"exit with SIGTERM will be treated as graceful, thus NOT reported."
)
cli_logger.flush()
while True:
time.sleep(1)
deceased = node.dead_processes()
if len(deceased) > 0:
# Report unexpected exits of subprocesses with unexpected return codes.
# We are explicitly expecting SIGTERM because this is how `ray stop` sends
# shutdown signal to subprocesses, i.e. log_monitor, raylet...
# NOTE(rickyyx): We are treating 128+15 as an expected return code since
# this is what autoscaler/_private/monitor.py does upon SIGTERM
# handling.
expected_return_codes = [
0,
signal.SIGTERM,
-1 * signal.SIGTERM,
128 + signal.SIGTERM,
]
unexpected_deceased = [
(process_type, process)
for process_type, process in deceased
if process.returncode not in expected_return_codes
]
if len(unexpected_deceased) > 0:
cli_logger.newline()
cli_logger.error("Some Ray subprcesses exited unexpectedly:")
cli_logger.error("Some Ray subprocesses exited unexpectedly:")
with cli_logger.indented():
for process_type, process in deceased:
for process_type, process in unexpected_deceased:
cli_logger.error(
"{}",
cf.bold(str(process_type)),
_tags={"exit code": str(process.returncode)},
)
# shutdown_at_exit will handle cleanup.
cli_logger.newline()
cli_logger.error("Remaining processes will be killed.")
sys.exit(1)
# explicitly kill all processes since atexit handlers
# will not exit with errors.
node.kill_all_processes(check_alive=False, allow_graceful=False)
os._exit(1)
# not-reachable
@cli.command()

View file

@ -18,29 +18,33 @@ Note: config cache does not work with AWS mocks since the AWS resource ids are
randomized each time.
"""
import glob
import multiprocessing as mp
import os
import re
import sys
import tempfile
import time
import uuid
import re
import os
from contextlib import contextmanager
from pathlib import Path
import pytest
from typing import Optional
from unittest.mock import MagicMock, patch
import moto
from moto import mock_ec2, mock_iam
from unittest.mock import MagicMock, patch
import pytest
from click.testing import CliRunner
from moto import mock_ec2, mock_iam
from testfixtures import Replacer
from testfixtures.popen import MockPopen, PopenBehaviour
import ray
import ray.autoscaler._private.aws.config as aws_config
from ray.cluster_utils import cluster_not_supported
import ray._private.ray_constants as ray_constants
import ray.scripts.scripts as scripts
from ray._private.test_utils import wait_for_condition
from ray.cluster_utils import cluster_not_supported
import psutil
boto3_list = [
{
@ -111,17 +115,43 @@ def _unlink_test_ssh_key():
pass
def _debug_die(result):
def _start_ray_and_block(runner, child_conn: mp.connection.Connection, as_head: bool):
    """Utility function to start a CLI command with `ray start --block`

    This function is expected to be run in another process, where `child_conn` is used
    for IPC with the parent.
    """
    if as_head:
        extra_args = ["--head"]
    else:
        # Worker node: connect to the head at the default port.
        extra_args = [f"--address=localhost:{ray_constants.DEFAULT_PORT}"]

    invocation = runner.invoke(scripts.start, ["--block"] + extra_args)

    # `invoke` blocks until the CLI is stopped by a signal (SIGTERM);
    # forward whatever it printed back to the parent process.
    child_conn.send(invocation.output)
def _debug_die(output, assert_msg: str = ""):
print("!!!!")
print(result.output)
print(output)
print("!!!!")
assert False
assert False, assert_msg
def _fail_if_false(
predicate: bool, stdout: Optional[str] = "", assert_msg: Optional[str] = ""
):
if not predicate:
_debug_die(stdout, assert_msg)
def _die_on_error(result):
if result.exit_code == 0:
return
_debug_die(result)
_fail_if_false(result.exit_code == 0, result.output)
def _debug_check_line_by_line(result, expected_lines):
@ -303,6 +333,174 @@ def test_ray_start_hook(configure_lang, monkeypatch, tmp_path):
_die_on_error(runner.invoke(scripts.stop))
@pytest.mark.skipif(
    sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
    reason=("Mac builds don't provide proper locale support. "),
)
@pytest.mark.skipif(
    sys.platform == "win32", reason="Windows signal handling not compatible"
)
def test_ray_start_head_block_and_signals(configure_lang, monkeypatch, tmp_path):
    """Test `ray start` with `--block` as heads and workers and signal handles

    SIGTERM-ing every Ray subprocess except the GCS must NOT unblock the
    CLI (graceful shutdown), while killing the GCS must make the blocking
    CLI exit with a non-zero code.
    """
    monkeypatch.setenv("RAY_USAGE_STATS_CONFIG_PATH", str(tmp_path / "config.json"))
    runner = CliRunner()
    head_parent_conn, head_child_conn = mp.Pipe()

    # Run `ray start --block --head` in another process and blocks
    head_proc = mp.Process(
        target=_start_ray_and_block,
        kwargs={"runner": runner, "child_conn": head_child_conn, "as_head": True},
    )

    # Run
    head_proc.start()

    # Give it some time to start various subprocesses and `ray stop`
    # A smaller interval seems to cause occasional failure as the head process
    # was stopped too early before spawning all the subprocesses.
    time.sleep(5)

    # Terminate some of the children process
    children = psutil.Process(head_proc.pid).children()

    # Terminate everyone other than GCS
    # NOTE(rickyyx): The choice of picking GCS is arbitrary.
    gcs_proc = None
    for child in children:
        if "gcs_server" in child.name():
            gcs_proc = child
            continue
        child.terminate()
        child.wait(5)

        if not head_proc.is_alive():
            # NOTE(rickyyx): call recv() here is safe since the process
            # is guaranteed to be terminated.
            _fail_if_false(
                False,
                head_parent_conn.recv(),
                (
                    "`ray start --head --block` should not exit"
                    f"({head_proc.exitcode}) when a subprocess is "
                    "terminated with SIGTERM."
                ),
            )

    # Fail with a clear message (rather than an AttributeError on
    # `gcs_proc.kill()` below) if no gcs_server child was found.
    _fail_if_false(
        gcs_proc is not None,
        assert_msg="Could not find a gcs_server subprocess of the head node.",
    )

    # Kill the GCS last should unblock the CLI
    gcs_proc.kill()
    gcs_proc.wait(5)

    # NOTE(rickyyx): The wait here is needed for the `head_proc`
    # process to exit
    head_proc.join(5)

    # Process with "--block" should be dead with a subprocess killed
    if head_proc.is_alive() or head_proc.exitcode == 0:
        # NOTE(rickyyx): call recv() here is safe since the process
        # is guaranteed to be terminated thus invocation is non-blocking.
        _fail_if_false(
            False,
            head_parent_conn.recv() if not head_proc.is_alive() else "still alive",
            (
                "Head process should have exited with errors when one of"
                f" subprocesses killed. But exited={head_proc.exitcode}"
            ),
        )
@pytest.mark.skipif(
    sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
    reason=("Mac builds don't provide proper locale support. "),
)
@pytest.mark.skipif(
    sys.platform == "win32", reason="Windows signal handling not compatible"
)
def test_ray_start_block_and_stop(configure_lang, monkeypatch, tmp_path):
    """Test `ray start` with `--block` as heads and workers and `ray stop`

    Both blocking CLIs must keep blocking after `ray stop` (subprocesses
    stopped gracefully with SIGTERM) and must exit with code 0 once they
    are themselves SIGTERM-ed.
    """
    monkeypatch.setenv("RAY_USAGE_STATS_CONFIG_PATH", str(tmp_path / "config.json"))
    runner = CliRunner()
    head_parent_conn, head_child_conn = mp.Pipe()
    worker_parent_conn, worker_child_conn = mp.Pipe()

    # Run `ray start --block --head` in another process and blocks
    head_proc = mp.Process(
        target=_start_ray_and_block,
        kwargs={"runner": runner, "child_conn": head_child_conn, "as_head": True},
    )
    # Run `ray start --block --address=localhost:DEFAULT_PORT`
    worker_proc = mp.Process(
        target=_start_ray_and_block,
        kwargs={"runner": runner, "child_conn": worker_child_conn, "as_head": False},
    )

    # Run
    head_proc.start()
    worker_proc.start()

    # Give it some time to start various subprocesses and `ray stop`
    # A smaller interval seems to cause occasional failure as the head process
    # was stopped too early before spawning all the subprocesses.
    time.sleep(5)
    stop_result = runner.invoke(scripts.stop)
    _die_on_error(stop_result)

    # Process with "--block" should be blocked forever w/o
    # termination by signals
    if not head_proc.is_alive():
        # NOTE(rickyyx): call recv() here is safe since the process
        # is guaranteed to be terminated.
        _fail_if_false(
            False,
            head_parent_conn.recv(),
            (
                "`ray start --head --block` should block forever even"
                " though Ray subprocesses are stopped normally. But "
                f"it exited with {head_proc.exitcode} early. \n"
                f"Stop command: {stop_result.output}"
            ),
        )

    if not worker_proc.is_alive():
        _fail_if_false(
            False,
            worker_parent_conn.recv(),
            (
                "`ray start --block` should block forever even"
                " though Ray subprocesses are stopped normally. But"
                f"it exited with {worker_proc.exitcode} already. \n"
                f"Stop command: {stop_result.output}"
            ),
        )

    # Stop both worker and head with SIGTERM
    head_proc.terminate()
    worker_proc.terminate()

    # Default the outputs so the assertions below cannot raise
    # UnboundLocalError when a child never sends output within the timeout.
    head_output = "(head output not received within timeout)"
    worker_output = "(worker output not received within timeout)"
    if head_parent_conn.poll(3):
        head_output = head_parent_conn.recv()
    if worker_parent_conn.poll(3):
        worker_output = worker_parent_conn.recv()

    head_proc.join(5)
    worker_proc.join(5)

    _fail_if_false(
        head_proc.exitcode == 0,
        head_output,
        f"Head process failed unexpectedly({head_proc.exitcode})",
    )
    _fail_if_false(
        worker_proc.exitcode == 0,
        worker_output,
        f"Worker process failed unexpectedly({worker_proc.exitcode})",
    )
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"),

View file

@ -108,6 +108,7 @@ void AgentManager::StartAgent() {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " exit, return value " << exit_code << ". ip "
<< agent_ip_address_ << ". pid " << agent_pid_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "