[GCS]Fix bug that gcs client does not set last_resource_usage_ (#13856)

This commit is contained in:
fangfengbin 2021-02-05 11:51:25 +08:00 committed by GitHub
parent fb89f9c2c8
commit 8a5999c12a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 19 additions and 12 deletions

View file

@ -565,7 +565,7 @@ class NodeResourceInfoAccessor {
virtual void AsyncReReportResourceUsage() = 0;
/// Return resources in last report. Used by light heartbeat.
std::shared_ptr<SchedulingResources> &GetLastResourceUsage() {
const std::shared_ptr<SchedulingResources> &GetLastResourceUsage() {
return last_resource_usage_;
}
@ -589,7 +589,6 @@ class NodeResourceInfoAccessor {
protected:
NodeResourceInfoAccessor() = default;
private:
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<SchedulingResources> last_resource_usage_ =

View file

@ -707,6 +707,12 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources(
Status ServiceBasedNodeResourceInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
last_resource_usage_->SetAvailableResources(
ResourceSet(MapFromProtobuf(data_ptr->resources_available())));
last_resource_usage_->SetTotalResources(
ResourceSet(MapFromProtobuf(data_ptr->resources_total())));
last_resource_usage_->SetLoadResources(
ResourceSet(MapFromProtobuf(data_ptr->resource_load())));
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,

View file

@ -715,8 +715,16 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsage) {
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
resource->set_should_global_gc(true);
std::string resource_name = "CPU";
double resource_value = 1.0;
(*resource->mutable_resources_total())[resource_name] = resource_value;
ASSERT_TRUE(ReportResourceUsage(resource));
WaitForExpectedCount(resource_batch_count, 1);
// Get and check last report resource usage.
auto last_resource_usage = gcs_client_->NodeResources().GetLastResourceUsage();
ASSERT_EQ(last_resource_usage->GetTotalResources().GetResource(resource_name),
resource_value);
}
TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) {

View file

@ -116,8 +116,6 @@ void NodeManager::FillResourceUsage(std::shared_ptr<rpc::ResourcesData> resource
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources->SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
// Add resource load by shape. This will be used by the new autoscaler.

View file

@ -387,7 +387,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
///
/// \param gcs_resources: The remote cache from gcs.
void UpdateLastResourceUsage(
std::shared_ptr<SchedulingResources> gcs_resources) override;
const std::shared_ptr<SchedulingResources> gcs_resources) override;
/// Return human-readable string for this scheduler state.
std::string DebugString() const;

View file

@ -54,7 +54,7 @@ class ClusterResourceSchedulerInterface {
///
/// \param gcs_resources: The remote cache from gcs.
virtual void UpdateLastResourceUsage(
std::shared_ptr<SchedulingResources> gcs_resources) {}
const std::shared_ptr<SchedulingResources> gcs_resources) {}
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in

View file

@ -71,8 +71,6 @@ void OldClusterResourceScheduler::FillResourceUsage(
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_->SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources_->GetAvailableResources().IsEqual(
@ -83,8 +81,6 @@ void OldClusterResourceScheduler::FillResourceUsage(
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_->SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
}

View file

@ -23,7 +23,7 @@ class OldClusterResourceScheduler : public ClusterResourceSchedulerInterface {
explicit OldClusterResourceScheduler(
const NodeID &self_node_id, ResourceIdSet &local_available_resources,
std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map,
std::shared_ptr<SchedulingResources> last_heartbeat_resources);
const std::shared_ptr<SchedulingResources> last_heartbeat_resources);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
@ -67,6 +67,6 @@ class OldClusterResourceScheduler : public ClusterResourceSchedulerInterface {
std::string self_node_id_string_;
ResourceIdSet &local_available_resources_;
std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map_;
std::shared_ptr<SchedulingResources> last_heartbeat_resources_;
const std::shared_ptr<SchedulingResources> last_heartbeat_resources_;
};
} // namespace ray