From ac53e2f857c813b06b29ba8be1caa13275346267 Mon Sep 17 00:00:00 2001
From: Tao Wang
Date: Tue, 15 Dec 2020 03:42:00 +0800
Subject: [PATCH] [GCS]Tell dead nodes to commit suicide (#12792)

* [GCS]Tell dead nodes to commit suicide

* fix comment, add ut
---
Review notes (text between the "---" line and the diffstat is not part of
the commit message):

- gcs_heartbeat_manager.cc: a heartbeat whose node id is not found in
  `heartbeats_` is now answered with Status::Disconnected instead of being
  dropped without a reply.
- node_manager.cc: the heartbeat callback, previously nullptr, now logs at
  FATAL level when the reply status is Disconnected.
- gcs_server_rpc_test.cc: TestHeartbeatWithNoRegistering reports a heartbeat
  before the node is registered and waits for the Disconnected reply, then
  runs the register / heartbeat / unregister sequence.
- NOTE(review): the result of WaitReady(...) in the new test is not checked,
  so a missing Disconnected reply may go unnoticed -- confirm WaitReady's
  return type and consider asserting on it.
- NOTE(review): `std::make_shared()` and `static_cast(` in the
  node_manager.cc context lines look like they lost their template arguments
  in extraction; they are left as found because the types are not visible in
  this patch. Restore them from the target file before applying.

 .../gcs/gcs_server/gcs_heartbeat_manager.cc   |  6 +--
 .../gcs_server/test/gcs_server_rpc_test.cc    | 40 +++++++++++++++++++
 src/ray/raylet/node_manager.cc                |  6 ++-
 3 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
index 592500e72..b16383097 100644
--- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
@@ -61,9 +61,9 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
   NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id());
   auto iter = heartbeats_.find(node_id);
   if (iter == heartbeats_.end()) {
-    // Ignore this heartbeat as the node is not registered.
-    // TODO(Shanly): Maybe we should reply the raylet with an error. So the raylet can
-    // crash itself as soon as possible.
+    // Reply the raylet with an error so the raylet can crash itself.
+    GCS_RPC_SEND_REPLY(send_reply_callback, reply,
+                       Status::Disconnected("Node has been dead"));
     return;
   }
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
index c2f4657e0..fd2084168 100644
--- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
@@ -490,6 +490,46 @@ TEST_F(GcsServerTest, TestNodeInfo) {
             rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
 }
 
+TEST_F(GcsServerTest, TestHeartbeatWithNoRegistering) {
+  // Create gcs node info
+  auto gcs_node_info = Mocker::GenNodeInfo();
+
+  // Report heartbeat with no registering
+  rpc::ReportHeartbeatRequest report_heartbeat_request;
+  report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id());
+  std::promise<bool> disconnected;
+  client_->ReportHeartbeat(
+      report_heartbeat_request,
+      [&disconnected](const Status &status, const rpc::ReportHeartbeatReply &reply) {
+        if (status.IsDisconnected()) {
+          disconnected.set_value(true);
+        }
+      });
+  WaitReady(disconnected.get_future(), timeout_ms_);
+
+  // Register node info
+  rpc::RegisterNodeRequest register_node_info_request;
+  register_node_info_request.mutable_node_info()->CopyFrom(*gcs_node_info);
+  ASSERT_TRUE(RegisterNode(register_node_info_request));
+  std::vector<rpc::GcsNodeInfo> node_info_list = GetAllNodeInfo();
+  ASSERT_TRUE(node_info_list.size() == 1);
+  ASSERT_TRUE(node_info_list[0].state() ==
+              rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
+
+  // Report heartbeat
+  report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id());
+  ASSERT_TRUE(ReportHeartbeat(report_heartbeat_request));
+
+  // Unregister node info
+  rpc::UnregisterNodeRequest unregister_node_info_request;
+  unregister_node_info_request.set_node_id(gcs_node_info->node_id());
+  ASSERT_TRUE(UnregisterNode(unregister_node_info_request));
+  node_info_list = GetAllNodeInfo();
+  ASSERT_TRUE(node_info_list.size() == 1);
+  ASSERT_TRUE(node_info_list[0].state() ==
+              rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
+}
+
 TEST_F(GcsServerTest, TestObjectInfo) {
   // Create object table data
   ObjectID object_id = ObjectID::FromRandom();
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 277e3c9df..3119bd8d4 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -413,7 +413,11 @@ void NodeManager::Heartbeat() {
   auto heartbeat_data = std::make_shared();
   heartbeat_data->set_node_id(self_node_id_.Binary());
   RAY_CHECK_OK(
-      gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, /*done*/ nullptr));
+      gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) {
+        if (status.IsDisconnected()) {
+          RAY_LOG(FATAL) << "This node has been marked as dead.";
+        }
+      }));
 
   if (debug_dump_period_ > 0 &&
       static_cast(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) {