diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 54453343c..fbcaee68b 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -56,9 +56,9 @@ void GcsNodeManager::NodeFailureDetector::HandleHeartbeat( iter->second = num_heartbeats_timeout_; if (!light_heartbeat_enabled_ || heartbeat_data.should_global_gc() || - heartbeat_data.resources_available_size() > 0 || heartbeat_data.resources_total_size() > 0 || - heartbeat_data.resource_load_size() > 0) { + heartbeat_data.resources_available_changed() || + heartbeat_data.resource_load_changed()) { heartbeat_buffer_[node_id] = heartbeat_data; } } @@ -459,9 +459,12 @@ void GcsNodeManager::StartNodeFailureDetector() { void GcsNodeManager::UpdateNodeRealtimeResources( const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) { - auto resources_available = MapFromProtobuf(heartbeat.resources_available()); - cluster_realtime_resources_[node_id] = - std::make_shared(resources_available); + if (!RayConfig::instance().light_heartbeat_enabled() || + heartbeat.resources_available_changed()) { + auto resources_available = MapFromProtobuf(heartbeat.resources_available()); + cluster_realtime_resources_[node_id] = + std::make_shared(resources_available); + } } const absl::flat_hash_map> diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 6c3157d30..2bc54e865 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -281,16 +281,22 @@ message HeartbeatTableData { bytes client_id = 1; // Resource capacity currently available on this node manager. map resources_available = 2; + // Indicates whether avaialbe resources is changed. Only used when + // light heartbeat enabled. + bool resources_available_changed = 3; // Total resource capacity configured for this node manager. - map resources_total = 3; + map resources_total = 4; // Aggregate outstanding resource load on this node manager. - map resource_load = 4; + map resource_load = 5; + // Indicates whether resource load is changed. Only used when + // light heartbeat enabled. + bool resource_load_changed = 6; // The resource load on this node, sorted by resource shape. - ResourceLoad resource_load_by_shape = 5; + ResourceLoad resource_load_by_shape = 7; // Object IDs that are in use by workers on this node manager's node. - repeated bytes active_object_id = 6; + repeated bytes active_object_id = 8; // Whether this node manager is requesting global GC. - bool should_global_gc = 7; + bool should_global_gc = 9; } message HeartbeatBatchTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8b42e5efa..c7d618408 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -20,6 +20,7 @@ #include "ray/common/buffer.h" #include "ray/common/common_protocol.h" +#include "ray/common/constants.h" #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/gcs/pb_util.h" @@ -391,17 +392,6 @@ void NodeManager::Heartbeat() { // TODO(atumanov): implement a ResourceSet const_iterator. // If light heartbeat enabled, we only set filed that represent resources changed. if (light_heartbeat_enabled_) { - if (!last_heartbeat_resources_.GetAvailableResources().IsEqual( - local_resources.GetAvailableResources())) { - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources_.SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - } - if (!last_heartbeat_resources_.GetTotalResources().IsEqual( local_resources.GetTotalResources())) { for (const auto &resource_pair : @@ -413,9 +403,22 @@ void NodeManager::Heartbeat() { ResourceSet(local_resources.GetTotalResources())); } + if (!last_heartbeat_resources_.GetAvailableResources().IsEqual( + local_resources.GetAvailableResources())) { + heartbeat_data->set_resources_available_changed(true); + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_.SetAvailableResources( + ResourceSet(local_resources.GetAvailableResources())); + } + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); if (!last_heartbeat_resources_.GetLoadResources().IsEqual( local_resources.GetLoadResources())) { + heartbeat_data->set_resource_load_changed(true); for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { (*heartbeat_data->mutable_resource_load())[resource_pair.first] = @@ -426,21 +429,17 @@ void NodeManager::Heartbeat() { } } else { // If light heartbeat disabled, we send whole resources information every time. - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources_.SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { (*heartbeat_data->mutable_resources_total())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetTotalResources( - ResourceSet(local_resources.GetTotalResources())); + + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); for (const auto &resource_pair : @@ -448,8 +447,6 @@ void NodeManager::Heartbeat() { (*heartbeat_data->mutable_resource_load())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetLoadResources( - ResourceSet(local_resources.GetLoadResources())); } // Add resource load by shape. This will be used by the new autoscaler. @@ -953,11 +950,11 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id, ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); } - if (heartbeat_data.resources_available_size() > 0) { + if (heartbeat_data.resource_load_changed()) { ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); } - if (heartbeat_data.resource_load_size() > 0) { + if (heartbeat_data.resource_load_changed()) { ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load));