[Dashboard] Refine the dashboard restart logic. (#18973)

* in progress

* Refine the dashboard agent retry logic

* refine

* done

* lint
SangBin Cho authored 2021-10-04 21:01:51 +09:00 (committed by GitHub)
parent b4300dd532
commit 7fcf1bf57e
7 changed files with 109 additions and 23 deletions
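
At a high level, the change works as follows: the raylet exports RESTART_COUNT and MAX_RESTART_COUNT to the dashboard agent process, retries a crashed agent with exponential backoff up to agent_max_restart_count times, and the agent pushes DASHBOARD_AGENT_DIED_ERROR to drivers only once that restart budget is exhausted, instead of on every crash. A minimal Python sketch of the agent-side decision, simplified from the diff below (should_report_agent_death is a hypothetical helper introduced here for illustration only):

    import os

    def should_report_agent_death() -> bool:
        # Both env vars are set by the parent raylet (see the C++ changes below).
        # Environment variables arrive as strings, so convert before comparing.
        restart_count = int(os.environ["RESTART_COUNT"])
        max_restart_count = int(os.environ["MAX_RESTART_COUNT"])
        # Push an error to drivers only once the restart budget is exhausted,
        # so a flapping agent does not spam every driver on each crash.
        return restart_count >= max_restart_count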

View file

@@ -362,14 +362,34 @@ if __name__ == "__main__":
         loop = asyncio.get_event_loop()
         loop.run_until_complete(agent.run())
     except Exception as e:
-        # Something went wrong, so push an error to all drivers.
-        redis_client = ray._private.services.create_redis_client(
-            args.redis_address, password=args.redis_password)
-        traceback_str = ray._private.utils.format_error_message(
-            traceback.format_exc())
-        message = ("The agent on node {} failed with the following "
-                   "error:\n{}".format(platform.uname()[1], traceback_str))
-        ray._private.utils.push_error_to_driver_through_redis(
-            redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, message)
-        logger.exception(message)
-        raise e
+        # All these env vars should be available because
+        # they are provided by the parent raylet.
+        # They arrive as strings, so convert them before comparing.
+        restart_count = int(os.environ["RESTART_COUNT"])
+        max_restart_count = int(os.environ["MAX_RESTART_COUNT"])
+        raylet_pid = os.environ["RAY_RAYLET_PID"]
+        node_ip = args.node_ip_address
+        if restart_count >= max_restart_count:
+            # The agent has failed to start too many times in a row.
+            # Push an error to all drivers, so that users can see the
+            # impact of the issue.
+            redis_client = ray._private.services.create_redis_client(
+                args.redis_address, password=args.redis_password)
+            traceback_str = ray._private.utils.format_error_message(
+                traceback.format_exc())
+            message = (
+                f"(ip={node_ip}) "
+                f"The agent on node {platform.uname()[1]} failed to "
+                f"be restarted {max_restart_count} "
+                "times. There are 3 possible problems if you see this error."
+                "\n  1. The dashboard might not display correct "
+                "information on this node."
+                "\n  2. Metrics on this node won't be reported."
+                "\n  3. runtime_env APIs won't work."
+                "\nCheck out the `dashboard_agent.log` to see the "
+                "detailed failure messages.")
+            ray._private.utils.push_error_to_driver_through_redis(
+                redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR,
+                message)
+            logger.error(message)
+        logger.exception(e)
+        exit(1)

View file

@@ -107,7 +107,7 @@ def test_basic(ray_start_with_dashboard):
     agent_proc.kill()
     agent_proc.wait()
     # The agent will be restarted for imports failure.
-    for x in range(50):
+    for _ in range(300):
         agent_proc = _search_agent(raylet_proc.children())
         if agent_proc:
             agent_pids.add(agent_proc.pid)
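
The polling budget grows from 50 to 300 iterations because the restart path now applies exponential backoff, so a killed agent can take much longer to reappear than under the old fixed agent_restart_interval_ms; the larger loop simply gives the test a wider window before it gives up.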

View file

@@ -4,14 +4,34 @@ import subprocess
 import sys
 import time
 
+import psutil
 import pytest
 import requests
 
-from ray._private.test_utils import run_string_as_driver, wait_for_condition
+from ray._private.test_utils import (run_string_as_driver, wait_for_condition,
+                                     get_error_message)
 import ray
 from ray import ray_constants
 
 
+def search_agents(cluster):
+    all_processes = cluster.head_node.all_processes
+    raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
+    raylet_proc = psutil.Process(raylet_proc_info.process.pid)
+
+    def _search_agent(processes):
+        for p in processes:
+            try:
+                for c in p.cmdline():
+                    if "dashboard/agent.py" in c:
+                        return p
+            except Exception:
+                pass
+
+    agent_proc = _search_agent(raylet_proc.children())
+    return agent_proc
+
+
 def test_ray_start_default_port_conflict(call_ray_stop_only, shutdown_only):
     subprocess.check_call(["ray", "start", "--head"])
     ray.init(address="auto")
@@ -90,8 +110,6 @@ def test_port_conflict(call_ray_stop_only, shutdown_only):
         sock.close()
 
 
-@pytest.mark.skipif(
-    sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher")
 def test_dashboard(shutdown_only):
     addresses = ray.init(include_dashboard=True, num_cpus=1)
     dashboard_url = addresses["webui_url"]
@@ -121,8 +139,32 @@ def test_dashboard(shutdown_only):
             f"Dashboard output log: {out_log}\n")
 
 
-if __name__ == "__main__":
-    import sys
+@pytest.mark.parametrize(
+    "ray_start_cluster_head", [{
+        "metrics_export_port": 6379,
+        "_system_config": {
+            "agent_restart_interval_ms": 10,
+            "agent_max_restart_count": 5
+        }
+    }],
+    indirect=True)
+def test_dashboard_agent_restart(ray_start_cluster_head, error_pubsub):
+    """Test that when the agent fails to start many times in a row,
+    the error message is reported to drivers only once instead of
+    being spammed on every crash.
+    """
+    # Choose a duplicated port for the agent so that it will crash.
+    p = error_pubsub
+    errors = get_error_message(
+        p, 1, ray_constants.DASHBOARD_AGENT_DIED_ERROR, timeout=10)
+    for e in errors:
+        assert ("There are 3 possible problems "
+                "if you see this error." in e.error_message)
+    # Make sure the agent process is not started anymore.
+    cluster = ray_start_cluster_head
+    wait_for_condition(lambda: search_agents(cluster) is None)
+
+
+if __name__ == "__main__":
     import pytest
     sys.exit(pytest.main(["-v", __file__]))
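
Design note on the test setup: metrics_export_port is deliberately set to 6379, which collides with the head node's default Redis port, so every agent start fails immediately; agent_restart_interval_ms=10 and agent_max_restart_count=5 keep the crash-retry-give-up cycle short enough for CI, and the final wait_for_condition checks that the raylet really stops respawning the agent once the budget is exhausted.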

View file

@@ -310,6 +310,9 @@ RAY_CONFIG(uint32_t, agent_restart_interval_ms, 1000)
 /// Wait timeout for dashboard agent register.
 RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000)
 
+/// Max restart count for the dashboard agent.
+RAY_CONFIG(uint32_t, agent_max_restart_count, 5)
+
 /// If the agent manager fails to communicate with the dashboard agent, we will retry
 /// after this interval.
 RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000);
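
Both knobs are ordinary RayConfig entries, so they can be overridden the same way the new test does, via _system_config. A minimal sketch, assuming a local single-node ray.init; the values are illustrative, not recommended defaults:

    import ray

    # Hypothetical tuning example: allow at most two agent restarts, starting
    # from a 500 ms base interval (exponential backoff is applied on top).
    ray.init(
        _system_config={
            "agent_restart_interval_ms": 500,
            "agent_max_restart_count": 2,
        })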

View file

@@ -36,6 +36,8 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
   RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_
                 << ", port: " << agent_port_ << ", pid: " << agent_pid_;
   reply->set_status(rpc::AGENT_RPC_STATUS_OK);
+  // Reset the restart count after registration is done.
+  agent_restart_count_ = 0;
   send_reply_callback(ray::Status::OK(), nullptr, nullptr);
 }
@@ -65,14 +67,16 @@ void AgentManager::StartAgent() {
   ProcessEnvironment env;
   env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
   env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
+  // Report the restart count to the agent so that it can decide whether or not
+  // to report the error message to drivers.
+  env.insert({"RESTART_COUNT", std::to_string(agent_restart_count_)});
+  env.insert({"MAX_RESTART_COUNT",
+              std::to_string(RayConfig::instance().agent_max_restart_count())});
 
   Process child(argv.data(), nullptr, ec, false, env);
   if (!child.IsValid() || ec) {
     // The worker failed to start. This is a fatal error.
     RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
                    << ec.message();
-    RAY_UNUSED(delay_executor_([this] { StartAgent(); },
-                               RayConfig::instance().agent_restart_interval_ms()));
-    return;
   }
 
   std::thread monitor_thread([this, child]() mutable {
std::thread monitor_thread([this, child]() mutable { std::thread monitor_thread([this, child]() mutable {
@@ -101,8 +105,22 @@ void AgentManager::StartAgent() {
             .WithField("pid", agent_pid_)
         << "Agent process with pid " << child.GetId() << " exit, return value "
         << exit_code;
-    RAY_UNUSED(delay_executor_([this] { StartAgent(); },
-                               RayConfig::instance().agent_restart_interval_ms()));
+    if (agent_restart_count_ < RayConfig::instance().agent_max_restart_count()) {
+      RAY_UNUSED(delay_executor_(
+          [this] {
+            agent_restart_count_++;
+            StartAgent();
+          },
+          // Retry with exponential backoff.
+          RayConfig::instance().agent_restart_interval_ms() *
+              std::pow(2, (agent_restart_count_ + 1))));
+    } else {
+      RAY_LOG(INFO) << "The agent has failed "
+                    << RayConfig::instance().agent_max_restart_count()
+                    << " times in a row without registering. It is highly likely "
+                       "there is a bug in the dashboard agent. Please check out "
+                       "the dashboard_agent.log file.";
+    }
   });
   monitor_thread.detach();
 }
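
For a concrete sense of the schedule: with the defaults (agent_restart_interval_ms = 1000, agent_max_restart_count = 5), the delay before retrying after the n-th crash is interval * 2^(n+1), i.e. 2 s, 4 s, 8 s, 16 s, and 32 s, after which the raylet gives up. A small sketch that just reproduces the arithmetic from the lambda above:

    # Reproduces the delay computation passed to delay_executor_ above.
    interval_ms = 1000   # default agent_restart_interval_ms
    max_restarts = 5     # default agent_max_restart_count

    for restart_count in range(max_restarts):
        delay_ms = interval_ms * 2 ** (restart_count + 1)
        print(f"restart #{restart_count + 1} scheduled after {delay_ms / 1000:.0f} s")
    # -> 2 s, 4 s, 8 s, 16 s, 32 s; the sixth failure is not retried.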

View file

@@ -80,6 +80,8 @@ class AgentManager : public rpc::AgentManagerServiceHandler {
   Options options_;
   pid_t agent_pid_ = 0;
   int agent_port_ = 0;
+  /// The number of times the agent has been restarted.
+  std::atomic<uint32_t> agent_restart_count_ = 0;
   std::string agent_ip_address_;
   DelayExecutorFn delay_executor_;
   RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;

View file

@@ -373,7 +373,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
       },
       /*runtime_env_agent_factory=*/
       [this](const std::string &ip_address, int port) {
-        RAY_CHECK(!ip_address.empty() && port != 0);
+        RAY_CHECK(!ip_address.empty() && port != 0)
+            << "ip_address: " << ip_address << " port: " << port;
         return std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
             new rpc::RuntimeEnvAgentClient(ip_address, port, client_call_manager_));
       });