[GCS]Tell dead nodes to commit suicide (#12792)

* [GCS]Tell dead nodes to commit suicide

* fix comment, add ut
This commit is contained in:
Tao Wang 2020-12-15 03:42:00 +08:00 committed by GitHub
parent becca1424d
commit ac53e2f857
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 4 deletions

View file

@ -61,9 +61,9 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id());
auto iter = heartbeats_.find(node_id);
if (iter == heartbeats_.end()) {
// Ignore this heartbeat as the node is not registered.
// TODO(Shanly): Maybe we should reply the raylet with an error. So the raylet can
// crash itself as soon as possible.
// Reply the raylet with an error so the raylet can crash itself.
GCS_RPC_SEND_REPLY(send_reply_callback, reply,
Status::Disconnected("Node has been dead"));
return;
}

View file

@ -490,6 +490,46 @@ TEST_F(GcsServerTest, TestNodeInfo) {
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
}
TEST_F(GcsServerTest, TestHeartbeatWithNoRegistering) {
// Create gcs node info
auto gcs_node_info = Mocker::GenNodeInfo();
// Report heartbeat wit no registering
rpc::ReportHeartbeatRequest report_heartbeat_request;
report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id());
std::promise<bool> disconnected;
client_->ReportHeartbeat(
report_heartbeat_request,
[&disconnected](const Status &status, const rpc::ReportHeartbeatReply &reply) {
if (status.IsDisconnected()) {
disconnected.set_value(true);
}
});
WaitReady(disconnected.get_future(), timeout_ms_);
// Register node info
rpc::RegisterNodeRequest register_node_info_request;
register_node_info_request.mutable_node_info()->CopyFrom(*gcs_node_info);
ASSERT_TRUE(RegisterNode(register_node_info_request));
std::vector<rpc::GcsNodeInfo> node_info_list = GetAllNodeInfo();
ASSERT_TRUE(node_info_list.size() == 1);
ASSERT_TRUE(node_info_list[0].state() ==
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
// Report heartbeat
report_heartbeat_request.mutable_heartbeat()->set_node_id(gcs_node_info->node_id());
ASSERT_TRUE(ReportHeartbeat(report_heartbeat_request));
// Unregister node info
rpc::UnregisterNodeRequest unregister_node_info_request;
unregister_node_info_request.set_node_id(gcs_node_info->node_id());
ASSERT_TRUE(UnregisterNode(unregister_node_info_request));
node_info_list = GetAllNodeInfo();
ASSERT_TRUE(node_info_list.size() == 1);
ASSERT_TRUE(node_info_list[0].state() ==
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
}
TEST_F(GcsServerTest, TestObjectInfo) {
// Create object table data
ObjectID object_id = ObjectID::FromRandom();

View file

@ -413,7 +413,11 @@ void NodeManager::Heartbeat() {
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
heartbeat_data->set_node_id(self_node_id_.Binary());
RAY_CHECK_OK(
gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, /*done*/ nullptr));
gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) {
if (status.IsDisconnected()) {
RAY_LOG(FATAL) << "This node has beem marked as dead.";
}
}));
if (debug_dump_period_ > 0 &&
static_cast<int64_t>(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) {