[resource-reporting 4/n] Separate cluster resource manager from cluster resource scheduler (#21992)

As discussed, we need to separate cluster resource management logic from scheduling logic. In this PR we introduce ClusterResourceManager to own the cluster resource bookkeeping, leaving ClusterResourceScheduler responsible only for scheduling decisions; a sketch of the resulting call-site change follows the notes below.

* more clean up

* refactor

* address comments
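
A minimal sketch of the call-site migration this PR performs (illustrative only; the method names come from the diffs below, and the surrounding raylet member variables are assumptions):

// Before: callers mutated the cluster view through the scheduler itself.
//   cluster_resource_scheduler_->RemoveNode(node_id.Binary());
//   cluster_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(),
//                                                       resource_label, capacity);
// After: the scheduler exposes the manager that owns the cluster view.
cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode(node_id.Binary());
cluster_resource_scheduler_->GetClusterResourceManager().UpdateResourceCapacity(
    node_id.Binary(), resource_label, capacity);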
Chen Shen 2022-01-31 21:16:58 -08:00 committed by GitHub
parent b3fd3c6828
commit 4b528a7255
12 changed files with 593 additions and 524 deletions


@ -1,41 +0,0 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
class MockClusterResourceScheduler : public ClusterResourceScheduler {
public:
MOCK_METHOD(bool, UpdateNode,
(const std::string &node_id_string,
const rpc::ResourcesData &resource_data),
(override));
MOCK_METHOD(bool, RemoveNode, (const std::string &node_id_string), (override));
MOCK_METHOD(void, UpdateResourceCapacity,
(const std::string &node_name, const std::string &resource_name,
double resource_total),
(override));
MOCK_METHOD(void, DeleteResource,
(const std::string &node_name, const std::string &resource_name),
(override));
MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override));
MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & resources_data), (override));
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals,
(const absl::flat_hash_map<std::string, double> &resource_map_filter),
(const, override));
MOCK_METHOD(void, UpdateLastResourceUsage,
(const std::shared_ptr<SchedulingResources> gcs_resources), (override));
MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override));
};
} // namespace ray


@ -1,41 +0,0 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
class MockClusterResourceSchedulerInterface : public ClusterResourceSchedulerInterface {
public:
MOCK_METHOD(bool, RemoveNode, (const std::string &node_id_string), (override));
MOCK_METHOD(bool, UpdateNode,
(const std::string &node_id_string,
const rpc::ResourcesData &resource_data),
(override));
MOCK_METHOD(void, UpdateResourceCapacity,
(const std::string &node_id_string, const std::string &resource_name,
double resource_total),
(override));
MOCK_METHOD(void, DeleteResource,
(const std::string &node_id_string, const std::string &resource_name),
(override));
MOCK_METHOD(void, UpdateLastResourceUsage,
(const std::shared_ptr<SchedulingResources> gcs_resources), (override));
MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & data), (override));
MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override));
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals,
(const absl::flat_hash_map<std::string, double> &resource_map_filter),
(const, override));
MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override));
};
} // namespace ray


@ -760,7 +760,8 @@ void NodeManager::WarnResourceDeadlock() {
<< exemplar.GetTaskSpecification().GetRequiredPlacementResources().ToString()
<< "\n"
<< "Available resources on this node: "
<< cluster_resource_scheduler_->GetLocalResourceViewString()
<< cluster_resource_scheduler_->GetClusterResourceManager()
.GetNodeResourceViewString(self_node_id_.Binary())
<< " In total there are " << pending_tasks << " pending tasks and "
<< pending_actor_creations << " pending actors on this node.";
@ -835,7 +836,8 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
// not be necessary.
// Remove the node from the resource map.
if (!cluster_resource_scheduler_->RemoveNode(node_id.Binary())) {
if (!cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode(
node_id.Binary())) {
RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
<< ".";
return;
@ -920,8 +922,8 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
const std::string &resource_label = resource_pair.first;
const double &new_resource_capacity = resource_pair.second;
cluster_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label,
new_resource_capacity);
cluster_resource_scheduler_->GetClusterResourceManager().UpdateResourceCapacity(
node_id.Binary(), resource_label, new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
cluster_task_manager_->ScheduleAndDispatchTasks();
@ -948,14 +950,16 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
// Update local_available_resources_ and SchedulingResources
for (const auto &resource_label : resource_names) {
cluster_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label);
cluster_resource_scheduler_->GetClusterResourceManager().DeleteResource(
node_id.Binary(), resource_label);
}
return;
}
void NodeManager::UpdateResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resource_data) {
if (!cluster_resource_scheduler_->UpdateNode(node_id.Binary(), resource_data)) {
if (!cluster_resource_scheduler_->GetClusterResourceManager().UpdateNode(
node_id.Binary(), resource_data)) {
RAY_LOG(INFO)
<< "[UpdateResourceUsage]: received resource usage from unknown node id "
<< node_id;
@ -1591,7 +1595,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
<< " actor_id = " << actor_id
<< ", normal_task_resources = " << normal_task_resources.ToString()
<< ", local_resoruce_view = "
<< cluster_resource_scheduler_->GetLocalResourceViewString();
<< cluster_resource_scheduler_->GetClusterResourceManager()
.GetNodeResourceViewString(self_node_id_.Binary());
auto resources_data = reply->mutable_resources_data();
resources_data->set_node_id(self_node_id_.Binary());
resources_data->set_resources_normal_task_changed(true);


@ -61,7 +61,9 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
}
void CheckRemainingResourceCorrect(NodeResources &node_resources) {
auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources();
auto local_node_resource =
cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources(
"local");
ASSERT_TRUE(local_node_resource == node_resources);
}
@ -135,7 +137,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -162,7 +165,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", unit_resource, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -207,7 +211,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
init_unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 5. return second bundle.
@ -230,7 +235,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"CPU", 1.0},
{"bundle_group_" + group_id.Hex(), 1000}},
resource_instances));
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 7. return first bundle.
new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
@ -238,7 +245,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
remaining_resources = {{"CPU", 2.0}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
ASSERT_TRUE(update_called_);
ASSERT_TRUE(delete_called_);
CheckRemainingResourceCorrect(remaining_resource_instance);
@ -268,7 +277,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -304,7 +314,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 5. prepare bundle -> commit bundle -> commit bundle.
@ -326,7 +337,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
// 8. check remaining resources is correct.
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", available_resource, *gcs_client_);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -349,7 +362,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
CheckRemainingResourceCorrect(remaining_resource_instance);
// 5. re-init the local available resource with 4 CPUs.
available_resource = {std::make_pair("CPU", 4.0)};
@ -380,7 +394,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
ASSERT_TRUE(
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
allocating_resource, resource_instances));
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
RAY_LOG(INFO) << "The current local resource view: "
<< cluster_resource_scheduler_->DebugString();
CheckRemainingResourceCorrect(remaining_resource_instance);
@ -425,7 +441,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
allocating_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
RAY_LOG(INFO) << "The current local resource view: "
<< cluster_resource_scheduler_->DebugString();
CheckRemainingResourceCorrect(remaining_resource_instance);


@ -0,0 +1,261 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include <boost/algorithm/string.hpp>
#include "ray/common/grpc_util.h"
#include "ray/common/ray_config.h"
#include "ray/util/container_util.h"
namespace ray {
ClusterResourceManager::ClusterResourceManager(StringIdMap &string_to_int_map)
: nodes_{}, string_to_int_map_{string_to_int_map} {}
void ClusterResourceManager::AddOrUpdateNode(
const std::string &node_id,
const absl::flat_hash_map<std::string, double> &resources_total,
const absl::flat_hash_map<std::string, double> &resources_available) {
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
}
void ClusterResourceManager::AddOrUpdateNode(int64_t node_id,
const NodeResources &node_resources) {
RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: "
<< node_resources.DebugString(string_to_int_map_);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// This node is new, so add it to the map.
nodes_.emplace(node_id, node_resources);
} else {
// This node exists, so update its resources.
it->second = Node(node_resources);
}
}
bool ClusterResourceManager::UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) {
auto node_id = string_to_int_map_.Insert(node_id_string);
if (!nodes_.contains(node_id)) {
return false;
}
auto resources_total = MapFromProtobuf(resource_data.resources_total());
auto resources_available = MapFromProtobuf(resource_data.resources_available());
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
NodeResources local_view;
RAY_CHECK(GetNodeResources(node_id, &local_view));
if (resource_data.resources_total_size() > 0) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].total =
node_resources.predefined_resources[i].total;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].total = entry.second.total;
}
}
if (resource_data.resources_available_changed()) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].available =
node_resources.predefined_resources[i].available;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].available = entry.second.available;
}
local_view.object_pulls_queued = resource_data.object_pulls_queued();
}
AddOrUpdateNode(node_id, local_view);
return true;
}
bool ClusterResourceManager::RemoveNode(int64_t node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// Node not found.
return false;
} else {
nodes_.erase(it);
return true;
}
}
bool ClusterResourceManager::RemoveNode(const std::string &node_id_string) {
auto node_id = string_to_int_map_.Get(node_id_string);
if (node_id == -1) {
return false;
}
return RemoveNode(node_id);
}
bool ClusterResourceManager::GetNodeResources(int64_t node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
if (it != nodes_.end()) {
*ret_resources = it->second.GetLocalView();
return true;
} else {
return false;
}
}
const NodeResources &ClusterResourceManager::GetNodeResources(
const std::string &node_name) const {
int64_t node_id = string_to_int_map_.Get(node_name);
const auto &node = map_find_or_die(nodes_, node_id);
return node.GetLocalView();
}
int64_t ClusterResourceManager::NumNodes() const { return nodes_.size(); }
void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
node_id = string_to_int_map_.Insert(node_id_string);
it = nodes_.emplace(node_id, node_resources).first;
}
int idx = GetPredefinedResourceIndex(resource_name);
auto local_view = it->second.GetMutableLocalView();
FixedPoint resource_total_fp(resource_total);
if (idx != -1) {
auto diff_capacity = resource_total_fp - local_view->predefined_resources[idx].total;
local_view->predefined_resources[idx].total += diff_capacity;
local_view->predefined_resources[idx].available += diff_capacity;
if (local_view->predefined_resources[idx].available < 0) {
local_view->predefined_resources[idx].available = 0;
}
if (local_view->predefined_resources[idx].total < 0) {
local_view->predefined_resources[idx].total = 0;
}
} else {
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
if (itr != local_view->custom_resources.end()) {
auto diff_capacity = resource_total_fp - itr->second.total;
itr->second.total += diff_capacity;
itr->second.available += diff_capacity;
if (itr->second.available < 0) {
itr->second.available = 0;
}
if (itr->second.total < 0) {
itr->second.total = 0;
}
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total_fp;
local_view->custom_resources.emplace(resource_id, resource_capacity);
}
}
}
void ClusterResourceManager::DeleteResource(const std::string &node_id_string,
const std::string &resource_name) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return;
}
int idx = GetPredefinedResourceIndex(resource_name);
auto local_view = it->second.GetMutableLocalView();
if (idx != -1) {
local_view->predefined_resources[idx].available = 0;
local_view->predefined_resources[idx].total = 0;
} else {
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
if (itr != local_view->custom_resources.end()) {
local_view->custom_resources.erase(itr);
}
}
}
std::string ClusterResourceManager::GetNodeResourceViewString(
const std::string &node_name) const {
int64_t node_id = string_to_int_map_.Get(node_name);
const auto &node = map_find_or_die(nodes_, node_id);
return node.GetLocalView().DictString(string_to_int_map_);
}
std::string ClusterResourceManager::GetResourceNameFromIndex(int64_t res_idx) {
if (res_idx == CPU) {
return ray::kCPU_ResourceLabel;
} else if (res_idx == GPU) {
return ray::kGPU_ResourceLabel;
} else if (res_idx == OBJECT_STORE_MEM) {
return ray::kObjectStoreMemory_ResourceLabel;
} else if (res_idx == MEM) {
return ray::kMemory_ResourceLabel;
} else {
return string_to_int_map_.Get((uint64_t)res_idx);
}
}
const absl::flat_hash_map<int64_t, Node> &ClusterResourceManager::GetResourceView()
const {
return nodes_;
}
bool ClusterResourceManager::SubtractNodeAvailableResources(
int64_t node_id, const ResourceRequest &resource_request) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}
NodeResources *resources = it->second.GetMutableLocalView();
FixedPoint zero(0.);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources->predefined_resources[i].available =
std::max(FixedPoint(0), resources->predefined_resources[i].available -
resource_request.predefined_resources[i]);
}
for (const auto &task_req_custom_resource : resource_request.custom_resources) {
auto it = resources->custom_resources.find(task_req_custom_resource.first);
if (it != resources->custom_resources.end()) {
it->second.available =
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second);
}
}
// TODO(swang): We should also subtract object store memory if the task has
// arguments. Right now we do not modify object_pulls_queued in case of
// performance regressions in spillback.
return true;
}
} // namespace ray


@ -0,0 +1,142 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <gtest/gtest_prod.h>
#include <iostream>
#include <sstream>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/fixed_point.h"
#include "ray/raylet/scheduling/local_resource_manager.h"
#include "ray/util/logging.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace raylet {
class ClusterTaskManagerTest;
}
/// This class manages the resource view of the entire cluster.
/// This class is not thread safe.
class ClusterResourceManager {
public:
explicit ClusterResourceManager(StringIdMap &string_to_int_map);
/// Get the resource view of the cluster.
const absl::flat_hash_map<int64_t, Node> &GetResourceView() const;
// Mapping from predefined resource indexes to resource strings
std::string GetResourceNameFromIndex(int64_t res_idx);
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
bool RemoveNode(const std::string &node_id_string);
/// Get number of nodes in the cluster.
int64_t NumNodes() const;
/// Update total capacity of a given resource of a given node.
///
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
const std::string &resource_name, double resource_total);
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
void DeleteResource(const std::string &node_name, const std::string &resource_name);
/// Return the given node's resources in human-readable string form.
std::string GetNodeResourceViewString(const std::string &node_name) const;
/// Get the resources of a given node.
const NodeResources &GetNodeResources(const std::string &node_name) const;
/// Subtract available resources from a given node.
/// Return false if the node doesn't exist.
bool SubtractNodeAvailableResources(int64_t node_id,
const ResourceRequest &resource_request);
private:
friend class ClusterResourceScheduler;
/// Add a new node or overwrite the resources of an existing node.
///
/// \param node_id: Node ID.
/// \param node_resources: Up to date total and available resources of the node.
void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
void AddOrUpdateNode(
const std::string &node_id,
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id ID of the node to be removed.
bool RemoveNode(int64_t node_id);
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
/// List of nodes in the cluster and their resources, organized as a map.
/// The key of the map is the node ID.
absl::flat_hash_map<int64_t, Node> nodes_;
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap &string_to_int_map_;
friend class ClusterResourceSchedulerTest;
friend class raylet::ClusterTaskManagerTest;
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest,
UpdateLocalAvailableResourcesFromResourceInstancesTest);
FRIEND_TEST(ClusterResourceSchedulerTest, ResourceUsageReportTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DeadNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask);
FRIEND_TEST(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest);
FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DirtyLocalViewTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DynamicResourceTest);
FRIEND_TEST(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources);
FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback);
};
} // end namespace ray
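
A short usage sketch of the public ClusterResourceManager API declared above (hypothetical node name and capacities; StringIdMap is taken from the constructor signature):

StringIdMap ids;
ClusterResourceManager manager(ids);
// UpdateResourceCapacity creates the node entry on first use (see the .cc above).
manager.UpdateResourceCapacity("node-1", "CPU", 8.0);
manager.UpdateResourceCapacity("node-1", "custom", 2.0);
RAY_LOG(INFO) << "node-1 view: " << manager.GetNodeResourceViewString("node-1");
manager.DeleteResource("node-1", "custom");
RAY_CHECK(manager.RemoveNode("node-1"));  // Returns true: the node was tracked.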


@ -21,21 +21,39 @@
namespace ray {
ClusterResourceScheduler::ClusterResourceScheduler() {
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id_, string_to_int_map_, node_resources,
/*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
[&](const NodeResources &local_resource_update) {
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
});
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, cluster_resource_manager_->GetResourceView());
}
ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources,
gcs::GcsClient &gcs_client)
: local_node_id_(local_node_id),
: string_to_int_map_(),
local_node_id_(local_node_id),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
gcs_client_(&gcs_client) {
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, nodes_);
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id, string_to_int_map_, local_node_resources,
/*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
[&](const NodeResources &local_resource_update) {
this->AddOrUpdateNode(local_node_id_, local_resource_update);
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
});
AddOrUpdateNode(local_node_id_, local_node_resources);
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_node_resources);
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, cluster_resource_manager_->GetResourceView());
}
ClusterResourceScheduler::ClusterResourceScheduler(
@ -43,19 +61,23 @@ ClusterResourceScheduler::ClusterResourceScheduler(
const absl::flat_hash_map<std::string, double> &local_node_resources,
gcs::GcsClient &gcs_client, std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity)
: gcs_client_(&gcs_client) {
: string_to_int_map_(),
local_node_id_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
gcs_client_(&gcs_client) {
local_node_id_ = string_to_int_map_.Insert(local_node_id);
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, nodes_);
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, local_node_resources, local_node_resources);
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id_, string_to_int_map_, node_resources, get_used_object_store_memory,
get_pull_manager_at_capacity, [&](const NodeResources &local_resource_update) {
this->AddOrUpdateNode(local_node_id_, local_resource_update);
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
});
AddOrUpdateNode(local_node_id_, node_resources);
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, node_resources);
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, cluster_resource_manager_->GetResourceView());
}
bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const {
@ -69,89 +91,6 @@ bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const {
return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id_binary)) != nullptr;
}
void ClusterResourceScheduler::AddOrUpdateNode(
const std::string &node_id,
const absl::flat_hash_map<std::string, double> &resources_total,
const absl::flat_hash_map<std::string, double> &resources_available) {
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
}
void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id,
const NodeResources &node_resources) {
RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: "
<< node_resources.DebugString(string_to_int_map_);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// This node is new, so add it to the map.
nodes_.emplace(node_id, node_resources);
} else {
// This node exists, so update its resources.
it->second = Node(node_resources);
}
}
bool ClusterResourceScheduler::UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) {
auto node_id = string_to_int_map_.Insert(node_id_string);
if (!nodes_.contains(node_id)) {
return false;
}
auto resources_total = MapFromProtobuf(resource_data.resources_total());
auto resources_available = MapFromProtobuf(resource_data.resources_available());
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
NodeResources local_view;
RAY_CHECK(GetNodeResources(node_id, &local_view));
if (resource_data.resources_total_size() > 0) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].total =
node_resources.predefined_resources[i].total;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].total = entry.second.total;
}
}
if (resource_data.resources_available_changed()) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].available =
node_resources.predefined_resources[i].available;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].available = entry.second.available;
}
local_view.object_pulls_queued = resource_data.object_pulls_queued();
}
AddOrUpdateNode(node_id, local_view);
return true;
}
bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// Node not found.
return false;
} else {
nodes_.erase(it);
return true;
}
}
bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
auto node_id = string_to_int_map_.Get(node_id_string);
if (node_id == -1) {
return false;
}
return RemoveNode(node_id);
}
bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request,
int64_t node_id,
const NodeResources &resources) const {
@ -202,11 +141,12 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
int64_t best_node = -1;
// This is an actor which requires no resources.
// Pick a random node to avoid scheduling all actors on the local node.
if (nodes_.size() > 0) {
std::uniform_int_distribution<int> distribution(0, nodes_.size() - 1);
const auto &resource_view = cluster_resource_manager_->GetResourceView();
if (resource_view.size() > 0) {
std::uniform_int_distribution<int> distribution(0, resource_view.size() - 1);
int idx = distribution(gen_);
auto iter = std::next(nodes_.begin(), idx);
for (size_t i = 0; i < nodes_.size(); ++i) {
auto iter = std::next(resource_view.begin(), idx);
for (size_t i = 0; i < resource_view.size(); ++i) {
// TODO(iycheng): If many nodes have died, the
// distribution might not be even.
if (NodeAlive(iter->first)) {
@ -214,13 +154,13 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
break;
}
++iter;
if (iter == nodes_.end()) {
iter = nodes_.begin();
if (iter == resource_view.end()) {
iter = resource_view.begin();
}
}
}
RAY_LOG(DEBUG) << "GetBestSchedulableNode, best_node = " << best_node
<< ", # nodes = " << nodes_.size()
<< ", # nodes = " << resource_view.size()
<< ", resource_request = " << resource_request.DebugString();
return best_node;
}
@ -273,168 +213,36 @@ std::string ClusterResourceScheduler::GetBestSchedulableNode(
bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
int64_t node_id, const ResourceRequest &resource_request) {
RAY_CHECK(node_id != local_node_id_);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
const auto &resource_view = cluster_resource_manager_->GetResourceView();
auto it = resource_view.find(node_id);
if (it == resource_view.end()) {
return false;
}
NodeResources *resources = it->second.GetMutableLocalView();
// Just double check this node can still schedule the resource request.
if (!IsSchedulable(resource_request, node_id, *resources)) {
if (!IsSchedulable(resource_request, node_id, it->second.GetLocalView())) {
return false;
}
FixedPoint zero(0.);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources->predefined_resources[i].available =
std::max(FixedPoint(0), resources->predefined_resources[i].available -
resource_request.predefined_resources[i]);
}
for (const auto &task_req_custom_resource : resource_request.custom_resources) {
auto it = resources->custom_resources.find(task_req_custom_resource.first);
if (it != resources->custom_resources.end()) {
it->second.available =
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second);
}
}
// TODO(swang): We should also subtract object store memory if the task has
// arguments. Right now we do not modify object_pulls_queued in case of
// performance regressions in spillback.
return true;
return cluster_resource_manager_->SubtractNodeAvailableResources(node_id,
resource_request);
}
bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
if (it != nodes_.end()) {
*ret_resources = it->second.GetLocalView();
return true;
} else {
return false;
}
}
const NodeResources &ClusterResourceScheduler::GetLocalNodeResources() const {
const auto &node_it = nodes_.find(local_node_id_);
RAY_CHECK(node_it != nodes_.end());
return node_it->second.GetLocalView();
}
int64_t ClusterResourceScheduler::NumNodes() const { return nodes_.size(); }
const StringIdMap &ClusterResourceScheduler::GetStringIdMap() const {
return string_to_int_map_;
}
void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
node_id = string_to_int_map_.Insert(node_id_string);
it = nodes_.emplace(node_id, node_resources).first;
}
int idx = GetPredefinedResourceIndex(resource_name);
auto local_view = it->second.GetMutableLocalView();
FixedPoint resource_total_fp(resource_total);
if (idx != -1) {
auto diff_capacity = resource_total_fp - local_view->predefined_resources[idx].total;
local_view->predefined_resources[idx].total += diff_capacity;
local_view->predefined_resources[idx].available += diff_capacity;
if (local_view->predefined_resources[idx].available < 0) {
local_view->predefined_resources[idx].available = 0;
}
if (local_view->predefined_resources[idx].total < 0) {
local_view->predefined_resources[idx].total = 0;
}
} else {
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
if (itr != local_view->custom_resources.end()) {
auto diff_capacity = resource_total_fp - itr->second.total;
itr->second.total += diff_capacity;
itr->second.available += diff_capacity;
if (itr->second.available < 0) {
itr->second.available = 0;
}
if (itr->second.total < 0) {
itr->second.total = 0;
}
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total_fp;
local_view->custom_resources.emplace(resource_id, resource_capacity);
}
}
}
void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string,
const std::string &resource_name) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return;
}
int idx = GetPredefinedResourceIndex(resource_name);
auto local_view = it->second.GetMutableLocalView();
if (idx != -1) {
local_view->predefined_resources[idx].available = 0;
local_view->predefined_resources[idx].total = 0;
} else {
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
if (itr != local_view->custom_resources.end()) {
local_view->custom_resources.erase(itr);
}
}
}
std::string ClusterResourceScheduler::DebugString(void) const {
std::stringstream buffer;
buffer << "\nLocal id: " << local_node_id_;
buffer << " Local resources: " << local_resource_manager_->DebugString();
for (auto &node : nodes_) {
for (auto &node : cluster_resource_manager_->GetResourceView()) {
buffer << "node id: " << node.first;
buffer << node.second.GetLocalView().DebugString(string_to_int_map_);
}
return buffer.str();
}
std::string ClusterResourceScheduler::GetLocalResourceViewString() const {
const auto &node_it = nodes_.find(local_node_id_);
RAY_CHECK(node_it != nodes_.end());
return node_it->second.GetLocalView().DictString(string_to_int_map_);
}
std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) {
if (res_idx == CPU) {
return ray::kCPU_ResourceLabel;
} else if (res_idx == GPU) {
return ray::kGPU_ResourceLabel;
} else if (res_idx == OBJECT_STORE_MEM) {
return ray::kObjectStoreMemory_ResourceLabel;
} else if (res_idx == MEM) {
return ray::kMemory_ResourceLabel;
} else {
return string_to_int_map_.Get((uint64_t)res_idx);
}
}
bool ClusterResourceScheduler::AllocateRemoteTaskResources(
const std::string &node_string,
const absl::flat_hash_map<std::string, double> &task_resources) {
@ -445,11 +253,13 @@ bool ClusterResourceScheduler::AllocateRemoteTaskResources(
return SubtractRemoteNodeAvailableResources(node_id, resource_request);
}
bool ClusterResourceScheduler::IsLocallySchedulable(
const absl::flat_hash_map<std::string, double> &shape) {
bool ClusterResourceScheduler::IsSchedulableOnNode(
const std::string &node_name, const absl::flat_hash_map<std::string, double> &shape) {
int64_t node_id = string_to_int_map_.Get(node_name);
auto resource_request = ResourceMapToResourceRequest(
string_to_int_map_, shape, /*requires_object_store_memory=*/false);
return IsSchedulable(resource_request, local_node_id_, GetLocalNodeResources());
return IsSchedulable(resource_request, node_id,
cluster_resource_manager_->GetNodeResources(node_name));
}
} // namespace ray
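
For callers, the old local-only feasibility check generalizes to any tracked node. A hedged sketch (the node name "local" matches the one used in the placement-group tests above):

absl::flat_hash_map<std::string, double> shape{{"CPU", 1.0}};
// Before: bool ok = resource_scheduler.IsLocallySchedulable(shape);
bool ok = resource_scheduler.IsSchedulableOnNode("local", shape);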


@ -26,7 +26,7 @@
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "ray/raylet/scheduling/fixed_point.h"
#include "ray/raylet/scheduling/local_resource_manager.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
@ -41,9 +41,9 @@ using rpc::HeartbeatTableData;
/// Class encapsulating the cluster resources and the logic to assign
/// tasks to nodes based on the task's constraints and the available
/// resources at those nodes.
class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
class ClusterResourceScheduler {
public:
ClusterResourceScheduler() {}
ClusterResourceScheduler();
/// Constructor initializing the resources associated with the local node.
///
/// \param local_node_id: ID of local node,
@ -59,26 +59,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
std::function<int64_t(void)> get_used_object_store_memory = nullptr,
std::function<bool(void)> get_pull_manager_at_capacity = nullptr);
// Mapping from predefined resource indexes to resource strings
std::string GetResourceNameFromIndex(int64_t res_idx);
void AddOrUpdateNode(
const std::string &node_id,
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available);
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) override;
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
bool RemoveNode(const std::string &node_id_string) override;
const StringIdMap &GetStringIdMap() const;
/// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
@ -114,31 +95,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Get number of nodes in the cluster.
int64_t NumNodes() const;
/// Temporarily get the StringIDMap.
const StringIdMap &GetStringIdMap() const;
/// Update total capacity of a given resource of a given node.
///
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
const std::string &resource_name,
double resource_total) override;
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
void DeleteResource(const std::string &node_name,
const std::string &resource_name) override;
/// Return local resources in human-readable string form.
std::string GetLocalResourceViewString() const override;
/// Subtract the resources required by a given resource request (resource_request) from
/// a given remote node.
///
@ -153,16 +109,18 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
/// Return human-readable string for this scheduler state.
std::string DebugString() const;
/// Check whether a task request is schedulable on a the local node. A node is
/// Check whether a task request is schedulable on a given node. A node is
/// schedulable if it has the available resources needed to execute the task.
///
/// \param node_name Name of the node.
/// \param shape The resource demand's shape.
bool IsLocallySchedulable(const absl::flat_hash_map<std::string, double> &shape);
bool IsSchedulableOnNode(const std::string &node_name,
const absl::flat_hash_map<std::string, double> &shape);
LocalResourceManager &GetLocalResourceManager() { return *local_resource_manager_; }
/// Get local node resources; test only.
const NodeResources &GetLocalNodeResources() const;
ClusterResourceManager &GetClusterResourceManager() {
return *cluster_resource_manager_;
}
private:
bool NodeAlive(int64_t node_id) const;
@ -178,18 +136,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
bool SubtractRemoteNodeAvailableResources(int64_t node_id,
const ResourceRequest &resource_request);
/// Add a new node or overwrite the resources of an existing node.
///
/// \param node_id: Node ID.
/// \param node_resources: Up to date total and available resources of the node.
void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id ID of the node to be removed.
bool RemoveNode(int64_t node_id);
/// Check whether a resource request can be scheduled given a node.
///
/// \param resource_request: Resource request to be scheduled.
@ -204,40 +150,41 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id,
const NodeResources &resources) const;
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
/// List of nodes in the cluster and their resources, organized as a map.
/// The key of the map is the node ID.
absl::flat_hash_map<int64_t, Node> nodes_;
/// Identifier of local node.
int64_t local_node_id_;
/// The scheduling policy to use.
std::unique_ptr<raylet_scheduling_policy::SchedulingPolicy> scheduling_policy_;
/// Internally maintained random number generator.
std::mt19937_64 gen_;
/// Resources of local node.
std::unique_ptr<LocalResourceManager> local_resource_manager_;
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap string_to_int_map_;
/// Identifier of local node.
int64_t local_node_id_;
/// Internally maintained random number generator.
std::mt19937_64 gen_;
/// Gcs client. It's not owned by this class.
gcs::GcsClient *gcs_client_;
/// Resources of local node.
std::unique_ptr<LocalResourceManager> local_resource_manager_;
/// Resources of the entire cluster.
std::unique_ptr<ClusterResourceManager> cluster_resource_manager_;
/// The scheduling policy to use.
std::unique_ptr<raylet_scheduling_policy::SchedulingPolicy> scheduling_policy_;
friend class ClusterResourceSchedulerTest;
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest,
UpdateLocalAvailableResourcesFromResourceInstancesTest);
FRIEND_TEST(ClusterResourceSchedulerTest, ResourceUsageReportTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DeadNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask);
FRIEND_TEST(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest);
FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DirtyLocalViewTest);
FRIEND_TEST(ClusterResourceSchedulerTest, DynamicResourceTest);
FRIEND_TEST(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources);
FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback);
};
} // end namespace ray


@ -1,56 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/task/scheduling_resources.h"
#include "src/ray/protobuf/gcs.pb.h"
#include "src/ray/protobuf/gcs_service.pb.h"
namespace ray {
class ClusterResourceSchedulerInterface {
public:
virtual ~ClusterResourceSchedulerInterface() = default;
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
virtual bool RemoveNode(const std::string &node_id_string) = 0;
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
virtual bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) = 0;
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
virtual void UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) = 0;
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
virtual void DeleteResource(const std::string &node_id_string,
const std::string &resource_name) = 0;
/// Return local resources in human-readable string form.
virtual std::string GetLocalResourceViewString() const = 0;
};
} // namespace ray


@ -150,7 +150,8 @@ class ClusterResourceSchedulerTest : public ::testing::Test {
// `scheduling_policy_test.cc` for comprehensive testing of the hybrid scheduling
// policy.
gcs_client_ = std::make_unique<gcs::MockGcsClient>();
node_info.set_node_id(NodeID::FromRandom().Binary());
node_name = NodeID::FromRandom().Binary();
node_info.set_node_id(node_name);
ON_CALL(*gcs_client_->mock_node_accessor, Get(::testing::_, ::testing::_))
.WillByDefault(::testing::Return(&node_info));
}
@ -184,12 +185,13 @@ class ClusterResourceSchedulerTest : public ::testing::Test {
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.AddOrUpdateNode(i, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(i, node_resources);
node_resources.custom_resources.clear();
}
}
std::unique_ptr<gcs::MockGcsClient> gcs_client_;
std::string node_name;
rpc::GcsNodeInfo node_info;
};
@ -268,7 +270,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) {
initCluster(resource_scheduler, num_nodes);
ASSERT_EQ(resource_scheduler.NumNodes(), num_nodes);
ASSERT_EQ(resource_scheduler.GetClusterResourceManager().NumNodes(), num_nodes);
}
TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) {
@ -278,9 +280,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) {
ClusterResourceScheduler resource_scheduler;
initCluster(resource_scheduler, num_nodes);
resource_scheduler.RemoveNode(remove_id);
resource_scheduler.GetClusterResourceManager().RemoveNode(remove_id);
ASSERT_TRUE(num_nodes - 1 == resource_scheduler.NumNodes());
ASSERT_TRUE(num_nodes - 1 == resource_scheduler.GetClusterResourceManager().NumNodes());
}
TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) {
@ -312,9 +314,10 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) {
cust_capacities.push_back(rand() % 10);
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.AddOrUpdateNode(update_id, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(update_id,
node_resources);
}
ASSERT_TRUE(num_nodes == resource_scheduler.NumNodes());
ASSERT_TRUE(num_nodes == resource_scheduler.GetClusterResourceManager().NumNodes());
}
TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
@ -323,7 +326,8 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
ClusterResourceScheduler resource_scheduler(local_node_id, resource_total,
*gcs_client_);
auto remote_node_id = NodeID::FromRandom().Binary();
resource_scheduler.AddOrUpdateNode(remote_node_id, resource_total, resource_total);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_node_id, resource_total, resource_total);
absl::flat_hash_map<std::string, double> resource_request({{"CPU", 1}});
int64_t violations;
@ -335,7 +339,8 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
&is_infeasible);
ASSERT_EQ(node_id, local_node_id);
absl::flat_hash_map<std::string, double> resource_available({{"CPU", 9}});
resource_scheduler.AddOrUpdateNode(local_node_id, resource_total, resource_available);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
local_node_id, resource_total, resource_available);
node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
scheduling_strategy, false, false,
false, &violations, &is_infeasible);
@ -368,11 +373,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
ASSERT_TRUE(violations == 0);
NodeResources nr1, nr2;
ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr1));
ASSERT_TRUE(
resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr1));
auto task_allocation = std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
resource_request, task_allocation));
ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr2));
ASSERT_TRUE(
resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr2));
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
auto t = nr1.predefined_resources[i].available -
@ -398,20 +405,23 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest) {
absl::flat_hash_map<std::string, double> initial_resources = {
{ray::kCPU_ResourceLabel, 1}, {"custom", 1}};
ClusterResourceScheduler resource_scheduler(
NodeID::FromRandom().Binary(), initial_resources, *gcs_client_, nullptr, nullptr);
std::string name = NodeID::FromRandom().Binary();
ClusterResourceScheduler resource_scheduler(name, initial_resources, *gcs_client_,
nullptr, nullptr);
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
ray::kCPU_ResourceLabel, {0, 1, 1});
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom",
{0, 1, 1});
const auto &predefined_resources =
resource_scheduler.GetLocalNodeResources().predefined_resources;
const auto &predefined_resources = resource_scheduler.GetClusterResourceManager()
.GetNodeResources(name)
.predefined_resources;
ASSERT_EQ(predefined_resources[CPU].total.Double(), 3);
const auto &custom_resources =
resource_scheduler.GetLocalNodeResources().custom_resources;
const auto &custom_resources = resource_scheduler.GetClusterResourceManager()
.GetNodeResources(name)
.custom_resources;
auto resource_id = resource_scheduler.string_to_int_map_.Get("custom");
ASSERT_EQ(custom_resources.find(resource_id)->second.total.Double(), 3);
}
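This hunk shows the split between the two managers: writes to the local node's capacity go through GetLocalResourceManager(), while reads of any node's resources, including the local one keyed by its name, go through GetClusterResourceManager(). A sketch of the round trip, reusing the test's name and assuming FixedPoint exposes Double() as used above:

// Sketch: add local CPU instances, then read the total back via the cluster view.
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
    ray::kCPU_ResourceLabel, {0, 1, 1});
const auto &node_resources =
    resource_scheduler.GetClusterResourceManager().GetNodeResources(name);
// Initial 1 CPU plus the three instances (0 + 1 + 1) gives 3 in total.
ASSERT_EQ(node_resources.predefined_resources[CPU].total.Double(), 3);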
@@ -428,12 +438,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.AddOrUpdateNode(node_id, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
node_resources);
nr = node_resources;
}
// Check whether node resources were correctly added.
if (resource_scheduler.GetNodeResources(node_id, &nr_out)) {
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
@@ -446,10 +457,11 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
vector<int64_t> cust_ids{2, 3};
vector<FixedPoint> cust_capacities{6, 6};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.AddOrUpdateNode(node_id, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
node_resources);
nr = node_resources;
}
if (resource_scheduler.GetNodeResources(node_id, &nr_out)) {
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
@@ -474,7 +486,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.AddOrUpdateNode(node_internal_id, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_internal_id,
node_resources);
}
// Predefined resources, hard constraint violation
{
@@ -855,7 +868,8 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) {
absl::flat_hash_map<std::string, double> resource;
resource["CPU"] = 10000.0;
auto node_id = NodeID::FromRandom();
resource_scheduler.AddOrUpdateNode(node_id.Binary(), resource, resource);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id.Binary(),
resource, resource);
int64_t violations = 0;
bool is_infeasible = false;
rpc::SchedulingStrategy scheduling_strategy;
@@ -960,7 +974,7 @@ TEST_F(ClusterResourceSchedulerTest,
expected_available_gpu_instances.begin()));
NodeResources nr;
resource_scheduler.GetNodeResources(0, &nr);
resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5);
}
@@ -981,7 +995,7 @@ TEST_F(ClusterResourceSchedulerTest,
expected_available_gpu_instances.begin()));
NodeResources nr;
resource_scheduler.GetNodeResources(0, &nr);
resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8);
}
}
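The two GPU hunks above make the same point for instance-level state: after allocating or freeing GPU instances locally, the updated availability is observed through the manager rather than a scheduler-level GetNodeResources(). Sketch, with node key 0 as in the test:

// Sketch: read back fractional GPU availability through the cluster view.
NodeResources nr;
resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5);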
@@ -1043,7 +1057,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
absl::flat_hash_map<std::string, double> resource_spec({{"CPU", 1}});
ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_);
for (int i = 0; i < 100; i++) {
resource_scheduler.AddOrUpdateNode(NodeID::FromRandom().Binary(), {}, {});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
NodeID::FromRandom().Binary(), {}, {});
}
// No feasible nodes.
@@ -1059,7 +1074,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
// Feasible remote node, but doesn't currently have resources available. We
// should spill there.
auto remote_feasible = NodeID::FromRandom().Binary();
resource_scheduler.AddOrUpdateNode(remote_feasible, resource_spec, {{"CPU", 0.}});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_feasible, resource_spec, {{"CPU", 0.}});
ASSERT_EQ(remote_feasible, resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false, false,
&total_violations, &is_infeasible));
@@ -1067,7 +1083,8 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
// Feasible remote node, and it currently has resources available. We should
// prefer to spill there.
auto remote_available = NodeID::FromRandom().Binary();
resource_scheduler.AddOrUpdateNode(remote_available, resource_spec, resource_spec);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_available, resource_spec, resource_spec);
ASSERT_EQ(remote_available, resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false, false,
&total_violations, &is_infeasible));
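Scheduling decisions, by contrast, stay on ClusterResourceScheduler: the tests mutate cluster state through the manager and then ask the scheduler where to place work. A sketch of that interplay, with the positional flags exactly as in the calls above:

// Sketch: state write via the manager, placement query via the scheduler.
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
    remote_available, resource_spec, resource_spec);
ASSERT_EQ(remote_available,
          resource_scheduler.GetBestSchedulableNode(
              resource_spec, scheduling_strategy, false, false, false,
              &total_violations, &is_infeasible));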
@@ -1086,7 +1103,8 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
vector<FixedPoint> other_cust_capacities{5., 4., 3., 2., 1.};
initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
other_cust_capacities);
resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
other_node_resources);
{ // Cluster is idle.
rpc::ResourcesData data;
@@ -1169,7 +1187,8 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
vector<FixedPoint> other_cust_capacities{10.};
initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
other_cust_capacities);
resource_scheduler.AddOrUpdateNode(12345, other_node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
other_node_resources);
{
rpc::ResourcesData data;
@@ -1212,7 +1231,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
absl::flat_hash_map<std::string, double> initial_resources({{"CPU", 1}});
ClusterResourceScheduler resource_scheduler("local", initial_resources, *gcs_client_);
auto remote = NodeID::FromRandom().Binary();
resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}}, {{"CPU", 2.}});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(remote, {{"CPU", 2.}},
{{"CPU", 2.}});
const absl::flat_hash_map<std::string, double> task_spec = {{"CPU", 1.}};
// Allocate local resources to force tasks onto the remote node when
@@ -1238,8 +1258,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
scheduling_strategy.mutable_default_scheduling_strategy();
for (int i = 0; i < 3; i++) {
// Remote node reports update local view.
resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}},
{{"CPU", num_slots_available}});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote, {{"CPU", 2.}}, {{"CPU", num_slots_available}});
for (int j = 0; j < num_slots_available; j++) {
ASSERT_EQ(remote, resource_scheduler.GetBestSchedulableNode(
task_spec, scheduling_strategy, false, false, true, &t,
@@ -1317,7 +1337,8 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
std::vector<string> node_ids;
for (int i = 0; i < 100; i++) {
node_ids.push_back(NodeID::FromRandom().Binary());
resource_scheduler.AddOrUpdateNode(node_ids.back(), {}, {});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids.back(), {},
{});
}
// No feasible nodes.
@@ -1337,12 +1358,14 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
/*force_spillback=*/true, &total_violations, &is_infeasible),
"");
// Choose a remote node that has the resources available.
resource_scheduler.AddOrUpdateNode(node_ids[50], resource_spec, {});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids[50],
resource_spec, {});
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false,
/*force_spillback=*/true, &total_violations, &is_infeasible),
"");
resource_scheduler.AddOrUpdateNode(node_ids[51], resource_spec, resource_spec);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
node_ids[51], resource_spec, resource_spec);
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false,
/*force_spillback=*/true, &total_violations, &is_infeasible),

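Taken together, the migrations in this test file follow one rule of thumb: cluster-view reads and writes (AddOrUpdateNode, RemoveNode, NumNodes, GetNodeResources) go through GetClusterResourceManager(), local allocation goes through GetLocalResourceManager(), and placement queries such as GetBestSchedulableNode stay on the scheduler. A compressed sketch, assuming both accessors return references:

// Sketch of the post-refactor division of labor.
auto &cluster = resource_scheduler.GetClusterResourceManager();  // shared view
auto &local = resource_scheduler.GetLocalResourceManager();      // this node
cluster.AddOrUpdateNode(remote, {{"CPU", 2.}}, {{"CPU", 2.}});   // state write
local.AllocateLocalTaskResources(resource_request, task_allocation);
auto node_id = resource_scheduler.GetBestSchedulableNode(        // decision
    resource_request, scheduling_strategy, false, false, false,
    &violations, &is_infeasible);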
View file

@@ -1255,8 +1255,8 @@ void ClusterTaskManager::Dispatch(
if (predefined_resources[res_idx][inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
cluster_resource_scheduler_->GetResourceNameFromIndex(res_idx));
resource->set_name(cluster_resource_scheduler_->GetClusterResourceManager()
.GetResourceNameFromIndex(res_idx));
first = false;
}
auto rid = resource->add_resource_ids();
@@ -1273,8 +1273,8 @@ void ClusterTaskManager::Dispatch(
if (it->second[inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
cluster_resource_scheduler_->GetResourceNameFromIndex(it->first));
resource->set_name(cluster_resource_scheduler_->GetClusterResourceManager()
.GetResourceNameFromIndex(it->first));
first = false;
}
auto rid = resource->add_resource_ids();
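The same accessor indirection applies inside Dispatch: mapping an internal resource index back to its string name is now a ClusterResourceManager concern, keeping the scheduler's own surface small. Sketch of the lookup as used in both hunks above:

// Sketch: resolve a resource index to its name for the reply proto.
resource->set_name(cluster_resource_scheduler_->GetClusterResourceManager()
                       .GetResourceNameFromIndex(res_idx));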
@@ -1500,8 +1500,8 @@ void ClusterTaskManager::SpillWaitingTasks() {
bool ClusterTaskManager::IsLocallySchedulable(const RayTask &task) const {
const auto &spec = task.GetTaskSpecification();
return cluster_resource_scheduler_->IsLocallySchedulable(
spec.GetRequiredResources().GetResourceMap());
return cluster_resource_scheduler_->IsSchedulableOnNode(
self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
}
ResourceSet ClusterTaskManager::CalcNormalTaskResources() const {

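The IsLocallySchedulable helper is subsumed by the node-parameterized IsSchedulableOnNode; asking about the local node just means passing the raylet's own id. Sketch of an equivalent call, with self_node_id_ as in the surrounding class:

// Sketch: local schedulability is now the general query with our own node id.
bool schedulable = cluster_resource_scheduler_->IsSchedulableOnNode(
    self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());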
View file

@@ -275,7 +278,8 @@ class ClusterTaskManagerTest : public ::testing::Test {
node_resources[ray::kCPU_ResourceLabel] = num_cpus;
node_resources[ray::kGPU_ResourceLabel] = num_gpus;
node_resources[ray::kMemory_ResourceLabel] = memory;
scheduler_->AddOrUpdateNode(id.Binary(), node_resources, node_resources);
scheduler_->GetClusterResourceManager().AddOrUpdateNode(id.Binary(), node_resources,
node_resources);
rpc::GcsNodeInfo info;
node_info_[id] = info;
@@ -1433,7 +1434,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
}
TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) {
const NodeResources &node_resources = scheduler_->GetLocalNodeResources();
const NodeResources &node_resources =
scheduler_->GetClusterResourceManager().GetNodeResources(id_.Binary());
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4);
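The task-manager tests mirror the same two moves: test helpers register nodes through GetClusterResourceManager().AddOrUpdateNode(), and the local node's resources are fetched from the cluster view by id instead of a dedicated local accessor. Sketch, reusing the test's id_ and the totals asserted above:

// Sketch: the local node is just another entry in the cluster view.
const NodeResources &node_resources =
    scheduler_->GetClusterResourceManager().GetNodeResources(id_.Binary());
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);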