diff --git a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h deleted file mode 100644 index 2eb93a662..000000000 --- a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2021 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace ray { - -class MockClusterResourceScheduler : public ClusterResourceScheduler { - public: - MOCK_METHOD(bool, UpdateNode, - (const std::string &node_id_string, - const rpc::ResourcesData &resource_data), - (override)); - MOCK_METHOD(bool, RemoveNode, (const std::string &node_id_string), (override)); - MOCK_METHOD(void, UpdateResourceCapacity, - (const std::string &node_name, const std::string &resource_name, - double resource_total), - (override)); - MOCK_METHOD(void, DeleteResource, - (const std::string &node_name, const std::string &resource_name), - (override)); - MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override)); - MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & resources_data), (override)); - MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, - (const absl::flat_hash_map &resource_map_filter), - (const, override)); - MOCK_METHOD(void, UpdateLastResourceUsage, - (const std::shared_ptr gcs_resources), (override)); - MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override)); -}; - -} // namespace ray diff --git a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h deleted file mode 100644 index c4247d793..000000000 --- a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2021 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -namespace ray { - -class MockClusterResourceSchedulerInterface : public ClusterResourceSchedulerInterface { - public: - MOCK_METHOD(bool, RemoveNode, (const std::string &node_id_string), (override)); - MOCK_METHOD(bool, UpdateNode, - (const std::string &node_id_string, - const rpc::ResourcesData &resource_data), - (override)); - MOCK_METHOD(void, UpdateResourceCapacity, - (const std::string &node_id_string, const std::string &resource_name, - double resource_total), - (override)); - MOCK_METHOD(void, DeleteResource, - (const std::string &node_id_string, const std::string &resource_name), - (override)); - MOCK_METHOD(void, UpdateLastResourceUsage, - (const std::shared_ptr gcs_resources), (override)); - MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & data), (override)); - MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override)); - MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, - (const absl::flat_hash_map &resource_map_filter), - (const, override)); - MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override)); -}; - -} // namespace ray diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d122c6dce..5a79e0c55 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -760,7 +760,8 @@ void NodeManager::WarnResourceDeadlock() { << exemplar.GetTaskSpecification().GetRequiredPlacementResources().ToString() << "\n" << "Available resources on this node: " - << cluster_resource_scheduler_->GetLocalResourceViewString() + << cluster_resource_scheduler_->GetClusterResourceManager() + .GetNodeResourceViewString(self_node_id_.Binary()) << " In total there are " << pending_tasks << " pending tasks and " << pending_actor_creations << " pending actors on this node."; @@ -835,7 +836,8 @@ void NodeManager::NodeRemoved(const NodeID &node_id) { // not be necessary. // Remove the node from the resource map. 
- if (!cluster_resource_scheduler_->RemoveNode(node_id.Binary())) { + if (!cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode( + node_id.Binary())) { RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id << "."; return; @@ -920,8 +922,8 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id, for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &new_resource_capacity = resource_pair.second; - cluster_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label, - new_resource_capacity); + cluster_resource_scheduler_->GetClusterResourceManager().UpdateResourceCapacity( + node_id.Binary(), resource_label, new_resource_capacity); } RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map."; cluster_task_manager_->ScheduleAndDispatchTasks(); @@ -948,14 +950,16 @@ void NodeManager::ResourceDeleted(const NodeID &node_id, // Update local_available_resources_ and SchedulingResources for (const auto &resource_label : resource_names) { - cluster_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label); + cluster_resource_scheduler_->GetClusterResourceManager().DeleteResource( + node_id.Binary(), resource_label); } return; } void NodeManager::UpdateResourceUsage(const NodeID &node_id, const rpc::ResourcesData &resource_data) { - if (!cluster_resource_scheduler_->UpdateNode(node_id.Binary(), resource_data)) { + if (!cluster_resource_scheduler_->GetClusterResourceManager().UpdateNode( + node_id.Binary(), resource_data)) { RAY_LOG(INFO) << "[UpdateResourceUsage]: received resource usage from unknown node id " << node_id; @@ -1591,7 +1595,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest << " actor_id = " << actor_id << ", normal_task_resources = " << normal_task_resources.ToString() << ", local_resoruce_view = " - << cluster_resource_scheduler_->GetLocalResourceViewString(); + << cluster_resource_scheduler_->GetClusterResourceManager() + .GetNodeResourceViewString(self_node_id_.Binary()); auto resources_data = reply->mutable_resources_data(); resources_data->set_node_id(self_node_id_.Binary()); resources_data->set_resources_normal_task_changed(true); diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index 0fec1a32d..b751119d9 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -61,7 +61,9 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { } void CheckRemainingResourceCorrect(NodeResources &node_resources) { - auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources(); + auto local_node_resource = + cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources( + "local"); ASSERT_TRUE(local_node_resource == node_resources); } @@ -135,7 +137,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( unit_resource, resource_instances)); auto remaining_resource_instance = - remaining_resource_scheduler->GetLocalNodeResources(); + remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( + "remaining"); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -162,7 +165,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, 
TestNewReturnBundleResource) {
   auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
       "remaining", unit_resource, *gcs_client_);
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
 }
@@ -207,7 +211,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
       remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
           init_unit_resource, resource_instances));
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
   /// 5. return second bundle.
@@ -230,7 +235,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
            {"CPU", 1.0},
            {"bundle_group_" + group_id.Hex(), 1000}},
           resource_instances));
-  remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
+  remaining_resource_instance =
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
   /// 7. return first bundle.
   new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
@@ -238,7 +245,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
   remaining_resources = {{"CPU", 2.0}};
   remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
       "remaining", remaining_resources, *gcs_client_);
-  remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
+  remaining_resource_instance =
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   ASSERT_TRUE(update_called_);
   ASSERT_TRUE(delete_called_);
   CheckRemainingResourceCorrect(remaining_resource_instance);
@@ -268,7 +277,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
       remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
           unit_resource, resource_instances));
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
 }
@@ -304,7 +314,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
       remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
           unit_resource, resource_instances));
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
   new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
   // 5. prepare bundle -> commit bundle -> commit bundle.
@@ -326,7 +337,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
   // 8. check remaining resources is correct.
   remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
       "remaining", available_resource, *gcs_client_);
-  remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
+  remaining_resource_instance =
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
 }
@@ -349,7 +362,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
   auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
       "remaining", remaining_resources, *gcs_client_);
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   CheckRemainingResourceCorrect(remaining_resource_instance);
   // 5. re-init the local available resource with 4 CPUs.
   available_resource = {std::make_pair("CPU", 4.0)};
@@ -380,7 +394,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
   ASSERT_TRUE(
       remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
           allocating_resource, resource_instances));
-  remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
+  remaining_resource_instance =
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   RAY_LOG(INFO) << "The current local resource view: "
                 << cluster_resource_scheduler_->DebugString();
   CheckRemainingResourceCorrect(remaining_resource_instance);
@@ -425,7 +441,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
       remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
           allocating_resource, resource_instances));
   auto remaining_resource_instance =
-      remaining_resource_scheduler->GetLocalNodeResources();
+      remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
+          "remaining");
   RAY_LOG(INFO) << "The current local resource view: "
                 << cluster_resource_scheduler_->DebugString();
   CheckRemainingResourceCorrect(remaining_resource_instance);
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc
new file mode 100644
index 000000000..47851741c
--- /dev/null
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -0,0 +1,261 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +#include "ray/raylet/scheduling/cluster_resource_manager.h" + +#include + +#include "ray/common/grpc_util.h" +#include "ray/common/ray_config.h" +#include "ray/util/container_util.h" + +namespace ray { + +ClusterResourceManager::ClusterResourceManager(StringIdMap &string_to_int_map) + : nodes_{}, string_to_int_map_{string_to_int_map} {} + +void ClusterResourceManager::AddOrUpdateNode( + const std::string &node_id, + const absl::flat_hash_map &resources_total, + const absl::flat_hash_map &resources_available) { + NodeResources node_resources = ResourceMapToNodeResources( + string_to_int_map_, resources_total, resources_available); + AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources); +} + +void ClusterResourceManager::AddOrUpdateNode(int64_t node_id, + const NodeResources &node_resources) { + RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: " + << node_resources.DebugString(string_to_int_map_); + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + // This node is new, so add it to the map. + nodes_.emplace(node_id, node_resources); + } else { + // This node exists, so update its resources. + it->second = Node(node_resources); + } +} + +bool ClusterResourceManager::UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) { + auto node_id = string_to_int_map_.Insert(node_id_string); + if (!nodes_.contains(node_id)) { + return false; + } + + auto resources_total = MapFromProtobuf(resource_data.resources_total()); + auto resources_available = MapFromProtobuf(resource_data.resources_available()); + NodeResources node_resources = ResourceMapToNodeResources( + string_to_int_map_, resources_total, resources_available); + NodeResources local_view; + RAY_CHECK(GetNodeResources(node_id, &local_view)); + + if (resource_data.resources_total_size() > 0) { + for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { + local_view.predefined_resources[i].total = + node_resources.predefined_resources[i].total; + } + for (auto &entry : node_resources.custom_resources) { + local_view.custom_resources[entry.first].total = entry.second.total; + } + } + + if (resource_data.resources_available_changed()) { + for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { + local_view.predefined_resources[i].available = + node_resources.predefined_resources[i].available; + } + for (auto &entry : node_resources.custom_resources) { + local_view.custom_resources[entry.first].available = entry.second.available; + } + + local_view.object_pulls_queued = resource_data.object_pulls_queued(); + } + + AddOrUpdateNode(node_id, local_view); + return true; +} + +bool ClusterResourceManager::RemoveNode(int64_t node_id) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + // Node not found. 
+ return false; + } else { + nodes_.erase(it); + return true; + } +} + +bool ClusterResourceManager::RemoveNode(const std::string &node_id_string) { + auto node_id = string_to_int_map_.Get(node_id_string); + if (node_id == -1) { + return false; + } + + return RemoveNode(node_id); +} + +bool ClusterResourceManager::GetNodeResources(int64_t node_id, + NodeResources *ret_resources) const { + auto it = nodes_.find(node_id); + if (it != nodes_.end()) { + *ret_resources = it->second.GetLocalView(); + return true; + } else { + return false; + } +} + +const NodeResources &ClusterResourceManager::GetNodeResources( + const std::string &node_name) const { + int64_t node_id = string_to_int_map_.Get(node_name); + const auto &node = map_find_or_die(nodes_, node_id); + return node.GetLocalView(); +} + +int64_t ClusterResourceManager::NumNodes() const { return nodes_.size(); } + +void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_string, + const std::string &resource_name, + double resource_total) { + int64_t node_id = string_to_int_map_.Get(node_id_string); + + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + NodeResources node_resources; + node_resources.predefined_resources.resize(PredefinedResources_MAX); + node_id = string_to_int_map_.Insert(node_id_string); + it = nodes_.emplace(node_id, node_resources).first; + } + + int idx = GetPredefinedResourceIndex(resource_name); + + auto local_view = it->second.GetMutableLocalView(); + FixedPoint resource_total_fp(resource_total); + if (idx != -1) { + auto diff_capacity = resource_total_fp - local_view->predefined_resources[idx].total; + local_view->predefined_resources[idx].total += diff_capacity; + local_view->predefined_resources[idx].available += diff_capacity; + if (local_view->predefined_resources[idx].available < 0) { + local_view->predefined_resources[idx].available = 0; + } + if (local_view->predefined_resources[idx].total < 0) { + local_view->predefined_resources[idx].total = 0; + } + } else { + string_to_int_map_.Insert(resource_name); + int64_t resource_id = string_to_int_map_.Get(resource_name); + auto itr = local_view->custom_resources.find(resource_id); + if (itr != local_view->custom_resources.end()) { + auto diff_capacity = resource_total_fp - itr->second.total; + itr->second.total += diff_capacity; + itr->second.available += diff_capacity; + if (itr->second.available < 0) { + itr->second.available = 0; + } + if (itr->second.total < 0) { + itr->second.total = 0; + } + } else { + ResourceCapacity resource_capacity; + resource_capacity.total = resource_capacity.available = resource_total_fp; + local_view->custom_resources.emplace(resource_id, resource_capacity); + } + } +} + +void ClusterResourceManager::DeleteResource(const std::string &node_id_string, + const std::string &resource_name) { + int64_t node_id = string_to_int_map_.Get(node_id_string); + + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + return; + } + + int idx = GetPredefinedResourceIndex(resource_name); + auto local_view = it->second.GetMutableLocalView(); + if (idx != -1) { + local_view->predefined_resources[idx].available = 0; + local_view->predefined_resources[idx].total = 0; + + } else { + int64_t resource_id = string_to_int_map_.Get(resource_name); + auto itr = local_view->custom_resources.find(resource_id); + if (itr != local_view->custom_resources.end()) { + local_view->custom_resources.erase(itr); + } + } +} + +std::string ClusterResourceManager::GetNodeResourceViewString( + const std::string &node_name) const { + 
int64_t node_id = string_to_int_map_.Get(node_name); + const auto &node = map_find_or_die(nodes_, node_id); + return node.GetLocalView().DictString(string_to_int_map_); +} + +std::string ClusterResourceManager::GetResourceNameFromIndex(int64_t res_idx) { + if (res_idx == CPU) { + return ray::kCPU_ResourceLabel; + } else if (res_idx == GPU) { + return ray::kGPU_ResourceLabel; + } else if (res_idx == OBJECT_STORE_MEM) { + return ray::kObjectStoreMemory_ResourceLabel; + } else if (res_idx == MEM) { + return ray::kMemory_ResourceLabel; + } else { + return string_to_int_map_.Get((uint64_t)res_idx); + } +} + +const absl::flat_hash_map &ClusterResourceManager::GetResourceView() + const { + return nodes_; +} + +bool ClusterResourceManager::SubtractNodeAvailableResources( + int64_t node_id, const ResourceRequest &resource_request) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + return false; + } + + NodeResources *resources = it->second.GetMutableLocalView(); + + FixedPoint zero(0.); + + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + resources->predefined_resources[i].available = + std::max(FixedPoint(0), resources->predefined_resources[i].available - + resource_request.predefined_resources[i]); + } + + for (const auto &task_req_custom_resource : resource_request.custom_resources) { + auto it = resources->custom_resources.find(task_req_custom_resource.first); + if (it != resources->custom_resources.end()) { + it->second.available = + std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second); + } + } + + // TODO(swang): We should also subtract object store memory if the task has + // arguments. Right now we do not modify object_pulls_queued in case of + // performance regressions in spillback. + + return true; +} +} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h new file mode 100644 index 000000000..c3a41498e --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_resource_manager.h @@ -0,0 +1,142 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "ray/raylet/scheduling/cluster_resource_data.h" +#include "ray/raylet/scheduling/fixed_point.h" +#include "ray/raylet/scheduling/local_resource_manager.h" +#include "ray/util/logging.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { +namespace raylet { +class ClusterTaskManagerTest; +} + +/// Class manages the resources view of the entire cluster. +/// This class is not thread safe. +class ClusterResourceManager { + public: + explicit ClusterResourceManager(StringIdMap &string_to_int_map); + + /// Get the resource view of the cluster. 
+  const absl::flat_hash_map<int64_t, Node> &GetResourceView() const;
+
+  // Mapping from predefined resource indexes to resource strings.
+  std::string GetResourceNameFromIndex(int64_t res_idx);
+
+  /// Update node resources. This happens when a node's resource usage is updated.
+  ///
+  /// \param node_id_string ID of the node whose resources need to be updated.
+  /// \param resource_data The node resource data.
+  bool UpdateNode(const std::string &node_id_string,
+                  const rpc::ResourcesData &resource_data);
+
+  /// Remove node from the cluster data structure. This happens
+  /// when a node fails or it is removed from the cluster.
+  ///
+  /// \param node_id_string ID of the node to be removed.
+  bool RemoveNode(const std::string &node_id_string);
+
+  /// Get number of nodes in the cluster.
+  int64_t NumNodes() const;
+
+  /// Update total capacity of a given resource of a given node.
+  ///
+  /// \param node_name: Node whose resource we want to update.
+  /// \param resource_name: Resource which we want to update.
+  /// \param resource_total: New capacity of the resource.
+  void UpdateResourceCapacity(const std::string &node_name,
+                              const std::string &resource_name, double resource_total);
+
+  /// Delete a given resource from a given node.
+  ///
+  /// \param node_name: Node whose resource we want to delete.
+  /// \param resource_name: Resource we want to delete.
+  void DeleteResource(const std::string &node_name, const std::string &resource_name);
+
+  /// Return the given node's resources in human-readable string form.
+  std::string GetNodeResourceViewString(const std::string &node_name) const;
+
+  /// Get the resources of a given node.
+  const NodeResources &GetNodeResources(const std::string &node_name) const;
+
+  /// Subtract available resources from a given node.
+  /// Return false if such node doesn't exist.
+  bool SubtractNodeAvailableResources(int64_t node_id,
+                                      const ResourceRequest &resource_request);
+
+ private:
+  friend class ClusterResourceScheduler;
+
+  /// Add a new node or overwrite the resources of an existing node.
+  ///
+  /// \param node_id: Node ID.
+  /// \param node_resources: Up to date total and available resources of the node.
+  void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
+
+  void AddOrUpdateNode(
+      const std::string &node_id,
+      const absl::flat_hash_map<std::string, double> &resource_map_total,
+      const absl::flat_hash_map<std::string, double> &resource_map_available);
+
+  /// Remove node from the cluster data structure. This happens
+  /// when a node fails or it is removed from the cluster.
+  ///
+  /// \param node_id ID of the node to be removed.
+  bool RemoveNode(int64_t node_id);
+
+  /// Return resources associated to the given node_id in ret_resources.
+  /// If node_id not found, return false; otherwise return true.
+  bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
+
+  /// List of nodes in the cluster and their resources organized as a map.
+  /// The key of the map is the node ID.
+  absl::flat_hash_map<int64_t, Node> nodes_;
+  /// Keep the mapping between node and resource IDs in string representation
+  /// to integer representation. Used for improving map performance.
+  StringIdMap &string_to_int_map_;
+
+  friend class ClusterResourceSchedulerTest;
+  friend class raylet::ClusterTaskManagerTest;
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest,
+              UpdateLocalAvailableResourcesFromResourceInstancesTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, ResourceUsageReportTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, DeadNodeTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask);
+  FRIEND_TEST(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, DirtyLocalViewTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, DynamicResourceTest);
+  FRIEND_TEST(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources);
+  FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback);
+};
+
+}  // end namespace ray
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index f15616f9b..e481044ee 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -21,21 +21,39 @@
 namespace ray {
 
+ClusterResourceScheduler::ClusterResourceScheduler() {
+  cluster_resource_manager_ =
+      std::make_unique<ClusterResourceManager>(string_to_int_map_);
+  NodeResources node_resources;
+  node_resources.predefined_resources.resize(PredefinedResources_MAX);
+  local_resource_manager_ = std::make_unique<LocalResourceManager>(
+      local_node_id_, string_to_int_map_, node_resources,
+      /*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
+      [&](const NodeResources &local_resource_update) {
+        cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
+      });
+  scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
+      local_node_id_, cluster_resource_manager_->GetResourceView());
+}
+
 ClusterResourceScheduler::ClusterResourceScheduler(
     int64_t local_node_id, const NodeResources &local_node_resources,
     gcs::GcsClient &gcs_client)
-    : local_node_id_(local_node_id),
+    : string_to_int_map_(),
+      local_node_id_(local_node_id),
       gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
       gcs_client_(&gcs_client) {
-  scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
-      local_node_id_, nodes_);
+  cluster_resource_manager_ =
+      std::make_unique<ClusterResourceManager>(string_to_int_map_);
   local_resource_manager_ = std::make_unique<LocalResourceManager>(
       local_node_id, string_to_int_map_, local_node_resources,
       /*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
       [&](const NodeResources &local_resource_update) {
-        this->AddOrUpdateNode(local_node_id_, local_resource_update);
+        cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
       });
-  AddOrUpdateNode(local_node_id_, local_node_resources);
+  cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_node_resources);
+  scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
+      local_node_id_,
+      cluster_resource_manager_->GetResourceView());
 }
 
 ClusterResourceScheduler::ClusterResourceScheduler(
@@ -43,19 +61,23 @@ ClusterResourceScheduler::ClusterResourceScheduler(
     const absl::flat_hash_map<std::string, double> &local_node_resources,
     gcs::GcsClient &gcs_client,
     std::function<int64_t(void)> get_used_object_store_memory,
     std::function<bool(void)> get_pull_manager_at_capacity)
-    : gcs_client_(&gcs_client) {
+    : string_to_int_map_(),
+      local_node_id_(),
+      gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
+      gcs_client_(&gcs_client) {
   local_node_id_ = string_to_int_map_.Insert(local_node_id);
-  scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
-      local_node_id_, nodes_);
   NodeResources node_resources = ResourceMapToNodeResources(
       string_to_int_map_, local_node_resources, local_node_resources);
-
+  cluster_resource_manager_ =
+      std::make_unique<ClusterResourceManager>(string_to_int_map_);
   local_resource_manager_ = std::make_unique<LocalResourceManager>(
       local_node_id_, string_to_int_map_, node_resources, get_used_object_store_memory,
       get_pull_manager_at_capacity,
       [&](const NodeResources &local_resource_update) {
-        this->AddOrUpdateNode(local_node_id_, local_resource_update);
+        cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
       });
-  AddOrUpdateNode(local_node_id_, node_resources);
+  cluster_resource_manager_->AddOrUpdateNode(local_node_id_, node_resources);
+  scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
+      local_node_id_, cluster_resource_manager_->GetResourceView());
 }
 
 bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const {
@@ -69,89 +91,6 @@ bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const {
   return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id_binary)) != nullptr;
 }
 
-void ClusterResourceScheduler::AddOrUpdateNode(
-    const std::string &node_id,
-    const absl::flat_hash_map<std::string, double> &resources_total,
-    const absl::flat_hash_map<std::string, double> &resources_available) {
-  NodeResources node_resources = ResourceMapToNodeResources(
-      string_to_int_map_, resources_total, resources_available);
-  AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
-}
-
-void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id,
-                                               const NodeResources &node_resources) {
-  RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: "
-                 << node_resources.DebugString(string_to_int_map_);
-  auto it = nodes_.find(node_id);
-  if (it == nodes_.end()) {
-    // This node is new, so add it to the map.
-    nodes_.emplace(node_id, node_resources);
-  } else {
-    // This node exists, so update its resources.
- it->second = Node(node_resources); - } -} - -bool ClusterResourceScheduler::UpdateNode(const std::string &node_id_string, - const rpc::ResourcesData &resource_data) { - auto node_id = string_to_int_map_.Insert(node_id_string); - if (!nodes_.contains(node_id)) { - return false; - } - - auto resources_total = MapFromProtobuf(resource_data.resources_total()); - auto resources_available = MapFromProtobuf(resource_data.resources_available()); - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, resources_total, resources_available); - NodeResources local_view; - RAY_CHECK(GetNodeResources(node_id, &local_view)); - - if (resource_data.resources_total_size() > 0) { - for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { - local_view.predefined_resources[i].total = - node_resources.predefined_resources[i].total; - } - for (auto &entry : node_resources.custom_resources) { - local_view.custom_resources[entry.first].total = entry.second.total; - } - } - - if (resource_data.resources_available_changed()) { - for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { - local_view.predefined_resources[i].available = - node_resources.predefined_resources[i].available; - } - for (auto &entry : node_resources.custom_resources) { - local_view.custom_resources[entry.first].available = entry.second.available; - } - - local_view.object_pulls_queued = resource_data.object_pulls_queued(); - } - - AddOrUpdateNode(node_id, local_view); - return true; -} - -bool ClusterResourceScheduler::RemoveNode(int64_t node_id) { - auto it = nodes_.find(node_id); - if (it == nodes_.end()) { - // Node not found. - return false; - } else { - nodes_.erase(it); - return true; - } -} - -bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) { - auto node_id = string_to_int_map_.Get(node_id_string); - if (node_id == -1) { - return false; - } - - return RemoveNode(node_id); -} - bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request, int64_t node_id, const NodeResources &resources) const { @@ -202,11 +141,12 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode( int64_t best_node = -1; // This is an actor which requires no resources. // Pick a random node to to avoid scheduling all actors on the local node. - if (nodes_.size() > 0) { - std::uniform_int_distribution distribution(0, nodes_.size() - 1); + const auto &resource_view = cluster_resource_manager_->GetResourceView(); + if (resource_view.size() > 0) { + std::uniform_int_distribution distribution(0, resource_view.size() - 1); int idx = distribution(gen_); - auto iter = std::next(nodes_.begin(), idx); - for (size_t i = 0; i < nodes_.size(); ++i) { + auto iter = std::next(resource_view.begin(), idx); + for (size_t i = 0; i < resource_view.size(); ++i) { // TODO(iycheng): Here is there are a lot of nodes died, the // distribution might not be even. 
if (NodeAlive(iter->first)) { @@ -214,13 +154,13 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode( break; } ++iter; - if (iter == nodes_.end()) { - iter = nodes_.begin(); + if (iter == resource_view.end()) { + iter = resource_view.begin(); } } } RAY_LOG(DEBUG) << "GetBestSchedulableNode, best_node = " << best_node - << ", # nodes = " << nodes_.size() + << ", # nodes = " << resource_view.size() << ", resource_request = " << resource_request.DebugString(); return best_node; } @@ -273,168 +213,36 @@ std::string ClusterResourceScheduler::GetBestSchedulableNode( bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources( int64_t node_id, const ResourceRequest &resource_request) { RAY_CHECK(node_id != local_node_id_); - - auto it = nodes_.find(node_id); - if (it == nodes_.end()) { + const auto &resource_view = cluster_resource_manager_->GetResourceView(); + auto it = resource_view.find(node_id); + if (it == resource_view.end()) { return false; } - NodeResources *resources = it->second.GetMutableLocalView(); // Just double check this node can still schedule the resource request. - if (!IsSchedulable(resource_request, node_id, *resources)) { + if (!IsSchedulable(resource_request, node_id, it->second.GetLocalView())) { return false; } - FixedPoint zero(0.); - - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - resources->predefined_resources[i].available = - std::max(FixedPoint(0), resources->predefined_resources[i].available - - resource_request.predefined_resources[i]); - } - - for (const auto &task_req_custom_resource : resource_request.custom_resources) { - auto it = resources->custom_resources.find(task_req_custom_resource.first); - if (it != resources->custom_resources.end()) { - it->second.available = - std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second); - } - } - - // TODO(swang): We should also subtract object store memory if the task has - // arguments. Right now we do not modify object_pulls_queued in case of - // performance regressions in spillback. 
- - return true; + return cluster_resource_manager_->SubtractNodeAvailableResources(node_id, + resource_request); } -bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, - NodeResources *ret_resources) const { - auto it = nodes_.find(node_id); - if (it != nodes_.end()) { - *ret_resources = it->second.GetLocalView(); - return true; - } else { - return false; - } -} - -const NodeResources &ClusterResourceScheduler::GetLocalNodeResources() const { - const auto &node_it = nodes_.find(local_node_id_); - RAY_CHECK(node_it != nodes_.end()); - return node_it->second.GetLocalView(); -} - -int64_t ClusterResourceScheduler::NumNodes() const { return nodes_.size(); } - const StringIdMap &ClusterResourceScheduler::GetStringIdMap() const { return string_to_int_map_; } -void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id_string, - const std::string &resource_name, - double resource_total) { - int64_t node_id = string_to_int_map_.Get(node_id_string); - - auto it = nodes_.find(node_id); - if (it == nodes_.end()) { - NodeResources node_resources; - node_resources.predefined_resources.resize(PredefinedResources_MAX); - node_id = string_to_int_map_.Insert(node_id_string); - it = nodes_.emplace(node_id, node_resources).first; - } - - int idx = GetPredefinedResourceIndex(resource_name); - - auto local_view = it->second.GetMutableLocalView(); - FixedPoint resource_total_fp(resource_total); - if (idx != -1) { - auto diff_capacity = resource_total_fp - local_view->predefined_resources[idx].total; - local_view->predefined_resources[idx].total += diff_capacity; - local_view->predefined_resources[idx].available += diff_capacity; - if (local_view->predefined_resources[idx].available < 0) { - local_view->predefined_resources[idx].available = 0; - } - if (local_view->predefined_resources[idx].total < 0) { - local_view->predefined_resources[idx].total = 0; - } - } else { - string_to_int_map_.Insert(resource_name); - int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = local_view->custom_resources.find(resource_id); - if (itr != local_view->custom_resources.end()) { - auto diff_capacity = resource_total_fp - itr->second.total; - itr->second.total += diff_capacity; - itr->second.available += diff_capacity; - if (itr->second.available < 0) { - itr->second.available = 0; - } - if (itr->second.total < 0) { - itr->second.total = 0; - } - } else { - ResourceCapacity resource_capacity; - resource_capacity.total = resource_capacity.available = resource_total_fp; - local_view->custom_resources.emplace(resource_id, resource_capacity); - } - } -} - -void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, - const std::string &resource_name) { - int64_t node_id = string_to_int_map_.Get(node_id_string); - - auto it = nodes_.find(node_id); - if (it == nodes_.end()) { - return; - } - - int idx = GetPredefinedResourceIndex(resource_name); - auto local_view = it->second.GetMutableLocalView(); - if (idx != -1) { - local_view->predefined_resources[idx].available = 0; - local_view->predefined_resources[idx].total = 0; - - } else { - int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = local_view->custom_resources.find(resource_id); - if (itr != local_view->custom_resources.end()) { - local_view->custom_resources.erase(itr); - } - } -} - std::string ClusterResourceScheduler::DebugString(void) const { std::stringstream buffer; buffer << "\nLocal id: " << local_node_id_; buffer << " Local resources: " << 
local_resource_manager_->DebugString(); - for (auto &node : nodes_) { + for (auto &node : cluster_resource_manager_->GetResourceView()) { buffer << "node id: " << node.first; buffer << node.second.GetLocalView().DebugString(string_to_int_map_); } return buffer.str(); } -std::string ClusterResourceScheduler::GetLocalResourceViewString() const { - const auto &node_it = nodes_.find(local_node_id_); - RAY_CHECK(node_it != nodes_.end()); - return node_it->second.GetLocalView().DictString(string_to_int_map_); -} - -std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) { - if (res_idx == CPU) { - return ray::kCPU_ResourceLabel; - } else if (res_idx == GPU) { - return ray::kGPU_ResourceLabel; - } else if (res_idx == OBJECT_STORE_MEM) { - return ray::kObjectStoreMemory_ResourceLabel; - } else if (res_idx == MEM) { - return ray::kMemory_ResourceLabel; - } else { - return string_to_int_map_.Get((uint64_t)res_idx); - } -} - bool ClusterResourceScheduler::AllocateRemoteTaskResources( const std::string &node_string, const absl::flat_hash_map &task_resources) { @@ -445,11 +253,13 @@ bool ClusterResourceScheduler::AllocateRemoteTaskResources( return SubtractRemoteNodeAvailableResources(node_id, resource_request); } -bool ClusterResourceScheduler::IsLocallySchedulable( - const absl::flat_hash_map &shape) { +bool ClusterResourceScheduler::IsSchedulableOnNode( + const std::string &node_name, const absl::flat_hash_map &shape) { + int64_t node_id = string_to_int_map_.Get(node_name); auto resource_request = ResourceMapToResourceRequest( string_to_int_map_, shape, /*requires_object_store_memory=*/false); - return IsSchedulable(resource_request, local_node_id_, GetLocalNodeResources()); + return IsSchedulable(resource_request, node_id, + cluster_resource_manager_->GetNodeResources(node_name)); } } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 2f8a5cd3c..d7b5a039f 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -26,7 +26,7 @@ #include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/scheduling/cluster_resource_data.h" -#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" +#include "ray/raylet/scheduling/cluster_resource_manager.h" #include "ray/raylet/scheduling/fixed_point.h" #include "ray/raylet/scheduling/local_resource_manager.h" #include "ray/raylet/scheduling/scheduling_ids.h" @@ -41,9 +41,9 @@ using rpc::HeartbeatTableData; /// Class encapsulating the cluster resources and the logic to assign /// tasks to nodes based on the task's constraints and the available /// resources at those nodes. -class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { +class ClusterResourceScheduler { public: - ClusterResourceScheduler() {} + ClusterResourceScheduler(); /// Constructor initializing the resources associated with the local node. 
/// /// \param local_node_id: ID of local node, @@ -59,26 +59,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { std::function get_used_object_store_memory = nullptr, std::function get_pull_manager_at_capacity = nullptr); - // Mapping from predefined resource indexes to resource strings - std::string GetResourceNameFromIndex(int64_t res_idx); - - void AddOrUpdateNode( - const std::string &node_id, - const absl::flat_hash_map &resource_map_total, - const absl::flat_hash_map &resource_map_available); - - /// Update node resources. This hanppens when a node resource usage udpated. - /// - /// \param node_id_string ID of the node which resoruces need to be udpated. - /// \param resource_data The node resource data. - bool UpdateNode(const std::string &node_id_string, - const rpc::ResourcesData &resource_data) override; - - /// Remove node from the cluster data structure. This happens - /// when a node fails or it is removed from the cluster. - /// - /// \param node_id_string ID of the node to be removed. - bool RemoveNode(const std::string &node_id_string) override; + const StringIdMap &GetStringIdMap() const; /// Find a node in the cluster on which we can schedule a given resource request. /// In hybrid mode, see `scheduling_policy.h` for a description of the policy. @@ -114,31 +95,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { bool requires_object_store_memory, bool actor_creation, bool force_spillback, int64_t *violations, bool *is_infeasible); - /// Get number of nodes in the cluster. - int64_t NumNodes() const; - - /// Temporarily get the StringIDMap. - const StringIdMap &GetStringIdMap() const; - - /// Update total capacity of a given resource of a given node. - /// - /// \param node_name: Node whose resource we want to update. - /// \param resource_name: Resource which we want to update. - /// \param resource_total: New capacity of the resource. - void UpdateResourceCapacity(const std::string &node_name, - const std::string &resource_name, - double resource_total) override; - - /// Delete a given resource from a given node. - /// - /// \param node_name: Node whose resource we want to delete. - /// \param resource_name: Resource we want to delete - void DeleteResource(const std::string &node_name, - const std::string &resource_name) override; - - /// Return local resources in human-readable string form. - std::string GetLocalResourceViewString() const override; - /// Subtract the resources required by a given resource request (resource_request) from /// a given remote node. /// @@ -153,16 +109,18 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { /// Return human-readable string for this scheduler state. std::string DebugString() const; - /// Check whether a task request is schedulable on a the local node. A node is + /// Check whether a task request is schedulable on a given node. A node is /// schedulable if it has the available resources needed to execute the task. /// + /// \param node_name Name of the node. /// \param shape The resource demand's shape. - bool IsLocallySchedulable(const absl::flat_hash_map &shape); + bool IsSchedulableOnNode(const std::string &node_name, + const absl::flat_hash_map &shape); LocalResourceManager &GetLocalResourceManager() { return *local_resource_manager_; } - - /// Get local node resources; test only. 
- const NodeResources &GetLocalNodeResources() const; + ClusterResourceManager &GetClusterResourceManager() { + return *cluster_resource_manager_; + } private: bool NodeAlive(int64_t node_id) const; @@ -178,18 +136,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { bool SubtractRemoteNodeAvailableResources(int64_t node_id, const ResourceRequest &resource_request); - /// Add a new node or overwrite the resources of an existing node. - /// - /// \param node_id: Node ID. - /// \param node_resources: Up to date total and available resources of the node. - void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources); - - /// Remove node from the cluster data structure. This happens - /// when a node fails or it is removed from the cluster. - /// - /// \param node_id ID of the node to be removed. - bool RemoveNode(int64_t node_id); - /// Check whether a resource request can be scheduled given a node. /// /// \param resource_request: Resource request to be scheduled. @@ -204,40 +150,41 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id, const NodeResources &resources) const; - /// Return resources associated to the given node_id in ret_resources. - /// If node_id not found, return false; otherwise return true. - bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const; - - /// List of nodes in the clusters and their resources organized as a map. - /// The key of the map is the node ID. - absl::flat_hash_map nodes_; - /// Identifier of local node. - int64_t local_node_id_; - /// The scheduling policy to use. - std::unique_ptr scheduling_policy_; - /// Internally maintained random number generator. - std::mt19937_64 gen_; - /// Resources of local node. - std::unique_ptr local_resource_manager_; /// Keep the mapping between node and resource IDs in string representation /// to integer representation. Used for improving map performance. StringIdMap string_to_int_map_; + /// Identifier of local node. + int64_t local_node_id_; + /// Internally maintained random number generator. + std::mt19937_64 gen_; /// Gcs client. It's not owned by this class. gcs::GcsClient *gcs_client_; + /// Resources of local node. + std::unique_ptr local_resource_manager_; + /// Resources of the entire cluster. + std::unique_ptr cluster_resource_manager_; + /// The scheduling policy to use. 
+ std::unique_ptr scheduling_policy_; friend class ClusterResourceSchedulerTest; FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest); + FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, UpdateLocalAvailableResourcesFromResourceInstancesTest); FRIEND_TEST(ClusterResourceSchedulerTest, ResourceUsageReportTest); + FRIEND_TEST(ClusterResourceSchedulerTest, DeadNodeTest); + FRIEND_TEST(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask); FRIEND_TEST(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest); FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest); + FRIEND_TEST(ClusterResourceSchedulerTest, DirtyLocalViewTest); + FRIEND_TEST(ClusterResourceSchedulerTest, DynamicResourceTest); FRIEND_TEST(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources); + FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback); }; } // end namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h b/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h deleted file mode 100644 index cbd8fc610..000000000 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "ray/common/task/scheduling_resources.h" -#include "src/ray/protobuf/gcs.pb.h" -#include "src/ray/protobuf/gcs_service.pb.h" - -namespace ray { -class ClusterResourceSchedulerInterface { - public: - virtual ~ClusterResourceSchedulerInterface() = default; - - /// Remove node from the cluster data structure. This happens - /// when a node fails or it is removed from the cluster. - /// - /// \param node_id_string ID of the node to be removed. - virtual bool RemoveNode(const std::string &node_id_string) = 0; - - /// Update node resources. This hanppens when a node resource usage udpated. - /// - /// \param node_id_string ID of the node which resoruces need to be udpated. - /// \param resource_data The node resource data. - virtual bool UpdateNode(const std::string &node_id_string, - const rpc::ResourcesData &resource_data) = 0; - - /// \param node_name: Node whose resource we want to update. - /// \param resource_name: Resource which we want to update. - /// \param resource_total: New capacity of the resource. - virtual void UpdateResourceCapacity(const std::string &node_id_string, - const std::string &resource_name, - double resource_total) = 0; - - /// Delete a given resource from a given node. 
- /// - /// \param node_name: Node whose resource we want to delete. - /// \param resource_name: Resource we want to delete - virtual void DeleteResource(const std::string &node_id_string, - const std::string &resource_name) = 0; - - /// Return local resources in human-readable string form. - virtual std::string GetLocalResourceViewString() const = 0; -}; -} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 87977adfe..e75000dd1 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -150,7 +150,8 @@ class ClusterResourceSchedulerTest : public ::testing::Test { // `scheduling_policy_test.cc` for comprehensive testing of the hybrid scheduling // policy. gcs_client_ = std::make_unique(); - node_info.set_node_id(NodeID::FromRandom().Binary()); + node_name = NodeID::FromRandom().Binary(); + node_info.set_node_id(node_name); ON_CALL(*gcs_client_->mock_node_accessor, Get(::testing::_, ::testing::_)) .WillByDefault(::testing::Return(&node_info)); } @@ -184,12 +185,13 @@ class ClusterResourceSchedulerTest : public ::testing::Test { initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.AddOrUpdateNode(i, node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(i, node_resources); node_resources.custom_resources.clear(); } } std::unique_ptr gcs_client_; + std::string node_name; rpc::GcsNodeInfo node_info; }; @@ -268,7 +270,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) { initCluster(resource_scheduler, num_nodes); - ASSERT_EQ(resource_scheduler.NumNodes(), num_nodes); + ASSERT_EQ(resource_scheduler.GetClusterResourceManager().NumNodes(), num_nodes); } TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) { @@ -278,9 +280,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) { ClusterResourceScheduler resource_scheduler; initCluster(resource_scheduler, num_nodes); - resource_scheduler.RemoveNode(remove_id); + resource_scheduler.GetClusterResourceManager().RemoveNode(remove_id); - ASSERT_TRUE(num_nodes - 1 == resource_scheduler.NumNodes()); + ASSERT_TRUE(num_nodes - 1 == resource_scheduler.GetClusterResourceManager().NumNodes()); } TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) { @@ -312,9 +314,10 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) { cust_capacities.push_back(rand() % 10); initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.AddOrUpdateNode(update_id, node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(update_id, + node_resources); } - ASSERT_TRUE(num_nodes == resource_scheduler.NumNodes()); + ASSERT_TRUE(num_nodes == resource_scheduler.GetClusterResourceManager().NumNodes()); } TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) { @@ -323,7 +326,8 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) { ClusterResourceScheduler resource_scheduler(local_node_id, resource_total, *gcs_client_); auto remote_node_id = NodeID::FromRandom().Binary(); - resource_scheduler.AddOrUpdateNode(remote_node_id, resource_total, resource_total); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + remote_node_id, resource_total, resource_total); absl::flat_hash_map resource_request({{"CPU", 1}}); int64_t violations; 
@@ -335,7 +339,8 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
                                          &is_infeasible);
   ASSERT_EQ(node_id, local_node_id);
   absl::flat_hash_map resource_available({{"CPU", 9}});
-  resource_scheduler.AddOrUpdateNode(local_node_id, resource_total, resource_available);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+      local_node_id, resource_total, resource_available);
   node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
                                                       scheduling_strategy, false, false,
                                                       false, &violations, &is_infeasible);
@@ -368,11 +373,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
   ASSERT_TRUE(violations == 0);

   NodeResources nr1, nr2;
-  ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr1));
+  ASSERT_TRUE(
+      resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr1));
   auto task_allocation = std::make_shared();
   ASSERT_TRUE(resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
       resource_request, task_allocation));
-  ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr2));
+  ASSERT_TRUE(
+      resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr2));

   for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
     auto t = nr1.predefined_resources[i].available -
@@ -398,20 +405,23 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
 TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest) {
   absl::flat_hash_map initial_resources = {
       {ray::kCPU_ResourceLabel, 1}, {"custom", 1}};
-  ClusterResourceScheduler resource_scheduler(
-      NodeID::FromRandom().Binary(), initial_resources, *gcs_client_, nullptr, nullptr);
+  std::string name = NodeID::FromRandom().Binary();
+  ClusterResourceScheduler resource_scheduler(name, initial_resources, *gcs_client_,
+                                              nullptr, nullptr);

   resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
       ray::kCPU_ResourceLabel, {0, 1, 1});
   resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom",
                                                                          {0, 1, 1});

-  const auto &predefined_resources =
-      resource_scheduler.GetLocalNodeResources().predefined_resources;
+  const auto &predefined_resources = resource_scheduler.GetClusterResourceManager()
+                                         .GetNodeResources(name)
+                                         .predefined_resources;
   ASSERT_EQ(predefined_resources[CPU].total.Double(), 3);
-  const auto &custom_resources =
-      resource_scheduler.GetLocalNodeResources().custom_resources;
+  const auto &custom_resources = resource_scheduler.GetClusterResourceManager()
+                                     .GetNodeResources(name)
+                                     .custom_resources;
   auto resource_id = resource_scheduler.string_to_int_map_.Get("custom");
   ASSERT_EQ(custom_resources.find(resource_id)->second.total.Double(), 3);
 }
@@ -428,12 +438,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
     vector cust_ids{1, 2};
     vector cust_capacities{5, 5};
     initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
-    resource_scheduler.AddOrUpdateNode(node_id, node_resources);
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
+                                                                   node_resources);
     nr = node_resources;
   }

   // Check whether node resources were correctly added.
-  if (resource_scheduler.GetNodeResources(node_id, &nr_out)) {
+  if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
     ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
   } else {
     ASSERT_TRUE(false);
@@ -446,10 +457,11 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
     vector cust_ids{2, 3};
     vector cust_capacities{6, 6};
     initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
-    resource_scheduler.AddOrUpdateNode(node_id, node_resources);
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
+                                                                   node_resources);
     nr = node_resources;
   }
-  if (resource_scheduler.GetNodeResources(node_id, &nr_out)) {
+  if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
     ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
   } else {
     ASSERT_TRUE(false);
@@ -474,7 +486,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
     vector cust_ids{1, 2};
     vector cust_capacities{5, 5};
     initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
-    resource_scheduler.AddOrUpdateNode(node_internal_id, node_resources);
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_internal_id,
+                                                                   node_resources);
   }
   // Predefined resources, hard constraint violation
   {
@@ -855,7 +868,8 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) {
   absl::flat_hash_map resource;
   resource["CPU"] = 10000.0;
   auto node_id = NodeID::FromRandom();
-  resource_scheduler.AddOrUpdateNode(node_id.Binary(), resource, resource);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id.Binary(),
+                                                                 resource, resource);
   int64_t violations = 0;
   bool is_infeasible = false;
   rpc::SchedulingStrategy scheduling_strategy;
@@ -960,7 +974,7 @@ TEST_F(ClusterResourceSchedulerTest,
                          expected_available_gpu_instances.begin()));

   NodeResources nr;
-  resource_scheduler.GetNodeResources(0, &nr);
+  resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
   ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5);
 }

@@ -981,7 +995,7 @@ TEST_F(ClusterResourceSchedulerTest,
                            expected_available_gpu_instances.begin()));

     NodeResources nr;
-    resource_scheduler.GetNodeResources(0, &nr);
+    resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
     ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8);
   }
 }
@@ -1043,7 +1057,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
   absl::flat_hash_map resource_spec({{"CPU", 1}});
   ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_);
   for (int i = 0; i < 100; i++) {
-    resource_scheduler.AddOrUpdateNode(NodeID::FromRandom().Binary(), {}, {});
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+        NodeID::FromRandom().Binary(), {}, {});
   }

   // No feasible nodes.
@@ -1059,7 +1074,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {

   // Feasible remote node, but doesn't currently have resources available. We
   // should spill there.
   auto remote_feasible = NodeID::FromRandom().Binary();
-  resource_scheduler.AddOrUpdateNode(remote_feasible, resource_spec, {{"CPU", 0.}});
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+      remote_feasible, resource_spec, {{"CPU", 0.}});
   ASSERT_EQ(remote_feasible,
             resource_scheduler.GetBestSchedulableNode(
                 resource_spec, scheduling_strategy, false, false, false,
                 &total_violations, &is_infeasible));
@@ -1067,7 +1083,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
   // Feasible remote node, and it currently has resources available. We should
   // prefer to spill there.
   auto remote_available = NodeID::FromRandom().Binary();
-  resource_scheduler.AddOrUpdateNode(remote_available, resource_spec, resource_spec);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+      remote_available, resource_spec, resource_spec);
   ASSERT_EQ(remote_available,
             resource_scheduler.GetBestSchedulableNode(
                 resource_spec, scheduling_strategy, false, false, false,
                 &total_violations, &is_infeasible));
@@ -1086,7 +1103,8 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
   vector other_cust_capacities{5., 4., 3., 2., 1.};
   initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
                     other_cust_capacities);
-  resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
+                                                                 other_node_resources);

   {  // Cluster is idle.
     rpc::ResourcesData data;
@@ -1169,7 +1187,8 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
   vector other_cust_capacities{10.};
   initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
                     other_cust_capacities);
-  resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
+                                                                 other_node_resources);

   {
     rpc::ResourcesData data;
@@ -1212,7 +1231,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
   absl::flat_hash_map initial_resources({{"CPU", 1}});
   ClusterResourceScheduler resource_scheduler("local", initial_resources, *gcs_client_);
   auto remote = NodeID::FromRandom().Binary();
-  resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}}, {{"CPU", 2.}});
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(remote, {{"CPU", 2.}},
+                                                                 {{"CPU", 2.}});
   const absl::flat_hash_map task_spec = {{"CPU", 1.}};

   // Allocate local resources to force tasks onto the remote node when
@@ -1238,8 +1258,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
       scheduling_strategy.mutable_default_scheduling_strategy();
   for (int i = 0; i < 3; i++) {
     // Remote node reports update local view.
-    resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}},
-                                       {{"CPU", num_slots_available}});
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+        remote, {{"CPU", 2.}}, {{"CPU", num_slots_available}});
     for (int j = 0; j < num_slots_available; j++) {
       ASSERT_EQ(remote, resource_scheduler.GetBestSchedulableNode(
                             task_spec, scheduling_strategy, false, false, true, &t,
@@ -1317,7 +1337,8 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
   std::vector node_ids;
   for (int i = 0; i < 100; i++) {
     node_ids.push_back(NodeID::FromRandom().Binary());
-    resource_scheduler.AddOrUpdateNode(node_ids.back(), {}, {});
+    resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids.back(), {},
+                                                                   {});
   }

   // No feasible nodes.
@@ -1337,12 +1358,14 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
                 /*force_spillback=*/true, &total_violations, &is_infeasible),
             "");
   // Choose a remote node that has the resources available.
-  resource_scheduler.AddOrUpdateNode(node_ids[50], resource_spec, {});
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids[50],
+                                                                 resource_spec, {});
   ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
                 resource_spec, scheduling_strategy, false, false,
                 /*force_spillback=*/true, &total_violations, &is_infeasible),
             "");
-  resource_scheduler.AddOrUpdateNode(node_ids[51], resource_spec, resource_spec);
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+      node_ids[51], resource_spec, resource_spec);
   ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
                 resource_spec, scheduling_strategy, false, false,
                 /*force_spillback=*/true, &total_violations, &is_infeasible),
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index f12102dfd..f3ee6f6a8 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -1255,8 +1255,8 @@ void ClusterTaskManager::Dispatch(
       if (predefined_resources[res_idx][inst_idx] > 0.) {
         if (first) {
           resource = reply->add_resource_mapping();
-          resource->set_name(
-              cluster_resource_scheduler_->GetResourceNameFromIndex(res_idx));
+          resource->set_name(cluster_resource_scheduler_->GetClusterResourceManager()
+                                 .GetResourceNameFromIndex(res_idx));
           first = false;
         }
         auto rid = resource->add_resource_ids();
@@ -1273,8 +1273,8 @@ void ClusterTaskManager::Dispatch(
       if (it->second[inst_idx] > 0.) {
         if (first) {
           resource = reply->add_resource_mapping();
-          resource->set_name(
-              cluster_resource_scheduler_->GetResourceNameFromIndex(it->first));
+          resource->set_name(cluster_resource_scheduler_->GetClusterResourceManager()
+                                 .GetResourceNameFromIndex(it->first));
           first = false;
         }
         auto rid = resource->add_resource_ids();
@@ -1500,8 +1500,8 @@ void ClusterTaskManager::SpillWaitingTasks() {

 bool ClusterTaskManager::IsLocallySchedulable(const RayTask &task) const {
   const auto &spec = task.GetTaskSpecification();
-  return cluster_resource_scheduler_->IsLocallySchedulable(
-      spec.GetRequiredResources().GetResourceMap());
+  return cluster_resource_scheduler_->IsSchedulableOnNode(
+      self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
 }

 ResourceSet ClusterTaskManager::CalcNormalTaskResources() const {
diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
index c934f3179..343bf248d 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
@@ -275,7 +275,8 @@ class ClusterTaskManagerTest : public ::testing::Test {
     node_resources[ray::kCPU_ResourceLabel] = num_cpus;
     node_resources[ray::kGPU_ResourceLabel] = num_gpus;
     node_resources[ray::kMemory_ResourceLabel] = memory;
-    scheduler_->AddOrUpdateNode(id.Binary(), node_resources, node_resources);
+    scheduler_->GetClusterResourceManager().AddOrUpdateNode(id.Binary(), node_resources,
+                                                            node_resources);

     rpc::GcsNodeInfo info;
     node_info_[id] = info;
@@ -1433,7 +1434,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
 }

 TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) {
-  const NodeResources &node_resources = scheduler_->GetLocalNodeResources();
+  const NodeResources &node_resources =
+      scheduler_->GetClusterResourceManager().GetNodeResources(id_.Binary());
   ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
   ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4);
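Reviewer note: the migration in this diff is mechanical, so a minimal sketch may help downstream call sites. The snippet below is illustrative only and is not part of the change; the helper function is hypothetical, and the map type absl::flat_hash_map<std::string, double> is assumed from the brace-initialized resource maps used in the tests above.

// Illustrative sketch, not part of this diff. Cluster-view operations move
// behind GetClusterResourceManager(); local-only checks become node-keyed.
#include <string>

#include "absl/container/flat_hash_map.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"

namespace ray {

// Hypothetical helper mirroring the updated test call sites above.
void ReportNodeUsage(ClusterResourceScheduler &scheduler,
                     const std::string &node_id,
                     const absl::flat_hash_map<std::string, double> &total,
                     const absl::flat_hash_map<std::string, double> &available) {
  // Old form (removed by this diff):
  //   scheduler.AddOrUpdateNode(node_id, total, available);
  // New form:
  scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id, total, available);
}

// Likewise, per the cluster_task_manager.cc hunk, "schedulable here?" is now a
// node-keyed query:
//   old: cluster_resource_scheduler_->IsLocallySchedulable(resource_map);
//   new: cluster_resource_scheduler_->IsSchedulableOnNode(self_node_id_.Binary(),
//                                                         resource_map);

}  // namespace ray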