From 7fcf1bf57ec7fc135868d382895e8fa2ea14289b Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 4 Oct 2021 21:01:51 +0900
Subject: [PATCH] [Dashboard] Refine the dashboard restart logic. (#18973)

* in progress

* Refine the dashboard agent retry logic

* refine

* done

* lint
---
 dashboard/agent.py                 | 42 +++++++++++++++++-------
 dashboard/tests/test_dashboard.py  |  2 +-
 python/ray/tests/test_dashboard.py | 52 +++++++++++++++++++++++++++---
 src/ray/common/ray_config_def.h    |  3 ++
 src/ray/raylet/agent_manager.cc    | 28 +++++++++++++---
 src/ray/raylet/agent_manager.h     |  2 ++
 src/ray/raylet/node_manager.cc     |  3 +-
 7 files changed, 109 insertions(+), 23 deletions(-)

diff --git a/dashboard/agent.py b/dashboard/agent.py
index 552587107..1cb9fdd5f 100644
--- a/dashboard/agent.py
+++ b/dashboard/agent.py
@@ -362,14 +362,34 @@ if __name__ == "__main__":
         loop = asyncio.get_event_loop()
         loop.run_until_complete(agent.run())
     except Exception as e:
-        # Something went wrong, so push an error to all drivers.
-        redis_client = ray._private.services.create_redis_client(
-            args.redis_address, password=args.redis_password)
-        traceback_str = ray._private.utils.format_error_message(
-            traceback.format_exc())
-        message = ("The agent on node {} failed with the following "
-                   "error:\n{}".format(platform.uname()[1], traceback_str))
-        ray._private.utils.push_error_to_driver_through_redis(
-            redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, message)
-        logger.exception(message)
-        raise e
+        # All these env vars should be available because
+        # they are provided by the parent raylet.
+        restart_count = int(os.environ["RESTART_COUNT"])
+        max_restart_count = int(os.environ["MAX_RESTART_COUNT"])
+        raylet_pid = os.environ["RAY_RAYLET_PID"]
+        node_ip = args.node_ip_address
+        if restart_count >= max_restart_count:
+            # The agent failed to be started many times in a row.
+            # Push an error to all drivers so that users can know the
+            # impact of the issue.
+            redis_client = ray._private.services.create_redis_client(
+                args.redis_address, password=args.redis_password)
+            traceback_str = ray._private.utils.format_error_message(
+                traceback.format_exc())
+            message = (
+                f"(ip={node_ip}) "
+                f"The agent on node {platform.uname()[1]} failed to "
+                f"be restarted {max_restart_count} "
+                "times. There are 3 possible problems if you see this error."
+                "\n 1. The dashboard might not display correct "
+                "information on this node."
+                "\n 2. Metrics on this node won't be reported."
+                "\n 3. runtime_env APIs won't work."
+                "\nCheck out the `dashboard_agent.log` to see the "
+                "detailed failure messages.")
+            ray._private.utils.push_error_to_driver_through_redis(
+                redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR,
+                message)
+            logger.error(message)
+        logger.exception(e)
+        exit(1)
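A note on the agent-side guard above: `RESTART_COUNT` and `MAX_RESTART_COUNT` arrive as strings in the environment, so they must be cast to `int` before the `>=` comparison, and the error is only pushed to drivers once the restart budget is spent. A minimal sketch of just that decision, outside the Redis plumbing (the helper name is ours, not part of the patch):

```python
import os


def should_report_to_drivers() -> bool:
    """True once the raylet-provided restart budget is exhausted.

    RESTART_COUNT and MAX_RESTART_COUNT are injected by the parent
    raylet (see AgentManager::StartAgent later in this patch); they
    are strings in the environment, hence the int() casts.
    """
    restart_count = int(os.environ.get("RESTART_COUNT", "0"))
    max_restart_count = int(os.environ.get("MAX_RESTART_COUNT", "5"))
    return restart_count >= max_restart_count
```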
diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py
index ea335c61b..6565ea088 100644
--- a/dashboard/tests/test_dashboard.py
+++ b/dashboard/tests/test_dashboard.py
@@ -107,7 +107,7 @@ def test_basic(ray_start_with_dashboard):
     agent_proc.kill()
     agent_proc.wait()
     # The agent will be restarted for imports failure.
-    for x in range(50):
+    for _ in range(300):
         agent_proc = _search_agent(raylet_proc.children())
         if agent_proc:
             agent_pids.add(agent_proc.pid)
diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py
index c92d9610e..578707bae 100644
--- a/python/ray/tests/test_dashboard.py
+++ b/python/ray/tests/test_dashboard.py
@@ -4,14 +4,34 @@
 import subprocess
 import sys
 import time
 
+import psutil
 import pytest
 import requests
 
-from ray._private.test_utils import run_string_as_driver, wait_for_condition
+from ray._private.test_utils import (run_string_as_driver, wait_for_condition,
+                                     get_error_message)
 
 import ray
 from ray import ray_constants
 
 
+def search_agents(cluster):
+    all_processes = cluster.head_node.all_processes
+    raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
+    raylet_proc = psutil.Process(raylet_proc_info.process.pid)
+
+    def _search_agent(processes):
+        for p in processes:
+            try:
+                for c in p.cmdline():
+                    if "dashboard/agent.py" in c:
+                        return p
+            except Exception:
+                pass
+
+    agent_proc = _search_agent(raylet_proc.children())
+    return agent_proc
+
+
 def test_ray_start_default_port_conflict(call_ray_stop_only, shutdown_only):
     subprocess.check_call(["ray", "start", "--head"])
     ray.init(address="auto")
@@ -90,8 +110,6 @@ def test_port_conflict(call_ray_stop_only, shutdown_only):
     sock.close()
 
 
-@pytest.mark.skipif(
-    sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher")
 def test_dashboard(shutdown_only):
     addresses = ray.init(include_dashboard=True, num_cpus=1)
     dashboard_url = addresses["webui_url"]
@@ -121,8 +139,32 @@ def test_dashboard(shutdown_only):
             f"Dashboard output log: {out_log}\n")
 
 
-if __name__ == "__main__":
-    import sys
+@pytest.mark.parametrize(
+    "ray_start_cluster_head", [{
+        "metrics_export_port": 6379,
+        "_system_config": {
+            "agent_restart_interval_ms": 10,
+            "agent_max_restart_count": 5
+        }
+    }],
+    indirect=True)
+def test_dashboard_agent_restart(ray_start_cluster_head, error_pubsub):
+    """Test that when the agent fails to start many times in a row,
+    the error is reported to drivers only once instead of being
+    spammed on every restart.
+    """
+    # metrics_export_port is set to a port that is already in use
+    # (6379, the Redis default), so the agent crashes on startup.
+    p = error_pubsub
+    errors = get_error_message(
+        p, 1, ray_constants.DASHBOARD_AGENT_DIED_ERROR, timeout=10)
+    for e in errors:
+        assert ("There are 3 possible problems "
+                "if you see this error." in e.error_message)
+    # Make sure the agent process is not restarted anymore.
+    cluster = ray_start_cluster_head
+    wait_for_condition(lambda: search_agents(cluster) is None)
+
+
+if __name__ == "__main__":
     import pytest
     sys.exit(pytest.main(["-v", __file__]))
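For a sense of scale: with the `_system_config` used by the new test (10 ms base interval, 5 restarts) and the exponential backoff that agent_manager.cc introduces below, the cumulative retry delay stays well under one second, so the 10 s `get_error_message` timeout has plenty of headroom. A quick sketch of that arithmetic (illustration only; the authoritative formula is the C++ below):

```python
def backoff_delays_ms(base_interval_ms, max_restart_count):
    # Mirrors agent_restart_interval_ms() * 2^(agent_restart_count_ + 1)
    # from the C++ change below.
    return [base_interval_ms * 2 ** (n + 1) for n in range(max_restart_count)]


delays = backoff_delays_ms(10, 5)
print(delays, sum(delays))  # [20, 40, 80, 160, 320] 620  (milliseconds)
```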
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 54411b789..ee71fc54e 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -310,6 +310,9 @@ RAY_CONFIG(uint32_t, agent_restart_interval_ms, 1000)
 /// Wait timeout for dashboard agent register.
 RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000)
 
+/// Max restart count for the dashboard agent.
+RAY_CONFIG(uint32_t, agent_max_restart_count, 5)
+
 /// If the agent manager fails to communicate with the dashboard agent, we will retry
 /// after this interval.
 RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000)
diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc
index 55fe1392f..3239bca9a 100644
--- a/src/ray/raylet/agent_manager.cc
+++ b/src/ray/raylet/agent_manager.cc
@@ -36,6 +36,8 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
   RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_
                 << ", port: " << agent_port_ << ", pid: " << agent_pid_;
   reply->set_status(rpc::AGENT_RPC_STATUS_OK);
+  // Reset the restart count after the registration is done.
+  agent_restart_count_ = 0;
   send_reply_callback(ray::Status::OK(), nullptr, nullptr);
 }
 
@@ -65,14 +67,16 @@ void AgentManager::StartAgent() {
   ProcessEnvironment env;
   env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
   env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
+  // Report the restart count to the agent so that we can decide whether or not
+  // to report the error message to drivers.
+  env.insert({"RESTART_COUNT", std::to_string(agent_restart_count_)});
+  env.insert({"MAX_RESTART_COUNT",
+              std::to_string(RayConfig::instance().agent_max_restart_count())});
   Process child(argv.data(), nullptr, ec, false, env);
   if (!child.IsValid() || ec) {
     // The worker failed to start. This is a fatal error.
     RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
                    << ec.message();
-    RAY_UNUSED(delay_executor_([this] { StartAgent(); },
-                               RayConfig::instance().agent_restart_interval_ms()));
-    return;
   }
 
   std::thread monitor_thread([this, child]() mutable {
@@ -101,8 +105,22 @@ void AgentManager::StartAgent() {
             .WithField("pid", agent_pid_)
         << "Agent process with pid " << child.GetId() << " exit, return value "
         << exit_code;
-    RAY_UNUSED(delay_executor_([this] { StartAgent(); },
-                               RayConfig::instance().agent_restart_interval_ms()));
+    if (agent_restart_count_ < RayConfig::instance().agent_max_restart_count()) {
+      RAY_UNUSED(delay_executor_(
+          [this] {
+            agent_restart_count_++;
+            StartAgent();
+          },
+          // Retry with exponential backoff.
+          RayConfig::instance().agent_restart_interval_ms() *
+              std::pow(2, (agent_restart_count_ + 1))));
+    } else {
+      RAY_LOG(INFO) << "The agent has failed to start "
+                    << RayConfig::instance().agent_max_restart_count()
+                    << " times in a row without registering itself. This very "
+                       "likely indicates a bug in the dashboard agent. Please "
+                       "check the dashboard_agent.log file.";
+    }
   });
   monitor_thread.detach();
 }
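Taken together, the two agent_manager.cc hunks implement a small lifecycle: the counter grows before each retry, a successful registration clears it, and the raylet gives up at the cap. A toy Python model of that state machine (the class name and return values are ours; the C++ above is the real implementation):

```python
class AgentSupervisorModel:
    """Toy model of AgentManager's restart policy (sketch only)."""

    def __init__(self, max_restart_count=5, restart_interval_ms=1000):
        self.max_restart_count = max_restart_count
        self.restart_interval_ms = restart_interval_ms
        self.restart_count = 0

    def on_agent_registered(self):
        # HandleRegisterAgent: a healthy agent resets the budget.
        self.restart_count = 0

    def on_agent_exit(self):
        # Monitor thread: retry with exponential backoff until the cap.
        if self.restart_count < self.max_restart_count:
            delay_ms = self.restart_interval_ms * 2 ** (self.restart_count + 1)
            self.restart_count += 1
            return ("restart", delay_ms)
        return ("give_up", None)


# Five crashes in a row exhaust the budget; the sixth exit gives up.
model = AgentSupervisorModel()
print([model.on_agent_exit() for _ in range(6)])
```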
diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h
index bb12df0f6..1d71f3368 100644
--- a/src/ray/raylet/agent_manager.h
+++ b/src/ray/raylet/agent_manager.h
@@ -80,6 +80,8 @@
   Options options_;
   pid_t agent_pid_ = 0;
   int agent_port_ = 0;
+  /// The number of times the agent has been restarted.
+  std::atomic<uint32_t> agent_restart_count_ = 0;
   std::string agent_ip_address_;
   DelayExecutorFn delay_executor_;
   RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index b988f9e2f..e6d04e2c2 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -373,7 +373,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
       },
       /*runtime_env_agent_factory=*/
       [this](const std::string &ip_address, int port) {
-        RAY_CHECK(!ip_address.empty() && port != 0);
+        RAY_CHECK(!ip_address.empty() && port != 0)
+            << "ip_address: " << ip_address << " port: " << port;
         return std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
             new rpc::RuntimeEnvAgentClient(ip_address, port, client_call_manager_));
       });
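Why `std::atomic` for the counter: it is bumped and read on the paths driven by the detached monitor thread but reset from the RPC handler in `HandleRegisterAgent`, so an unsynchronized `int` would be a data race. A sketch of the same shape in Python, with a lock standing in for the atomic (hypothetical names, not part of the patch):

```python
import threading


class RestartCounter:
    """Cross-thread counter, analogous to the std::atomic member above."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def increment(self):
        # Called on the path that restarts the agent after it exits.
        with self._lock:
            self._count += 1
            return self._count

    def reset(self):
        # Called when the agent registers successfully.
        with self._lock:
            self._count = 0

    def value(self):
        with self._lock:
            return self._count
```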