[runtime env] support raylet sharing fate with agent (#22382)

- Remove the agent restart feature. 
- Raylet shares fate with agent to make the failover logic easier.
Refer to issue https://github.com/ray-project/ray/issues/21695#issuecomment-1032161528
This commit is contained in:
Guyang Song 2022-02-21 18:16:21 +08:00 committed by GitHub
parent 5783cdb254
commit 902243fb03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 21 deletions

View file

@ -81,6 +81,25 @@ def prepare_test_files():
cleanup_test_files()
def search_agent(processes):
for p in processes:
try:
for c in p.cmdline():
if os.path.join("dashboard", "agent.py") in c:
return p
except Exception:
pass
def check_agent_register(raylet_proc, agent_pid):
# Check if agent register is OK.
for x in range(5):
logger.info("Check agent is alive.")
agent_proc = search_agent(raylet_proc.children())
assert agent_proc.pid == agent_pid
time.sleep(1)
@pytest.mark.parametrize(
"ray_start_with_dashboard",
[{"_system_config": {"agent_register_timeout_ms": 5000}}],
@ -107,18 +126,9 @@ def test_basic(ray_start_with_dashboard):
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
def _search_agent(processes):
for p in processes:
try:
for c in p.cmdline():
if os.path.join("dashboard", "agent.py") in c:
return p
except Exception:
pass
# Test for bad imports, the agent should be restarted.
logger.info("Test for bad imports.")
agent_proc = _search_agent(raylet_proc.children())
agent_proc = search_agent(raylet_proc.children())
prepare_test_files()
agent_pids = set()
try:
@ -127,7 +137,7 @@ def test_basic(ray_start_with_dashboard):
agent_proc.wait()
# The agent will be restarted for imports failure.
for _ in range(300):
agent_proc = _search_agent(raylet_proc.children())
agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_pids.add(agent_proc.pid)
# The agent should be restarted,
@ -139,23 +149,18 @@ def test_basic(ray_start_with_dashboard):
cleanup_test_files()
assert len(agent_pids) > 1, agent_pids
agent_proc = _search_agent(raylet_proc.children())
agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_proc.kill()
agent_proc.wait()
logger.info("Test agent register is OK.")
wait_for_condition(lambda: _search_agent(raylet_proc.children()))
wait_for_condition(lambda: search_agent(raylet_proc.children()))
assert dashboard_proc.status() in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING]
agent_proc = _search_agent(raylet_proc.children())
agent_proc = search_agent(raylet_proc.children())
agent_pid = agent_proc.pid
# Check if agent register is OK.
for x in range(5):
logger.info("Check agent is alive.")
agent_proc = _search_agent(raylet_proc.children())
assert agent_proc.pid == agent_pid
time.sleep(1)
check_agent_register(raylet_proc, agent_pid)
# The agent should be dead if raylet exits.
raylet_proc.kill()
@ -180,6 +185,48 @@ def test_basic(ray_start_with_dashboard):
assert agent_ports is not None
def test_raylet_and_agent_share_fate(shutdown_only):
"""Test raylet and agent share fate."""
system_config = {
"raylet_shares_fate_with_agent": True,
"agent_max_restart_count": 0,
}
ray.init(include_dashboard=True, _system_config=system_config)
all_processes = ray.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
wait_for_condition(lambda: search_agent(raylet_proc.children()))
agent_proc = search_agent(raylet_proc.children())
agent_pid = agent_proc.pid
check_agent_register(raylet_proc, agent_pid)
# The agent should be dead if raylet exits.
raylet_proc.kill()
raylet_proc.wait()
agent_proc.wait(5)
ray.shutdown()
ray.init(include_dashboard=True, _system_config=system_config)
all_processes = ray.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
wait_for_condition(lambda: search_agent(raylet_proc.children()))
agent_proc = search_agent(raylet_proc.children())
agent_pid = agent_proc.pid
check_agent_register(raylet_proc, agent_pid)
# The raylet should be dead if agent exits.
agent_proc.kill()
agent_proc.wait()
raylet_proc.wait(5)
@pytest.mark.parametrize(
"ray_start_with_dashboard",
[

View file

@ -349,6 +349,9 @@ RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000)
/// Max restart count for the dashboard agent.
RAY_CONFIG(uint32_t, agent_max_restart_count, 5)
/// Whether to fail raylet when agent fails.
RAY_CONFIG(bool, raylet_shares_fate_with_agent, false)
/// If the agent manager fails to communicate with the dashboard agent, we will retry
/// after this interval.
RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000);

View file

@ -21,6 +21,7 @@
#include "ray/util/event_label.h"
#include "ray/util/logging.h"
#include "ray/util/process.h"
#include "ray/util/util.h"
namespace ray {
namespace raylet {
@ -111,7 +112,7 @@ void AgentManager::StartAgent() {
RayConfig::instance().agent_restart_interval_ms() *
std::pow(2, (agent_restart_count_ + 1))));
} else {
RAY_LOG(WARNING) << "Agent has failed "
RAY_LOG(WARNING) << "Agent has failed to restart for "
<< RayConfig::instance().agent_max_restart_count()
<< " times in a row without registering the agent. This is highly "
"likely there's a bug in the dashboard agent. Please check out "
@ -122,6 +123,13 @@ void AgentManager::StartAgent() {
<< "Agent failed to be restarted "
<< RayConfig::instance().agent_max_restart_count()
<< " times. Agent won't be restarted.";
if (RayConfig::instance().raylet_shares_fate_with_agent()) {
RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. "
"Raylet fate shares with the agent. It can happen because the "
"Ray agent is unexpectedly killed or failed. See "
"`dashboard_agent.log` for the root cause.";
QuickExit();
}
}
});
monitor_thread.detach();