[GCS]Only publish changed field when node dead (#13364)

* Only update changed field when node dead

* node_id missed
This commit is contained in:
Tao Wang 2021-01-18 13:28:35 +08:00 committed by GitHub
parent a4ebdbd7da
commit 3a0710130c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 19 deletions

View file

@ -373,7 +373,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// Register a callback to monitor removed nodes.
auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) {
if (data.state() == rpc::GcsNodeInfo::DEAD) {
OnNodeRemoved(data);
OnNodeRemoved(node_id);
}
};
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
@ -653,8 +653,7 @@ void CoreWorker::RunIOService() {
io_service_.run();
}
void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) {
const auto node_id = NodeID::FromBinary(node_info.node_id());
void CoreWorker::OnNodeRemoved(const NodeID &node_id) {
RAY_LOG(INFO) << "Node failure " << node_id;
const auto lost_objects = reference_counter_->ResetObjectsOnRemovedNode(node_id);
// Delete the objects from the in-memory store to indicate that they are not

View file

@ -1058,7 +1058,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
/// Handler if a raylet node is removed from the cluster.
void OnNodeRemoved(const rpc::GcsNodeInfo &node_info);
void OnNodeRemoved(const NodeID &node_id);
/// Request the spillage of an object that we own from the primary that hosts
/// the primary copy to spill.

View file

@ -558,7 +558,12 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in
// Add the notification to our cache.
RAY_LOG(INFO) << "Received notification for node id = " << node_id
<< ", IsAlive = " << is_alive;
node_cache_[node_id] = node_info;
if (is_alive) {
node_cache_[node_id] = node_info;
} else {
node_cache_[node_id].set_state(rpc::GcsNodeInfo::DEAD);
node_cache_[node_id].set_timestamp(node_info.timestamp());
}
// If the notification is new, call registered callback.
if (is_notif_new) {

View file

@ -57,13 +57,17 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
node->set_state(rpc::GcsNodeInfo::DEAD);
node->set_timestamp(current_sys_time_ms());
AddDeadNodeToCache(node);
auto node_info_delta = std::make_shared<rpc::GcsNodeInfo>();
node_info_delta->set_node_id(node->node_id());
node_info_delta->set_state(node->state());
node_info_delta->set_timestamp(node->timestamp());
auto on_done = [this, node_id, node, reply,
auto on_done = [this, node_id, node_info_delta, reply,
send_reply_callback](const Status &status) {
auto on_done = [this, node_id, node, reply,
auto on_done = [this, node_id, node_info_delta, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
};
@ -179,10 +183,15 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) {
node->set_state(rpc::GcsNodeInfo::DEAD);
node->set_timestamp(current_sys_time_ms());
AddDeadNodeToCache(node);
auto on_done = [this, node_id, node](const Status &status) {
auto on_done = [this, node_id, node](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
auto node_info_delta = std::make_shared<rpc::GcsNodeInfo>();
node_info_delta->set_node_id(node->node_id());
node_info_delta->set_state(node->state());
node_info_delta->set_timestamp(node->timestamp());
auto on_done = [this, node_id, node_info_delta](const Status &status) {
auto on_done = [this, node_id, node_info_delta](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
};
RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};

View file

@ -255,7 +255,7 @@ ray::Status NodeManager::RegisterGcs() {
NodeAdded(data);
} else {
RAY_CHECK(data.state() == GcsNodeInfo::DEAD);
NodeRemoved(data);
NodeRemoved(node_id);
}
};
@ -680,10 +680,9 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) {
}));
}
void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
void NodeManager::NodeRemoved(const NodeID &node_id) {
// TODO(swang): If we receive a notification for our own death, clean up and
// exit immediately.
const NodeID node_id = NodeID::FromBinary(node_info.node_id());
RAY_LOG(DEBUG) << "[NodeRemoved] Received callback from node id " << node_id;
RAY_CHECK(node_id != self_node_id_)
@ -717,7 +716,7 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
// Clean up workers that were owned by processes that were on the failed
// node.
rpc::Address address;
address.set_raylet_id(node_info.node_id());
address.set_raylet_id(node_id.Binary());
HandleUnexpectedWorkerFailure(address);
}

View file

@ -182,9 +182,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
void NodeAdded(const GcsNodeInfo &data);
/// Handler for the removal of a GCS node.
/// \param node_info Data associated with the removed node.
/// \param node_id Id of the removed node.
/// \return Void.
void NodeRemoved(const GcsNodeInfo &node_info);
void NodeRemoved(const NodeID &node_id);
/// Handler for the addition or updation of a resource in the GCS
/// \param node_id ID of the node that created or updated resources.