[GCS]Use new flag to indicate whether resources are updated and update realtime resources view (#10906)

* Handle resources turning empty and update realtime view

* add up missing flag

* per comments

* use flag instead of special key to represent if resource changed

* Update src/ray/protobuf/gcs.proto

Co-authored-by: fangfengbin <869218239a@zju.edu.cn>

* fix lint in gcs.proto

* fix embarrassed mistake

Co-authored-by: fangfengbin <869218239a@zju.edu.cn>
This commit is contained in:
Tao Wang 2020-09-28 16:57:27 +08:00 committed by GitHub
parent 2e41a29c8f
commit 25ac8f9aa5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 35 deletions

View file

@ -56,9 +56,9 @@ void GcsNodeManager::NodeFailureDetector::HandleHeartbeat(
iter->second = num_heartbeats_timeout_;
if (!light_heartbeat_enabled_ || heartbeat_data.should_global_gc() ||
heartbeat_data.resources_available_size() > 0 ||
heartbeat_data.resources_total_size() > 0 ||
heartbeat_data.resource_load_size() > 0) {
heartbeat_data.resources_available_changed() ||
heartbeat_data.resource_load_changed()) {
heartbeat_buffer_[node_id] = heartbeat_data;
}
}
@ -459,9 +459,12 @@ void GcsNodeManager::StartNodeFailureDetector() {
void GcsNodeManager::UpdateNodeRealtimeResources(
const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) {
auto resources_available = MapFromProtobuf(heartbeat.resources_available());
cluster_realtime_resources_[node_id] =
std::make_shared<ResourceSet>(resources_available);
if (!RayConfig::instance().light_heartbeat_enabled() ||
heartbeat.resources_available_changed()) {
auto resources_available = MapFromProtobuf(heartbeat.resources_available());
cluster_realtime_resources_[node_id] =
std::make_shared<ResourceSet>(resources_available);
}
}
const absl::flat_hash_map<NodeID, std::shared_ptr<ResourceSet>>

View file

@ -281,16 +281,22 @@ message HeartbeatTableData {
bytes client_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether avaialbe resources is changed. Only used when
// light heartbeat enabled.
bool resources_available_changed = 3;
// Total resource capacity configured for this node manager.
map<string, double> resources_total = 3;
map<string, double> resources_total = 4;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 4;
map<string, double> resource_load = 5;
// Indicates whether resource load is changed. Only used when
// light heartbeat enabled.
bool resource_load_changed = 6;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 5;
ResourceLoad resource_load_by_shape = 7;
// Object IDs that are in use by workers on this node manager's node.
repeated bytes active_object_id = 6;
repeated bytes active_object_id = 8;
// Whether this node manager is requesting global GC.
bool should_global_gc = 7;
bool should_global_gc = 9;
}
message HeartbeatBatchTableData {

View file

@ -20,6 +20,7 @@
#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/constants.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/gcs/pb_util.h"
@ -391,17 +392,6 @@ void NodeManager::Heartbeat() {
// TODO(atumanov): implement a ResourceSet const_iterator.
// If light heartbeat enabled, we only set filed that represent resources changed.
if (light_heartbeat_enabled_) {
if (!last_heartbeat_resources_.GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
if (!last_heartbeat_resources_.GetTotalResources().IsEqual(
local_resources.GetTotalResources())) {
for (const auto &resource_pair :
@ -413,9 +403,22 @@ void NodeManager::Heartbeat() {
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources_.GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
heartbeat_data->set_resources_available_changed(true);
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
if (!last_heartbeat_resources_.GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
heartbeat_data->set_resource_load_changed(true);
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
@ -426,21 +429,17 @@ void NodeManager::Heartbeat() {
}
} else {
// If light heartbeat disabled, we send whole resources information every time.
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
for (const auto &resource_pair :
@ -448,8 +447,6 @@ void NodeManager::Heartbeat() {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
// Add resource load by shape. This will be used by the new autoscaler.
@ -953,11 +950,11 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id,
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
}
if (heartbeat_data.resources_available_size() > 0) {
if (heartbeat_data.resource_load_changed()) {
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
}
if (heartbeat_data.resource_load_size() > 0) {
if (heartbeat_data.resource_load_changed()) {
ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));