Check for raylet PID as ppid in dashboard agent fate-sharing (#12867)

This commit is contained in:
Edward Oakes 2020-12-15 12:13:11 -06:00 committed by Max Fitton
parent ffd7d121ad
commit 40f77101d5
3 changed files with 24 additions and 12 deletions

View file

@ -62,7 +62,9 @@ class DashboardAgent(object):
self.object_store_name = object_store_name
self.raylet_name = raylet_name
self.node_id = os.environ["RAY_NODE_ID"]
assert self.node_id, "Empty node id (RAY_NODE_ID)."
self.ppid = int(os.environ["RAY_RAYLET_PID"])
assert self.ppid > 0
logger.info("Parent pid is %s", self.ppid)
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
self.grpc_port = self.server.add_insecure_port(
f"[::]:{self.dashboard_agent_port}")
@ -89,17 +91,21 @@ class DashboardAgent(object):
async def run(self):
async def _check_parent():
"""Check if raylet is dead."""
curr_proc = psutil.Process()
while True:
parent = curr_proc.parent()
if parent is None or parent.pid == 1:
logger.error("raylet is dead, agent will die because "
"it fate-shares with raylet.")
sys.exit(0)
await asyncio.sleep(
dashboard_consts.
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS)
"""Check if raylet is dead and fate-share if it is."""
try:
curr_proc = psutil.Process()
while True:
parent = curr_proc.parent()
if (parent is None or parent.pid == 1
or self.ppid != parent.pid):
logger.error("Raylet is dead, exiting.")
sys.exit(0)
await asyncio.sleep(
dashboard_consts.
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS)
except Exception:
logger.error("Failed to check parent PID, exiting.")
sys.exit(1)
check_parent_task = create_task(_check_parent())

View file

@ -137,6 +137,11 @@ def test_basic(ray_start_with_dashboard):
assert agent_proc.pid == agent_pid
time.sleep(1)
# The agent should be dead if raylet exits.
raylet_proc.kill()
raylet_proc.wait()
agent_proc.wait(5)
# Check redis keys are set.
logger.info("Check redis keys are set.")
dashboard_address = client.get(dashboard_consts.REDIS_KEY_DASHBOARD)

View file

@ -60,6 +60,7 @@ void AgentManager::StartAgent() {
// Set node id to agent.
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.