diff --git a/BUILD.bazel b/BUILD.bazel index 0d56e82bd..9d987813b 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1066,6 +1066,20 @@ cc_test( ], ) +cc_test( + name = "scheduling_ids_test", + size = "small", + srcs = [ + "src/ray/raylet/scheduling/scheduling_ids_test.cc", + ], + copts = COPTS, + tags = ["team:core"], + deps = [ + ":raylet_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "local_object_manager_test", size = "small", diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index 734adea1b..07fb240a2 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -30,12 +30,6 @@ namespace ray { /// divide to convert from internal to actual. constexpr double kResourceConversionFactor = 10000; -const std::string kCPU_ResourceLabel = "CPU"; -const std::string kGPU_ResourceLabel = "GPU"; -const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory"; -const std::string kMemory_ResourceLabel = "memory"; -const std::string kBundle_ResourceLabel = "bundle"; - /// \class ResourceSet /// \brief Encapsulates and operates on a set of resources, including CPUs, /// GPUs, and custom labels. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7ff29d631..989cd3821 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -313,8 +313,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self SchedulingResources local_resources(config.resource_config); cluster_resource_scheduler_ = std::shared_ptr(new ClusterResourceScheduler( - self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap(), - *gcs_client_, + scheduling::NodeID(self_node_id_.Binary()), + local_resources.GetTotalResources().GetResourceMap(), *gcs_client_, [this]() { if (RayConfig::instance().scheduler_report_pinned_bytes_only()) { return local_object_manager_.GetPinnedBytes(); @@ -771,7 +771,7 @@ void NodeManager::WarnResourceDeadlock() { << "\n" << "Available resources on this node: " << cluster_resource_scheduler_->GetClusterResourceManager() - .GetNodeResourceViewString(self_node_id_.Binary()) + .GetNodeResourceViewString(scheduling::NodeID(self_node_id_.Binary())) << " In total there are " << pending_tasks << " pending tasks and " << pending_actor_creations << " pending actors on this node."; @@ -847,7 +847,7 @@ void NodeManager::NodeRemoved(const NodeID &node_id) { // Remove the node from the resource map. 
if (!cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode( - node_id.Binary())) { + scheduling::NodeID(node_id.Binary()))) { RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id << "."; return; @@ -933,7 +933,8 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id, const std::string &resource_label = resource_pair.first; const double &new_resource_capacity = resource_pair.second; cluster_resource_scheduler_->GetClusterResourceManager().UpdateResourceCapacity( - node_id.Binary(), resource_label, new_resource_capacity); + scheduling::NodeID(node_id.Binary()), scheduling::ResourceID(resource_label), + new_resource_capacity); } RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map."; cluster_task_manager_->ScheduleAndDispatchTasks(); @@ -961,7 +962,7 @@ void NodeManager::ResourceDeleted(const NodeID &node_id, // Update local_available_resources_ and SchedulingResources for (const auto &resource_label : resource_names) { cluster_resource_scheduler_->GetClusterResourceManager().DeleteResource( - node_id.Binary(), resource_label); + scheduling::NodeID(node_id.Binary()), scheduling::ResourceID(resource_label)); } return; } @@ -969,7 +970,7 @@ void NodeManager::ResourceDeleted(const NodeID &node_id, void NodeManager::UpdateResourceUsage(const NodeID &node_id, const rpc::ResourcesData &resource_data) { if (!cluster_resource_scheduler_->GetClusterResourceManager().UpdateNode( - node_id.Binary(), resource_data)) { + scheduling::NodeID(node_id.Binary()), resource_data)) { RAY_LOG(INFO) << "[UpdateResourceUsage]: received resource usage from unknown node id " << node_id; @@ -1605,7 +1606,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest << ", normal_task_resources = " << normal_task_resources.ToString() << ", local_resoruce_view = " << cluster_resource_scheduler_->GetClusterResourceManager() - .GetNodeResourceViewString(self_node_id_.Binary()); + .GetNodeResourceViewString( + scheduling::NodeID(self_node_id_.Binary())); auto resources_data = reply->mutable_resources_data(); resources_data->set_node_id(self_node_id_.Binary()); resources_data->set_resources_normal_task_changed(true); diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index 83a52a316..3564ad440 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -128,7 +128,6 @@ void NewPlacementGroupResourceManager::CommitBundle( const auto &bundle_state = it->second; bundle_state->state_ = CommitState::COMMITTED; - const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap(); const auto &task_resource_instances = *bundle_state->resources_; const auto &resources = bundle_spec.GetFormattedResources(); @@ -136,13 +135,12 @@ void NewPlacementGroupResourceManager::CommitBundle( const auto &resource_name = resource.first; const auto &original_resource_name = GetOriginalResourceName(resource_name); if (original_resource_name != kBundle_ResourceLabel) { - const auto &instances = - task_resource_instances.Get(original_resource_name, string_id_map); + const auto &instances = task_resource_instances.Get(original_resource_name); cluster_resource_scheduler_->GetLocalResourceManager().AddLocalResourceInstances( - resource_name, instances); + scheduling::ResourceID{resource_name}, instances); } else { cluster_resource_scheduler_->GetLocalResourceManager().AddLocalResourceInstances( - resource_name, 
{resource.second}); + scheduling::ResourceID{resource_name}, {resource.second}); } } update_resources_( @@ -185,14 +183,15 @@ void NewPlacementGroupResourceManager::ReturnBundle( std::vector deleted; for (const auto &resource : placement_group_resources) { + auto resource_id = scheduling::ResourceID{resource.first}; if (cluster_resource_scheduler_->GetLocalResourceManager().IsAvailableResourceEmpty( - resource.first)) { + resource_id)) { RAY_LOG(DEBUG) << "Available bundle resource:[" << resource.first << "] is empty, Will delete it from local resource"; // Delete local resource if available resource is empty when return bundle, or there // will be resource leak. cluster_resource_scheduler_->GetLocalResourceManager().DeleteLocalResource( - resource.first); + resource_id); deleted.push_back(resource.first); } else { RAY_LOG(DEBUG) << "Available bundle resource:[" << resource.first diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index b751119d9..50e63307f 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -41,8 +41,8 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { } void InitLocalAvailableResource( absl::flat_hash_map &unit_resource) { - cluster_resource_scheduler_ = - std::make_shared("local", unit_resource, *gcs_client_); + cluster_resource_scheduler_ = std::make_shared( + scheduling::NodeID("local"), unit_resource, *gcs_client_); new_placement_group_resource_manager_.reset( new raylet::NewPlacementGroupResourceManager( cluster_resource_scheduler_, @@ -57,13 +57,13 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { void CheckAvailableResoueceEmpty(const std::string &resource) { ASSERT_TRUE( cluster_resource_scheduler_->GetLocalResourceManager().IsAvailableResourceEmpty( - resource)); + scheduling::ResourceID(resource))); } void CheckRemainingResourceCorrect(NodeResources &node_resources) { auto local_node_resource = cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources( - "local"); + scheduling::NodeID("local")); ASSERT_TRUE(local_node_resource == node_resources); } @@ -130,7 +130,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); ASSERT_TRUE( @@ -138,7 +138,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { unit_resource, resource_instances)); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -163,10 +163,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { ASSERT_TRUE(delete_called_); /// 5. check remaining resources is correct. 
auto remaining_resource_scheduler = std::make_shared( - "remaining", unit_resource, *gcs_client_); + scheduling::NodeID("remaining"), unit_resource, *gcs_client_); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -204,7 +204,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu {"bundle_group_2_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 2000}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); ASSERT_TRUE( @@ -212,7 +212,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu init_unit_resource, resource_instances)); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); /// 5. return second bundle. @@ -228,7 +228,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 2000}}; remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); ASSERT_TRUE( remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( {{"CPU_group_" + group_id.Hex(), 1.0}, @@ -237,17 +237,17 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu resource_instances)); remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); /// 7. return first bundle. new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec); /// 8. check remaining resources is correct after all bundle returned. remaining_resources = {{"CPU", 2.0}}; remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); ASSERT_TRUE(update_called_); ASSERT_TRUE(delete_called_); CheckRemainingResourceCorrect(remaining_resource_instance); @@ -270,7 +270,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) /// 4. check remaining resources is correct. 
absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); ASSERT_TRUE( @@ -278,7 +278,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) unit_resource, resource_instances)); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -307,7 +307,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); ASSERT_TRUE( @@ -315,7 +315,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) unit_resource, resource_instances)); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); new_placement_group_resource_manager_->ReturnBundle(bundle_spec); // 5. prepare bundle -> commit bundle -> commit bundle. @@ -336,10 +336,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) ConvertSingleSpecToVectorPtrs(bundle_spec)); // 8. check remaining resources is correct. remaining_resource_scheduler = std::make_shared( - "remaining", available_resource, *gcs_client_); + scheduling::NodeID("remaining"), available_resource, *gcs_client_); remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -360,10 +360,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { // 4. check remaining resources is correct. absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); CheckRemainingResourceCorrect(remaining_resource_instance); // 5. re-init the local available resource with 4 CPUs. 
available_resource = {std::make_pair("CPU", 4.0)}; @@ -386,7 +386,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { {"bundle_group_4_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 4000}}; remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); absl::flat_hash_map allocating_resource; @@ -396,7 +396,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { allocating_resource, resource_instances)); remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance); @@ -432,7 +432,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { {"bundle_group_4_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 4000}}; auto remaining_resource_scheduler = std::make_shared( - "remaining", remaining_resources, *gcs_client_); + scheduling::NodeID("remaining"), remaining_resources, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); absl::flat_hash_map allocating_resource; @@ -442,7 +442,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { allocating_resource, resource_instances)); auto remaining_resource_instance = remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - "remaining"); + scheduling::NodeID("remaining")); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance); diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index fdb518138..aca6ca315 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -18,6 +18,7 @@ #include "ray/common/task/scheduling_resources.h" namespace ray { +using namespace ::ray::scheduling; const std::string resource_labels[] = { ray::kCPU_ResourceLabel, ray::kMemory_ResourceLabel, ray::kGPU_ResourceLabel, @@ -89,7 +90,6 @@ std::vector VectorFixedPointToVectorDouble( /// Convert a map of resources to a ResourceRequest data structure. 
ResourceRequest ResourceMapToResourceRequest( - StringIdMap &string_to_int_map, const absl::flat_hash_map &resource_map, bool requires_object_store_memory) { ResourceRequest resource_request; @@ -107,8 +107,8 @@ ResourceRequest ResourceMapToResourceRequest( } else if (resource.first == ray::kMemory_ResourceLabel) { resource_request.predefined_resources[MEM] = resource.second; } else { - int64_t id = string_to_int_map.Insert(resource.first); - resource_request.custom_resources[id] = resource.second; + resource_request.custom_resources[ResourceID(resource.first).ToInt()] = + resource.second; } } @@ -116,7 +116,7 @@ ResourceRequest ResourceMapToResourceRequest( } const std::vector &TaskResourceInstances::Get( - const std::string &resource_name, const StringIdMap &string_id_map) const { + const std::string &resource_name) const { if (ray::kCPU_ResourceLabel == resource_name) { return predefined_resources[CPU]; } else if (ray::kGPU_ResourceLabel == resource_name) { @@ -126,8 +126,7 @@ const std::vector &TaskResourceInstances::Get( } else if (ray::kMemory_ResourceLabel == resource_name) { return predefined_resources[MEM]; } else { - int64_t resource_id = string_id_map.Get(resource_name); - auto it = custom_resources.find(resource_id); + auto it = custom_resources.find(ResourceID(resource_name).ToInt()); RAY_CHECK(it != custom_resources.end()); return it->second; } @@ -162,7 +161,6 @@ ResourceRequest TaskResourceInstances::ToResourceRequest() const { /// /// \request Conversion result to a ResourceRequest data structure. NodeResources ResourceMapToNodeResources( - StringIdMap &string_to_int_map, const absl::flat_hash_map &resource_map_total, const absl::flat_hash_map &resource_map_available) { NodeResources node_resources; @@ -191,7 +189,7 @@ NodeResources ResourceMapToNodeResources( node_resources.predefined_resources[MEM] = resource_capacity; } else { // This is a custom resource. 
- node_resources.custom_resources.emplace(string_to_int_map.Insert(resource.first), + node_resources.custom_resources.emplace(ResourceID(resource.first).ToInt(), resource_capacity); } } @@ -325,7 +323,7 @@ bool NodeResources::operator==(const NodeResources &other) { bool NodeResources::operator!=(const NodeResources &other) { return !(*this == other); } -std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { +std::string NodeResources::DebugString() const { std::stringstream buffer; buffer << " {\n"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { @@ -352,14 +350,14 @@ std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { } for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" + buffer << "\t" << ResourceID(it->first).Binary() << ":(" << it->second.total << ":" << it->second.available << ")\n"; } buffer << "}" << std::endl; return buffer.str(); } -std::string NodeResources::DictString(StringIdMap string_to_in_map) const { +std::string NodeResources::DictString() const { std::stringstream buffer; bool first = true; buffer << "{"; @@ -397,7 +395,7 @@ std::string NodeResources::DictString(StringIdMap string_to_in_map) const { } for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - auto name = string_to_in_map.Get(it->first); + auto name = ResourceID(it->first).Binary(); buffer << ", " << format_resource(name, it->second.available.Double()) << "/" << format_resource(name, it->second.total.Double()); buffer << " " << name; @@ -445,7 +443,7 @@ void TaskResourceInstances::ClearCPUInstances() { } } -std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const { +std::string NodeResourceInstances::DebugString() const { std::stringstream buffer; buffer << "{\n"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { @@ -472,7 +470,7 @@ std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) co } for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << "\t" << string_to_int_map.Get(it->first) << ":(" + buffer << "\t" << ResourceID(it->first).Binary() << ":(" << VectorToString(it->second.total) << ":" << VectorToString(it->second.available) << ")\n"; } @@ -545,7 +543,7 @@ bool TaskResourceInstances::IsEmpty() const { return true; } -std::string TaskResourceInstances::DebugString(const StringIdMap &string_id_map) const { +std::string TaskResourceInstances::DebugString() const { std::stringstream buffer; buffer << std::endl << " Allocation: {"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { @@ -556,7 +554,7 @@ std::string TaskResourceInstances::DebugString(const StringIdMap &string_id_map) buffer << " ["; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << string_id_map.Get(it->first) << ":" << VectorToString(it->second) << ", "; + buffer << ResourceID(it->first).Binary() << ":" << VectorToString(it->second) << ", "; } buffer << "]" << std::endl; diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index cb19bd5d2..a527cc062 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -27,9 +27,6 @@ namespace ray { -/// List of predefined resources. 
-enum PredefinedResources { CPU, MEM, GPU, OBJECT_STORE_MEM, PredefinedResources_MAX }; - const std::string ResourceEnumToString(PredefinedResources resource); const PredefinedResources ResourceStringToEnum(const std::string &resource); @@ -84,8 +81,7 @@ class TaskResourceInstances { absl::flat_hash_map> custom_resources; bool operator==(const TaskResourceInstances &other); /// Get instances based on the string. - const std::vector &Get(const std::string &resource_name, - const StringIdMap &string_id_map) const; + const std::vector &Get(const std::string &resource_name) const; /// For each resource of this request aggregate its instances. ResourceRequest ToResourceRequest() const; /// Get CPU instances only. @@ -138,7 +134,7 @@ class TaskResourceInstances { /// Check whether there are no resource instances. bool IsEmpty() const; /// Returns human-readable string for these resources. - [[nodiscard]] std::string DebugString(const StringIdMap &string_id_map) const; + [[nodiscard]] std::string DebugString() const; }; /// Total and available capacities of each resource of a node. @@ -170,9 +166,9 @@ class NodeResources { bool operator==(const NodeResources &other); bool operator!=(const NodeResources &other); /// Returns human-readable string for these resources. - std::string DebugString(StringIdMap string_to_int_map) const; + std::string DebugString() const; /// Returns compact dict-like string. - std::string DictString(StringIdMap string_to_int_map) const; + std::string DictString() const; }; /// Total and available capacities of each resource instance. @@ -189,7 +185,7 @@ class NodeResourceInstances { /// Returns if this equals another node resources. bool operator==(const NodeResourceInstances &other); /// Returns human-readable string for these resources. - [[nodiscard]] std::string DebugString(StringIdMap string_to_int_map) const; + [[nodiscard]] std::string DebugString() const; }; struct Node { @@ -211,13 +207,11 @@ struct Node { /// \request Conversion result to a ResourceRequest data structure. NodeResources ResourceMapToNodeResources( - StringIdMap &string_to_int_map, const absl::flat_hash_map &resource_map_total, const absl::flat_hash_map &resource_map_available); /// Convert a map of resources to a ResourceRequest data structure. 
ResourceRequest ResourceMapToResourceRequest( - StringIdMap &string_to_int_map, const absl::flat_hash_map &resource_map, bool requires_object_store_memory); diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index 6b6934487..8475de504 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -22,22 +22,21 @@ namespace ray { -ClusterResourceManager::ClusterResourceManager(StringIdMap &string_to_int_map) - : nodes_{}, string_to_int_map_{string_to_int_map} {} +ClusterResourceManager::ClusterResourceManager() : nodes_{} {} void ClusterResourceManager::AddOrUpdateNode( - const std::string &node_id, + scheduling::NodeID node_id, const absl::flat_hash_map &resources_total, const absl::flat_hash_map &resources_available) { - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, resources_total, resources_available); - AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources); + NodeResources node_resources = + ResourceMapToNodeResources(resources_total, resources_available); + AddOrUpdateNode(node_id, node_resources); } -void ClusterResourceManager::AddOrUpdateNode(int64_t node_id, +void ClusterResourceManager::AddOrUpdateNode(scheduling::NodeID node_id, const NodeResources &node_resources) { - RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: " - << node_resources.DebugString(string_to_int_map_); + RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id.ToInt() + << ", node_resources: " << node_resources.DebugString(); auto it = nodes_.find(node_id); if (it == nodes_.end()) { // This node is new, so add it to the map. @@ -48,17 +47,16 @@ void ClusterResourceManager::AddOrUpdateNode(int64_t node_id, } } -bool ClusterResourceManager::UpdateNode(const std::string &node_id_string, +bool ClusterResourceManager::UpdateNode(scheduling::NodeID node_id, const rpc::ResourcesData &resource_data) { - auto node_id = string_to_int_map_.Insert(node_id_string); if (!nodes_.contains(node_id)) { return false; } auto resources_total = MapFromProtobuf(resource_data.resources_total()); auto resources_available = MapFromProtobuf(resource_data.resources_available()); - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, resources_total, resources_available); + NodeResources node_resources = + ResourceMapToNodeResources(resources_total, resources_available); NodeResources local_view; RAY_CHECK(GetNodeResources(node_id, &local_view)); @@ -88,7 +86,7 @@ bool ClusterResourceManager::UpdateNode(const std::string &node_id_string, return true; } -bool ClusterResourceManager::RemoveNode(int64_t node_id) { +bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) { auto it = nodes_.find(node_id); if (it == nodes_.end()) { // Node not found. 
@@ -99,16 +97,7 @@ bool ClusterResourceManager::RemoveNode(int64_t node_id) { } } -bool ClusterResourceManager::RemoveNode(const std::string &node_id_string) { - auto node_id = string_to_int_map_.Get(node_id_string); - if (node_id == -1) { - return false; - } - - return RemoveNode(node_id); -} - -bool ClusterResourceManager::GetNodeResources(int64_t node_id, +bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id, NodeResources *ret_resources) const { auto it = nodes_.find(node_id); if (it != nodes_.end()) { @@ -120,28 +109,24 @@ bool ClusterResourceManager::GetNodeResources(int64_t node_id, } const NodeResources &ClusterResourceManager::GetNodeResources( - const std::string &node_name) const { - int64_t node_id = string_to_int_map_.Get(node_name); + scheduling::NodeID node_id) const { const auto &node = map_find_or_die(nodes_, node_id); return node.GetLocalView(); } int64_t ClusterResourceManager::NumNodes() const { return nodes_.size(); } -void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_string, - const std::string &resource_name, +void ClusterResourceManager::UpdateResourceCapacity(scheduling::NodeID node_id, + scheduling::ResourceID resource_id, double resource_total) { - int64_t node_id = string_to_int_map_.Get(node_id_string); - auto it = nodes_.find(node_id); if (it == nodes_.end()) { NodeResources node_resources; node_resources.predefined_resources.resize(PredefinedResources_MAX); - node_id = string_to_int_map_.Insert(node_id_string); it = nodes_.emplace(node_id, node_resources).first; } - int idx = GetPredefinedResourceIndex(resource_name); + int idx = GetPredefinedResourceIndex(resource_id); auto local_view = it->second.GetMutableLocalView(); FixedPoint resource_total_fp(resource_total); @@ -156,9 +141,7 @@ void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_s local_view->predefined_resources[idx].total = 0; } } else { - string_to_int_map_.Insert(resource_name); - int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = local_view->custom_resources.find(resource_id); + auto itr = local_view->custom_resources.find(resource_id.ToInt()); if (itr != local_view->custom_resources.end()) { auto diff_capacity = resource_total_fp - itr->second.total; itr->second.total += diff_capacity; @@ -172,29 +155,26 @@ void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_s } else { ResourceCapacity resource_capacity; resource_capacity.total = resource_capacity.available = resource_total_fp; - local_view->custom_resources.emplace(resource_id, resource_capacity); + local_view->custom_resources.emplace(resource_id.ToInt(), resource_capacity); } } } -void ClusterResourceManager::DeleteResource(const std::string &node_id_string, - const std::string &resource_name) { - int64_t node_id = string_to_int_map_.Get(node_id_string); - +void ClusterResourceManager::DeleteResource(scheduling::NodeID node_id, + scheduling::ResourceID resource_id) { auto it = nodes_.find(node_id); if (it == nodes_.end()) { return; } - int idx = GetPredefinedResourceIndex(resource_name); + int idx = GetPredefinedResourceIndex(resource_id); auto local_view = it->second.GetMutableLocalView(); if (idx != -1) { local_view->predefined_resources[idx].available = 0; local_view->predefined_resources[idx].total = 0; } else { - int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = local_view->custom_resources.find(resource_id); + auto itr = local_view->custom_resources.find(resource_id.ToInt()); if (itr != 
local_view->custom_resources.end()) { local_view->custom_resources.erase(itr); } @@ -202,33 +182,22 @@ void ClusterResourceManager::DeleteResource(const std::string &node_id_string, } std::string ClusterResourceManager::GetNodeResourceViewString( - const std::string &node_name) const { - int64_t node_id = string_to_int_map_.Get(node_name); + scheduling::NodeID node_id) const { const auto &node = map_find_or_die(nodes_, node_id); - return node.GetLocalView().DictString(string_to_int_map_); + return node.GetLocalView().DictString(); } std::string ClusterResourceManager::GetResourceNameFromIndex(int64_t res_idx) { - if (res_idx == CPU) { - return ray::kCPU_ResourceLabel; - } else if (res_idx == GPU) { - return ray::kGPU_ResourceLabel; - } else if (res_idx == OBJECT_STORE_MEM) { - return ray::kObjectStoreMemory_ResourceLabel; - } else if (res_idx == MEM) { - return ray::kMemory_ResourceLabel; - } else { - return string_to_int_map_.Get((uint64_t)res_idx); - } + return scheduling::ResourceID(res_idx).Binary(); } -const absl::flat_hash_map &ClusterResourceManager::GetResourceView() - const { +const absl::flat_hash_map + &ClusterResourceManager::GetResourceView() const { return nodes_; } bool ClusterResourceManager::SubtractNodeAvailableResources( - int64_t node_id, const ResourceRequest &resource_request) { + scheduling::NodeID node_id, const ResourceRequest &resource_request) { auto it = nodes_.find(node_id); if (it == nodes_.end()) { return false; @@ -260,7 +229,7 @@ bool ClusterResourceManager::SubtractNodeAvailableResources( } bool ClusterResourceManager::HasSufficientResource( - int64_t node_id, const ResourceRequest &resource_request, + scheduling::NodeID node_id, const ResourceRequest &resource_request, bool ignore_object_store_memory_requirement) const { auto it = nodes_.find(node_id); if (it == nodes_.end()) { @@ -305,8 +274,8 @@ bool ClusterResourceManager::HasSufficientResource( void ClusterResourceManager::DebugString(std::stringstream &buffer) const { for (auto &node : GetResourceView()) { - buffer << "node id: " << node.first; - buffer << node.second.GetLocalView().DebugString(string_to_int_map_); + buffer << "node id: " << node.first.ToInt(); + buffer << node.second.GetLocalView().DebugString(); } } diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h index 9010f5a5f..ca5368d06 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.h +++ b/src/ray/raylet/scheduling/cluster_resource_manager.h @@ -37,10 +37,10 @@ class ClusterTaskManagerTest; /// This class is not thread safe. class ClusterResourceManager { public: - explicit ClusterResourceManager(StringIdMap &string_to_int_map); + explicit ClusterResourceManager(); /// Get the resource view of the cluster. - const absl::flat_hash_map &GetResourceView() const; + const absl::flat_hash_map &GetResourceView() const; // Mapping from predefined resource indexes to resource strings std::string GetResourceNameFromIndex(int64_t res_idx); @@ -49,14 +49,13 @@ class ClusterResourceManager { /// /// \param node_id_string ID of the node which resoruces need to be udpated. /// \param resource_data The node resource data. - bool UpdateNode(const std::string &node_id_string, - const rpc::ResourcesData &resource_data); + bool UpdateNode(scheduling::NodeID node_id, const rpc::ResourcesData &resource_data); /// Remove node from the cluster data structure. This happens /// when a node fails or it is removed from the cluster. 
/// /// \param node_id_string ID of the node to be removed. - bool RemoveNode(const std::string &node_id_string); + bool RemoveNode(scheduling::NodeID node_id); /// Get number of nodes in the cluster. int64_t NumNodes() const; @@ -66,24 +65,24 @@ class ClusterResourceManager { /// \param node_name: Node whose resource we want to update. /// \param resource_name: Resource which we want to update. /// \param resource_total: New capacity of the resource. - void UpdateResourceCapacity(const std::string &node_name, - const std::string &resource_name, double resource_total); + void UpdateResourceCapacity(scheduling::NodeID node_id, + scheduling::ResourceID resource_id, double resource_total); /// Delete a given resource from a given node. /// /// \param node_name: Node whose resource we want to delete. /// \param resource_name: Resource we want to delete - void DeleteResource(const std::string &node_name, const std::string &resource_name); + void DeleteResource(scheduling::NodeID node_id, scheduling::ResourceID resource_id); /// Return local resources in human-readable string form. - std::string GetNodeResourceViewString(const std::string &node_name) const; + std::string GetNodeResourceViewString(scheduling::NodeID node_id) const; /// Get local resource. - const NodeResources &GetNodeResources(const std::string &node_name) const; + const NodeResources &GetNodeResources(scheduling::NodeID node_id) const; /// Subtract available resource from a given node. //// Return false if such node doesn't exist. - bool SubtractNodeAvailableResources(int64_t node_id, + bool SubtractNodeAvailableResources(scheduling::NodeID node_id, const ResourceRequest &resource_request); /// Check if we have sufficient resource to fullfill resource request for an given node. @@ -92,7 +91,8 @@ class ClusterResourceManager { /// \param resource_request: the request we want to check. /// \param ignore_object_store_memory_requirement: if true, we will ignore the /// require_object_store_memory in the resource_request. - bool HasSufficientResource(int64_t node_id, const ResourceRequest &resource_request, + bool HasSufficientResource(scheduling::NodeID node_id, + const ResourceRequest &resource_request, bool ignore_object_store_memory_requirement) const; void DebugString(std::stringstream &buffer) const; @@ -104,29 +104,20 @@ class ClusterResourceManager { /// /// \param node_id: Node ID. /// \param node_resources: Up to date total and available resources of the node. - void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources); + void AddOrUpdateNode(scheduling::NodeID node_id, const NodeResources &node_resources); void AddOrUpdateNode( - const std::string &node_id, + scheduling::NodeID node_id, const absl::flat_hash_map &resource_map_total, const absl::flat_hash_map &resource_map_available); - /// Remove node from the cluster data structure. This happens - /// when a node fails or it is removed from the cluster. - /// - /// \param node_id ID of the node to be removed. - bool RemoveNode(int64_t node_id); - /// Return resources associated to the given node_id in ret_resources. /// If node_id not found, return false; otherwise return true. - bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const; + bool GetNodeResources(scheduling::NodeID node_id, NodeResources *ret_resources) const; /// List of nodes in the clusters and their resources organized as a map. /// The key of the map is the node ID. 
- absl::flat_hash_map nodes_; - /// Keep the mapping between node and resource IDs in string representation - /// to integer representation. Used for improving map performance. - StringIdMap &string_to_int_map_; + absl::flat_hash_map nodes_; friend class ClusterResourceSchedulerTest; friend struct ClusterResourceManagerTest; diff --git a/src/ray/raylet/scheduling/cluster_resource_manager_test.cc b/src/ray/raylet/scheduling/cluster_resource_manager_test.cc index 9bba64da8..96f67e9e2 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager_test.cc @@ -18,14 +18,14 @@ namespace ray { -NodeResources CreateNodeResources(StringIdMap &map, double available_cpu, - double total_cpu, double available_custom_resource = 0, +NodeResources CreateNodeResources(double available_cpu, double total_cpu, + double available_custom_resource = 0, double total_custom_resource = 0, bool object_pulls_queued = false) { NodeResources resources; resources.predefined_resources = {{available_cpu, total_cpu}, {0, 0}, {0, 0}, {0, 0}}; - resources.custom_resources[map.Insert("CUSTOM")] = {available_custom_resource, - total_custom_resource}; + resources.custom_resources[scheduling::ResourceID("CUSTOM").ToInt()] = { + available_custom_resource, total_custom_resource}; resources.object_pulls_queued = object_pulls_queued; return resources; } @@ -33,22 +33,21 @@ NodeResources CreateNodeResources(StringIdMap &map, double available_cpu, struct ClusterResourceManagerTest : public ::testing::Test { void SetUp() { ::testing::Test::SetUp(); - manager = std::make_unique(map); + manager = std::make_unique(); + manager->AddOrUpdateNode(node0, + CreateNodeResources(/*available_cpu*/ 1, /*total_cpu*/ 1)); manager->AddOrUpdateNode( - node0, CreateNodeResources(map, /*available_cpu*/ 1, /*total_cpu*/ 1)); - manager->AddOrUpdateNode( - node1, CreateNodeResources(map, /*available_cpu*/ 0, /*total_cpu*/ 0, + node1, CreateNodeResources(/*available_cpu*/ 0, /*total_cpu*/ 0, /*available_custom*/ 1, /*total_custom*/ 1)); manager->AddOrUpdateNode( - node2, CreateNodeResources(map, /*available_cpu*/ 1, /*total_cpu*/ 1, + node2, CreateNodeResources(/*available_cpu*/ 1, /*total_cpu*/ 1, /*available_custom*/ 1, /*total_custom*/ 1, /*object_pulls_queued*/ true)); } - StringIdMap map; - int64_t node0 = 0; - int64_t node1 = 1; - int64_t node2 = 2; - int64_t node3 = 3; + scheduling::NodeID node0 = scheduling::NodeID(0); + scheduling::NodeID node1 = scheduling::NodeID(1); + scheduling::NodeID node2 = scheduling::NodeID(2); + scheduling::NodeID node3 = scheduling::NodeID(3); std::unique_ptr manager; }; @@ -57,32 +56,32 @@ TEST_F(ClusterResourceManagerTest, HasSufficientResourceTest) { node3, {}, /*ignore_object_store_memory_requirement*/ false)); ASSERT_TRUE(manager->HasSufficientResource( node0, - ResourceMapToResourceRequest(map, {{"CPU", 1}}, + ResourceMapToResourceRequest({{"CPU", 1}}, /*requires_object_store_memory=*/true), /*ignore_object_store_memory_requirement*/ false)); ASSERT_FALSE(manager->HasSufficientResource( node0, - ResourceMapToResourceRequest(map, {{"CUSTOM", 1}}, + ResourceMapToResourceRequest({{"CUSTOM", 1}}, /*requires_object_store_memory=*/true), /*ignore_object_store_memory_requirement*/ false)); ASSERT_TRUE(manager->HasSufficientResource( node1, - ResourceMapToResourceRequest(map, {{"CUSTOM", 1}}, + ResourceMapToResourceRequest({{"CUSTOM", 1}}, /*requires_object_store_memory=*/true), /*ignore_object_store_memory_requirement*/ false)); 
ASSERT_TRUE(manager->HasSufficientResource( node2, - ResourceMapToResourceRequest(map, {{"CPU", 1}}, + ResourceMapToResourceRequest({{"CPU", 1}}, /*requires_object_store_memory=*/false), /*ignore_object_store_memory_requirement*/ false)); ASSERT_FALSE(manager->HasSufficientResource( node2, - ResourceMapToResourceRequest(map, {{"CPU", 1}}, + ResourceMapToResourceRequest({{"CPU", 1}}, /*requires_object_store_memory=*/true), /*ignore_object_store_memory_requirement*/ false)); ASSERT_TRUE(manager->HasSufficientResource( node2, - ResourceMapToResourceRequest(map, {{"CPU", 1}}, + ResourceMapToResourceRequest({{"CPU", 1}}, /*requires_object_store_memory=*/true), /*ignore_object_store_memory_requirement*/ true)); } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index b2d382aee..c6400cefa 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -21,25 +21,13 @@ namespace ray { -namespace { -// Add predefined resource in string_to_int_map to -// avoid conflict. -void PopulatePredefinedResources(StringIdMap &string_to_int_map) { - string_to_int_map.InsertOrDie(ray::kCPU_ResourceLabel, CPU) - .InsertOrDie(ray::kGPU_ResourceLabel, GPU) - .InsertOrDie(ray::kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM) - .InsertOrDie(ray::kMemory_ResourceLabel, MEM); -} -} // namespace - -ClusterResourceScheduler::ClusterResourceScheduler() { - PopulatePredefinedResources(string_to_int_map_); - cluster_resource_manager_ = - std::make_unique(string_to_int_map_); +ClusterResourceScheduler::ClusterResourceScheduler() + : local_node_id_(scheduling::NodeID::Nil()) { + cluster_resource_manager_ = std::make_unique(); NodeResources node_resources; node_resources.predefined_resources.resize(PredefinedResources_MAX); local_resource_manager_ = std::make_unique( - local_node_id_, string_to_int_map_, node_resources, + local_node_id_, node_resources, /*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr, [&](const NodeResources &local_resource_update) { cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update); @@ -49,14 +37,12 @@ ClusterResourceScheduler::ClusterResourceScheduler() { } ClusterResourceScheduler::ClusterResourceScheduler( - int64_t local_node_id, const NodeResources &local_node_resources, + scheduling::NodeID local_node_id, const NodeResources &local_node_resources, gcs::GcsClient &gcs_client) - : string_to_int_map_(), local_node_id_(local_node_id), gcs_client_(&gcs_client) { - PopulatePredefinedResources(string_to_int_map_); - cluster_resource_manager_ = - std::make_unique(string_to_int_map_); + : local_node_id_(local_node_id), gcs_client_(&gcs_client) { + cluster_resource_manager_ = std::make_unique(); local_resource_manager_ = std::make_unique( - local_node_id, string_to_int_map_, local_node_resources, + local_node_id, local_node_resources, /*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr, [&](const NodeResources &local_resource_update) { cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update); @@ -67,19 +53,16 @@ ClusterResourceScheduler::ClusterResourceScheduler( } ClusterResourceScheduler::ClusterResourceScheduler( - const std::string &local_node_id, + scheduling::NodeID local_node_id, const absl::flat_hash_map &local_node_resources, gcs::GcsClient &gcs_client, std::function get_used_object_store_memory, std::function 
get_pull_manager_at_capacity) - : string_to_int_map_(), local_node_id_(), gcs_client_(&gcs_client) { - PopulatePredefinedResources(string_to_int_map_); - local_node_id_ = string_to_int_map_.Insert(local_node_id); - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, local_node_resources, local_node_resources); - cluster_resource_manager_ = - std::make_unique(string_to_int_map_); + : local_node_id_(local_node_id), gcs_client_(&gcs_client) { + NodeResources node_resources = + ResourceMapToNodeResources(local_node_resources, local_node_resources); + cluster_resource_manager_ = std::make_unique(); local_resource_manager_ = std::make_unique( - local_node_id_, string_to_int_map_, node_resources, get_used_object_store_memory, + local_node_id_, node_resources, get_used_object_store_memory, get_pull_manager_at_capacity, [&](const NodeResources &local_resource_update) { cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update); }); @@ -88,19 +71,18 @@ ClusterResourceScheduler::ClusterResourceScheduler( local_node_id_, cluster_resource_manager_->GetResourceView()); } -bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const { +bool ClusterResourceScheduler::NodeAlive(scheduling::NodeID node_id) const { if (node_id == local_node_id_) { return true; } - if (node_id == -1) { + if (node_id.IsNil()) { return false; } - auto node_id_binary = string_to_int_map_.Get(node_id); - return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id_binary)) != nullptr; + return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != nullptr; } bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request, - int64_t node_id) const { + scheduling::NodeID node_id) const { // It's okay if the local node's pull manager is at capacity because we // will eventually spill the task back from the waiting queue if its args // cannot be pulled. @@ -109,7 +91,7 @@ bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_req /*ignore_object_store_memory_requirement*/ node_id == local_node_id_); } -int64_t ClusterResourceScheduler::GetBestSchedulableNode( +scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( const ResourceRequest &resource_request, const rpc::SchedulingStrategy &scheduling_strategy, bool actor_creation, bool force_spillback, int64_t *total_violations, bool *is_infeasible) { @@ -120,7 +102,7 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode( resource_request, [this](auto node_id) { return this->NodeAlive(node_id); }); } - int64_t best_node_id = -1; + auto best_node_id = scheduling::NodeID::Nil(); if (scheduling_strategy.scheduling_strategy_case() == rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy) { best_node_id = scheduling_policy_->SpreadPolicy( @@ -135,7 +117,7 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode( [this](auto node_id) { return this->NodeAlive(node_id); }); } - *is_infeasible = best_node_id == -1 ? true : false; + *is_infeasible = best_node_id.IsNil(); if (!*is_infeasible) { // TODO (Alex): Support soft constraints if needed later. *total_violations = 0; @@ -143,35 +125,26 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode( RAY_LOG(DEBUG) << "Scheduling decision. " << "forcing spillback: " << force_spillback - << ". Best node: " << best_node_id << " " - << (string_to_int_map_.Get(best_node_id) == "-1" - ? NodeID::Nil() - : NodeID::FromBinary(string_to_int_map_.Get(best_node_id))) + << ". 
Best node: " << best_node_id.ToInt() << " " + << (best_node_id.IsNil() ? NodeID::Nil() + : NodeID::FromBinary(best_node_id.Binary())) << ", is infeasible: " << *is_infeasible; return best_node_id; } -std::string ClusterResourceScheduler::GetBestSchedulableNode( +scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( const absl::flat_hash_map &task_resources, const rpc::SchedulingStrategy &scheduling_strategy, bool requires_object_store_memory, bool actor_creation, bool force_spillback, int64_t *total_violations, bool *is_infeasible) { - ResourceRequest resource_request = ResourceMapToResourceRequest( - string_to_int_map_, task_resources, requires_object_store_memory); - int64_t node_id = - GetBestSchedulableNode(resource_request, scheduling_strategy, actor_creation, - force_spillback, total_violations, is_infeasible); - - if (node_id == -1) { - // This is not a schedulable node, so return empty string. - return ""; - } - // Return the string name of the node. - return string_to_int_map_.Get(node_id); + ResourceRequest resource_request = + ResourceMapToResourceRequest(task_resources, requires_object_store_memory); + return GetBestSchedulableNode(resource_request, scheduling_strategy, actor_creation, + force_spillback, total_violations, is_infeasible); } bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources( - int64_t node_id, const ResourceRequest &resource_request) { + scheduling::NodeID node_id, const ResourceRequest &resource_request) { RAY_CHECK(node_id != local_node_id_); // Just double check this node can still schedule the resource request. @@ -182,46 +155,40 @@ bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources( resource_request); } -const StringIdMap &ClusterResourceScheduler::GetStringIdMap() const { - return string_to_int_map_; -} - std::string ClusterResourceScheduler::DebugString(void) const { std::stringstream buffer; - buffer << "\nLocal id: " << local_node_id_; + buffer << "\nLocal id: " << local_node_id_.ToInt(); buffer << " Local resources: " << local_resource_manager_->DebugString(); cluster_resource_manager_->DebugString(buffer); return buffer.str(); } bool ClusterResourceScheduler::AllocateRemoteTaskResources( - const std::string &node_string, + scheduling::NodeID node_id, const absl::flat_hash_map &task_resources) { ResourceRequest resource_request = ResourceMapToResourceRequest( - string_to_int_map_, task_resources, /*requires_object_store_memory=*/false); - auto node_id = string_to_int_map_.Insert(node_string); + task_resources, /*requires_object_store_memory=*/false); RAY_CHECK(node_id != local_node_id_); return SubtractRemoteNodeAvailableResources(node_id, resource_request); } bool ClusterResourceScheduler::IsSchedulableOnNode( - const std::string &node_name, const absl::flat_hash_map &shape) { - int64_t node_id = string_to_int_map_.Get(node_name); - auto resource_request = ResourceMapToResourceRequest( - string_to_int_map_, shape, /*requires_object_store_memory=*/false); + scheduling::NodeID node_id, const absl::flat_hash_map &shape) { + auto resource_request = + ResourceMapToResourceRequest(shape, /*requires_object_store_memory=*/false); return IsSchedulable(resource_request, node_id); } -std::string ClusterResourceScheduler::GetBestSchedulableNode( +scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( const TaskSpecification &task_spec, bool prioritize_local_node, bool exclude_local_node, bool requires_object_store_memory, bool *is_infeasible) { // If the local node is available, we should directly 
return it instead of // going through the full hybrid policy since we don't want spillback. if (prioritize_local_node && !exclude_local_node && - IsSchedulableOnNode(string_to_int_map_.Get(local_node_id_), + IsSchedulableOnNode(local_node_id_, task_spec.GetRequiredResources().GetResourceMap())) { *is_infeasible = false; - return string_to_int_map_.Get(local_node_id_); + return local_node_id_; } // This argument is used to set violation, which is an unsupported feature now. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 350852713..47e521e75 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -50,18 +50,17 @@ class ClusterResourceScheduler { /// \param local_node_id: ID of local node, /// \param local_node_resources: The total and the available resources associated /// with the local node. - ClusterResourceScheduler(int64_t local_node_id, + ClusterResourceScheduler(scheduling::NodeID local_node_id, const NodeResources &local_node_resources, gcs::GcsClient &gcs_client); + ClusterResourceScheduler( - const std::string &local_node_id, + scheduling::NodeID local_node_id, const absl::flat_hash_map &local_node_resources, gcs::GcsClient &gcs_client, std::function get_used_object_store_memory = nullptr, std::function get_pull_manager_at_capacity = nullptr); - const StringIdMap &GetStringIdMap() const; - /// Find a node in the cluster on which we can schedule a given resource request. /// In hybrid mode, see `scheduling_policy.h` for a description of the policy. /// @@ -76,10 +75,11 @@ class ClusterResourceScheduler { /// /// \return emptry string, if no node can schedule the current request; otherwise, /// return the string name of a node that can schedule the resource request. - std::string GetBestSchedulableNode(const TaskSpecification &task_spec, - bool prioritize_local_node, bool exclude_local_node, - bool requires_object_store_memory, - bool *is_infeasible); + scheduling::NodeID GetBestSchedulableNode(const TaskSpecification &task_spec, + bool prioritize_local_node, + bool exclude_local_node, + bool requires_object_store_memory, + bool *is_infeasible); /// Subtract the resources required by a given resource request (resource_request) from /// a given remote node. @@ -89,7 +89,7 @@ class ClusterResourceScheduler { /// \return True if remote node has enough resources to satisfy the resource request. /// False otherwise. bool AllocateRemoteTaskResources( - const std::string &node_id, + scheduling::NodeID node_id, const absl::flat_hash_map &task_resources); /// Return human-readable string for this scheduler state. @@ -100,7 +100,7 @@ class ClusterResourceScheduler { /// /// \param node_name Name of the node. /// \param shape The resource demand's shape. - bool IsSchedulableOnNode(const std::string &node_name, + bool IsSchedulableOnNode(scheduling::NodeID node_id, const absl::flat_hash_map &shape); LocalResourceManager &GetLocalResourceManager() { return *local_resource_manager_; } @@ -109,7 +109,7 @@ class ClusterResourceScheduler { } private: - bool NodeAlive(int64_t node_id) const; + bool NodeAlive(scheduling::NodeID node_id) const; /// Decrease the available resources of a node when a resource request is /// scheduled on the given node. @@ -119,7 +119,7 @@ class ClusterResourceScheduler { /// /// \return true, if resource_request can be indeed scheduled on the node, /// and false otherwise. 
- bool SubtractRemoteNodeAvailableResources(int64_t node_id, + bool SubtractRemoteNodeAvailableResources(scheduling::NodeID node_id, const ResourceRequest &resource_request); /// Check whether a resource request can be scheduled given a node. @@ -128,7 +128,8 @@ class ClusterResourceScheduler { /// \param node_id: ID of the node. /// /// \return: Whether the request can be scheduled. - bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id) const; + bool IsSchedulable(const ResourceRequest &resource_request, + scheduling::NodeID node_id) const; /// Find a node in the cluster on which we can schedule a given resource request. /// In hybrid mode, see `scheduling_policy.h` for a description of the policy. @@ -145,10 +146,10 @@ class ClusterResourceScheduler { /// /// \return -1, if no node can schedule the current request; otherwise, /// return the ID of a node that can schedule the resource request. - int64_t GetBestSchedulableNode(const ResourceRequest &resource_request, - const rpc::SchedulingStrategy &scheduling_strategy, - bool actor_creation, bool force_spillback, - int64_t *violations, bool *is_infeasible); + scheduling::NodeID GetBestSchedulableNode( + const ResourceRequest &resource_request, + const rpc::SchedulingStrategy &scheduling_strategy, bool actor_creation, + bool force_spillback, int64_t *violations, bool *is_infeasible); /// Similar to /// int64_t GetBestSchedulableNode(...) @@ -156,17 +157,14 @@ class ClusterResourceScheduler { /// \return "", if no node can schedule the current request; otherwise, /// return the ID in string format of a node that can schedule the // resource request. - std::string GetBestSchedulableNode( + scheduling::NodeID GetBestSchedulableNode( const absl::flat_hash_map &resource_request, const rpc::SchedulingStrategy &scheduling_strategy, bool requires_object_store_memory, bool actor_creation, bool force_spillback, int64_t *violations, bool *is_infeasible); - /// Keep the mapping between node and resource IDs in string representation - /// to integer representation. Used for improving map performance. - StringIdMap string_to_int_map_; /// Identifier of local node. - int64_t local_node_id_; + scheduling::NodeID local_node_id_; /// Gcs client. It's not owned by this class. gcs::GcsClient *gcs_client_; /// Resources of local node. 
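[Editorial note, not part of the patch] After this refactor, callers identify nodes and resources with scheduling::NodeID / scheduling::ResourceID instead of raw strings routed through a shared StringIdMap. Below is a minimal sketch of the public ClusterResourceManager surface as declared in this diff; the include paths, the standalone main() harness, and the "CUSTOM" resource name are illustrative assumptions, not code from this change.

#include <iostream>

#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"

int main() {
  // The manager no longer needs a StringIdMap injected at construction.
  ray::ClusterResourceManager manager;

  // Nodes and resources are addressed by typed ids; UpdateResourceCapacity
  // creates the node entry if it does not exist yet.
  ray::scheduling::NodeID node(1);
  manager.UpdateResourceCapacity(node, ray::scheduling::ResourceID("CPU"), 4.0);
  manager.UpdateResourceCapacity(node, ray::scheduling::ResourceID("CUSTOM"), 2.0);

  // Resource requests are built without threading a StringIdMap through.
  ray::ResourceRequest request = ray::ResourceMapToResourceRequest(
      {{"CPU", 1.0}, {"CUSTOM", 1.0}}, /*requires_object_store_memory=*/false);

  bool schedulable = manager.HasSufficientResource(
      node, request, /*ignore_object_store_memory_requirement=*/true);
  std::cout << "schedulable: " << schedulable << "\n"
            << manager.GetNodeResourceViewString(node) << std::endl;
  return 0;
}

The same wrapping pattern appears at the NodeManager call sites above, e.g. scheduling::NodeID(self_node_id_.Binary()) and scheduling::ResourceID(resource_label).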
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 6624272d0..8c565d60a 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -185,19 +185,19 @@ class ClusterResourceSchedulerTest : public ::testing::Test { initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(i, node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(i), node_resources); node_resources.custom_resources.clear(); } } - void AssertPredefinedNodeResources(const ClusterResourceScheduler &resource_scheduler) { - ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kCPU_ResourceLabel), CPU); - ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kGPU_ResourceLabel), GPU); - ASSERT_EQ( - resource_scheduler.string_to_int_map_.Get(ray::kObjectStoreMemory_ResourceLabel), - OBJECT_STORE_MEM); - ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kMemory_ResourceLabel), MEM); + void AssertPredefinedNodeResources() { + ASSERT_EQ(ray::kCPU_ResourceLabel, scheduling::ResourceID(CPU).Binary()); + ASSERT_EQ(ray::kGPU_ResourceLabel, scheduling::ResourceID(GPU).Binary()); + ASSERT_EQ(ray::kObjectStoreMemory_ResourceLabel, + scheduling::ResourceID(OBJECT_STORE_MEM).Binary()); + ASSERT_EQ(ray::kMemory_ResourceLabel, scheduling::ResourceID(MEM).Binary()); } std::unique_ptr gcs_client_; std::string node_name; @@ -313,7 +313,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingIdInsertOrDieTest) { TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) { int num_nodes = 10; ClusterResourceScheduler resource_scheduler; - AssertPredefinedNodeResources(resource_scheduler); + AssertPredefinedNodeResources(); initCluster(resource_scheduler, num_nodes); @@ -327,7 +327,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) { ClusterResourceScheduler resource_scheduler; initCluster(resource_scheduler, num_nodes); - resource_scheduler.GetClusterResourceManager().RemoveNode(remove_id); + resource_scheduler.GetClusterResourceManager().RemoveNode( + scheduling::NodeID(remove_id)); ASSERT_TRUE(num_nodes - 1 == resource_scheduler.GetClusterResourceManager().NumNodes()); } @@ -361,19 +362,19 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) { cust_capacities.push_back(rand() % 10); initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(update_id, - node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(update_id), node_resources); } ASSERT_TRUE(num_nodes == resource_scheduler.GetClusterResourceManager().NumNodes()); } TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) { absl::flat_hash_map resource_total({{"CPU", 10}}); - auto local_node_id = NodeID::FromRandom().Binary(); + auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary()); ClusterResourceScheduler resource_scheduler(local_node_id, resource_total, *gcs_client_); - AssertPredefinedNodeResources(resource_scheduler); - auto remote_node_id = NodeID::FromRandom().Binary(); + AssertPredefinedNodeResources(); + auto remote_node_id = scheduling::NodeID(NodeID::FromRandom().Binary()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( remote_node_id, 
resource_total, resource_total); @@ -382,17 +383,17 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) { bool is_infeasible; rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_spread_scheduling_strategy(); - std::string node_id_1 = resource_scheduler.GetBestSchedulableNode( + auto node_id_1 = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &violations, &is_infeasible); absl::flat_hash_map resource_available({{"CPU", 9}}); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( node_id_1, resource_total, resource_available); - std::string node_id_2 = resource_scheduler.GetBestSchedulableNode( + auto node_id_2 = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &violations, &is_infeasible); - ASSERT_EQ((std::set{node_id_1, node_id_2}), - (std::set{local_node_id, remote_node_id})); + ASSERT_EQ((std::set{node_id_1, node_id_2}), + (std::set{local_node_id, remote_node_id})); } TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { @@ -402,8 +403,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(1, node_resources, *gcs_client_); - AssertPredefinedNodeResources(resource_scheduler); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(1), node_resources, + *gcs_client_); + AssertPredefinedNodeResources(); { ResourceRequest resource_request; @@ -416,9 +418,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { bool is_infeasible; rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_EQ(node_id, 1); + ASSERT_EQ(node_id.ToInt(), 1); ASSERT_TRUE(violations == 0); NodeResources nr1, nr2; @@ -455,23 +457,23 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest) { absl::flat_hash_map initial_resources = { {ray::kCPU_ResourceLabel, 1}, {"custom", 1}}; std::string name = NodeID::FromRandom().Binary(); - ClusterResourceScheduler resource_scheduler(name, initial_resources, *gcs_client_, - nullptr, nullptr); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(name), initial_resources, + *gcs_client_, nullptr, nullptr); resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( - ray::kCPU_ResourceLabel, {0, 1, 1}); - resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom", - {0, 1, 1}); + scheduling::ResourceID(ray::kCPU_ResourceLabel), {0, 1, 1}); + resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( + scheduling::ResourceID("custom"), {0, 1, 1}); const auto &predefined_resources = resource_scheduler.GetClusterResourceManager() - .GetNodeResources(name) + .GetNodeResources(scheduling::NodeID(name)) .predefined_resources; ASSERT_EQ(predefined_resources[CPU].total.Double(), 3); const auto &custom_resources = resource_scheduler.GetClusterResourceManager() - .GetNodeResources(name) + .GetNodeResources(scheduling::NodeID(name)) .custom_resources; - auto resource_id = resource_scheduler.string_to_int_map_.Get("custom"); + auto resource_id = 
scheduling::ResourceID("custom").ToInt(); ASSERT_EQ(custom_resources.find(resource_id)->second.total.Double(), 3); } @@ -487,13 +489,14 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id, - node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(node_id), node_resources); nr = node_resources; } // Check whether node resources were correctly added. - if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) { + if (resource_scheduler.GetClusterResourceManager().GetNodeResources( + scheduling::NodeID(node_id), &nr_out)) { ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); } else { ASSERT_TRUE(false); @@ -506,11 +509,12 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { vector cust_ids{2, 3}; vector cust_capacities{6, 6}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id, - node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(node_id), node_resources); nr = node_resources; } - if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) { + if (resource_scheduler.GetClusterResourceManager().GetNodeResources( + scheduling::NodeID(node_id), &nr_out)) { ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); } else { ASSERT_TRUE(false); @@ -524,9 +528,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { vector cust_ids{1}; vector cust_capacities{10}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); auto node_id = NodeID::FromRandom(); - auto node_internal_id = resource_scheduler.string_to_int_map_.Insert(node_id.Binary()); rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); { @@ -535,8 +539,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_internal_id, - node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(node_id.Binary()), node_resources); } // Predefined resources, hard constraint violation { @@ -546,9 +550,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { EmptyFixedPointVector); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_EQ(node_id, -1); + ASSERT_TRUE(node_id.IsNil()); } // Predefined resources, no constraint violation. 
@@ -559,9 +563,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { EmptyFixedPointVector); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } // Custom resources, hard constraint violation. @@ -573,9 +577,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_TRUE(node_id == -1); + ASSERT_TRUE(node_id.IsNil()); } // Custom resources, no constraint violation. { @@ -586,9 +590,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } // Custom resource missing, hard constraint violation. @@ -600,9 +604,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_TRUE(node_id == -1); + ASSERT_TRUE(node_id.IsNil()); } // Placement hints, no constraint violation. 
{ @@ -613,9 +617,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); int64_t violations; bool is_infeasible; - int64_t node_id = resource_scheduler.GetBestSchedulableNode( + auto node_id = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); - ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } } @@ -633,7 +637,8 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesWithCpuUnitTest) vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); TaskResourceInstances available_cluster_resources = resource_scheduler.GetLocalResourceManager() @@ -665,7 +670,8 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) { vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); TaskResourceInstances available_cluster_resources = resource_scheduler.GetLocalResourceManager() @@ -701,7 +707,7 @@ TEST_F(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest) { vector pred_capacities{3 /* CPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler cluster(0, node_resources, *gcs_client_); + ClusterResourceScheduler cluster(scheduling::NodeID(0), node_resources, *gcs_client_); ResourceInstanceCapacities instances; @@ -734,7 +740,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector pred_capacities{3. /* CPU */, 4. /* MEM */, 5. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -766,7 +773,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -794,7 +802,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_ids{1, 2}; vector cust_capacities{4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {3. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -826,7 +835,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_ids{1, 2}; vector cust_capacities{4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -855,7 +865,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest) vector cust_ids{1, 2, 3}; vector cust_capacities{4, 4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {0. /* CPU */, 0. /* MEM */, 0. /* GPU */}; @@ -885,7 +896,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { vector cust_ids{1, 2}; vector cust_capacities{4., 4.}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -913,25 +925,29 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { } TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) { - ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), + absl::flat_hash_map{}, + *gcs_client_); absl::flat_hash_map resource; resource["CPU"] = 10000.0; auto node_id = NodeID::FromRandom(); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id.Binary(), - resource, resource); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(node_id.Binary()), resource, resource); int64_t violations = 0; bool is_infeasible = false; rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); - ASSERT_EQ(node_id.Binary(), resource_scheduler.GetBestSchedulableNode( - resource, scheduling_strategy, false, false, false, - &violations, &is_infeasible)); + ASSERT_EQ(scheduling::NodeID(node_id.Binary()), + resource_scheduler.GetBestSchedulableNode(resource, scheduling_strategy, + false, false, false, &violations, + &is_infeasible)); EXPECT_CALL(*gcs_client_->mock_node_accessor, Get(node_id, ::testing::_)) .WillOnce(::testing::Return(nullptr)) .WillOnce(::testing::Return(nullptr)); - ASSERT_EQ("", resource_scheduler.GetBestSchedulableNode(resource, scheduling_strategy, - false, false, false, - &violations, &is_infeasible)); + ASSERT_TRUE(resource_scheduler + .GetBestSchedulableNode(resource, scheduling_strategy, false, false, + false, &violations, &is_infeasible) + .IsNil()); } TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { @@ -941,7 +957,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + 
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); std::vector allocate_gpu_instances{0.5, 0.5, 0.5, 0.5}; resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances( @@ -1004,7 +1021,8 @@ TEST_F(ClusterResourceSchedulerTest, vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); { std::vector allocate_gpu_instances{0.5, 0.5, 2, 0.5}; @@ -1023,7 +1041,8 @@ TEST_F(ClusterResourceSchedulerTest, expected_available_gpu_instances.begin())); NodeResources nr; - resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr); + resource_scheduler.GetClusterResourceManager().GetNodeResources( + scheduling::NodeID(0), &nr); ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5); } @@ -1044,7 +1063,8 @@ TEST_F(ClusterResourceSchedulerTest, expected_available_gpu_instances.begin())); NodeResources nr; - resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr); + resource_scheduler.GetClusterResourceManager().GetNodeResources( + scheduling::NodeID(0), &nr); ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8); } } @@ -1055,7 +1075,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { vector pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -1081,7 +1102,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest) { vector pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources, + *gcs_client_); ResourceRequest resource_request; vector pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -1104,10 +1126,12 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest) { TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { absl::flat_hash_map resource_spec({{"CPU", 1}}); - ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), + absl::flat_hash_map{}, + *gcs_client_); for (int i = 0; i < 100; i++) { resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( - NodeID::FromRandom().Binary(), {}, {}); + scheduling::NodeID(NodeID::FromRandom().Binary()), {}, {}); } // No feasible nodes. 
@@ -1115,14 +1139,14 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { bool is_infeasible; rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); - ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, scheduling_strategy, - false, false, false, - &total_violations, &is_infeasible), - ""); + ASSERT_TRUE(resource_scheduler + .GetBestSchedulableNode(resource_spec, scheduling_strategy, false, + false, false, &total_violations, &is_infeasible) + .IsNil()); // Feasible remote node, but doesn't currently have resources available. We // should spill there. - auto remote_feasible = NodeID::FromRandom().Binary(); + auto remote_feasible = scheduling::NodeID(NodeID::FromRandom().Binary()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( remote_feasible, resource_spec, {{"CPU", 0.}}); ASSERT_EQ(remote_feasible, resource_scheduler.GetBestSchedulableNode( @@ -1131,7 +1155,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { // Feasible remote node, and it currently has resources available. We should // prefer to spill there. - auto remote_available = NodeID::FromRandom().Binary(); + auto remote_available = scheduling::NodeID(NodeID::FromRandom().Binary()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( remote_available, resource_spec, resource_spec); ASSERT_EQ(remote_available, resource_scheduler.GetBestSchedulableNode( @@ -1146,14 +1170,15 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) { absl::flat_hash_map initial_resources( {{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"1", 1}, {"2", 2}, {"3", 3}}); - ClusterResourceScheduler resource_scheduler("0", initial_resources, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("0"), initial_resources, + *gcs_client_); NodeResources other_node_resources; vector other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */}; vector other_cust_capacities{5., 4., 3., 2., 1.}; initNodeResources(other_node_resources, other_pred_capacities, cust_ids, other_cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345, - other_node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(12345), other_node_resources); { // Cluster is idle. rpc::ResourcesData data; @@ -1229,15 +1254,15 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) { {"object_store_memory", 1000 * 1024 * 1024}}); int64_t used_object_store_memory = 250 * 1024 * 1024; int64_t *ptr = &used_object_store_memory; - ClusterResourceScheduler resource_scheduler("0", initial_resources, *gcs_client_, - [&] { return *ptr; }); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("0"), initial_resources, + *gcs_client_, [&] { return *ptr; }); NodeResources other_node_resources; vector other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. 
/* GPU */}; vector other_cust_capacities{10.}; initNodeResources(other_node_resources, other_pred_capacities, cust_ids, other_cust_capacities); - resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345, - other_node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(12345), other_node_resources); { rpc::ResourcesData data; @@ -1326,8 +1351,9 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) { TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) { absl::flat_hash_map initial_resources({{"CPU", 1}}); - ClusterResourceScheduler resource_scheduler("local", initial_resources, *gcs_client_); - auto remote = NodeID::FromRandom().Binary(); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), + initial_resources, *gcs_client_); + auto remote = scheduling::NodeID(NodeID::FromRandom().Binary()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(remote, {{"CPU", 2.}}, {{"CPU", 2.}}); const absl::flat_hash_map task_spec = {{"CPU", 1.}}; @@ -1366,9 +1392,10 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) { } // Our local view says there are not enough resources on the remote node to // schedule another task. - ASSERT_EQ("", resource_scheduler.GetBestSchedulableNode( - task_spec, scheduling_strategy, false, false, true, &t, - &is_infeasible)); + ASSERT_EQ( + resource_scheduler.GetBestSchedulableNode(task_spec, scheduling_strategy, false, + false, true, &t, &is_infeasible), + scheduling::NodeID::Nil()); ASSERT_FALSE( resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources( task_spec, task_allocation)); @@ -1378,7 +1405,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) { } TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { - ClusterResourceScheduler resource_scheduler("local", {{"CPU", 2}}, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), {{"CPU", 2}}, + *gcs_client_); absl::flat_hash_map resource_request = {{"CPU", 1}, {"custom123", 2}}; @@ -1387,36 +1415,38 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); - std::string result = resource_scheduler.GetBestSchedulableNode( + auto result = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); - ASSERT_TRUE(result.empty()); + ASSERT_TRUE(result.IsNil()); - resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom123", - {0., 1.0, 1.0}); + resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( + scheduling::ResourceID("custom123"), {0., 1.0, 1.0}); result = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); - ASSERT_FALSE(result.empty()) << resource_scheduler.DebugString(); + ASSERT_FALSE(result.IsNil()) << resource_scheduler.DebugString(); resource_request["custom123"] = 3; result = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); - ASSERT_TRUE(result.empty()); + ASSERT_TRUE(result.IsNil()); - resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom123", - {1.0}); + resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( + scheduling::ResourceID("custom123"), {1.0}); result = resource_scheduler.GetBestSchedulableNode( resource_request, 
scheduling_strategy, false, false, false, &t, &is_infeasible); - ASSERT_FALSE(result.empty()); + ASSERT_FALSE(result.IsNil()); - resource_scheduler.GetLocalResourceManager().DeleteLocalResource("custom123"); + resource_scheduler.GetLocalResourceManager().DeleteLocalResource( + scheduling::ResourceID("custom123")); result = resource_scheduler.GetBestSchedulableNode( resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); - ASSERT_TRUE(result.empty()); + ASSERT_TRUE(result.IsNil()); } TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) { - ClusterResourceScheduler resource_scheduler("local", {{"custom123", 5}}, *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), + {{"custom123", 5}}, *gcs_client_); std::shared_ptr resource_instances = std::make_shared(); absl::flat_hash_map resource_request = {{"custom123", 5}}; @@ -1424,16 +1454,17 @@ TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) { resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources( resource_request, resource_instances); ASSERT_TRUE(allocated); - ASSERT_TRUE( - resource_scheduler.GetLocalResourceManager().IsAvailableResourceEmpty("custom123")); + ASSERT_TRUE(resource_scheduler.GetLocalResourceManager().IsAvailableResourceEmpty( + scheduling::ResourceID("custom123"))); } TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { absl::flat_hash_map resource_spec({{"CPU", 1}}); - ClusterResourceScheduler resource_scheduler("local", resource_spec, *gcs_client_); - std::vector node_ids; + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), resource_spec, + *gcs_client_); + std::vector node_ids; for (int i = 0; i < 100; i++) { - node_ids.push_back(NodeID::FromRandom().Binary()); + node_ids.emplace_back(NodeID::FromRandom().Binary()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids.back(), {}, {}); } @@ -1447,20 +1478,20 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { ASSERT_EQ(resource_scheduler.GetBestSchedulableNode( resource_spec, scheduling_strategy, false, false, /*force_spillback=*/false, &total_violations, &is_infeasible), - "local"); + scheduling::NodeID("local")); // If spillback is forced, we try to spill to remote, but only if there is a // schedulable node. ASSERT_EQ(resource_scheduler.GetBestSchedulableNode( resource_spec, scheduling_strategy, false, false, /*force_spillback=*/true, &total_violations, &is_infeasible), - ""); + scheduling::NodeID::Nil()); // Choose a remote node that has the resources available. 
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids[50], resource_spec, {}); ASSERT_EQ(resource_scheduler.GetBestSchedulableNode( resource_spec, scheduling_strategy, false, false, /*force_spillback=*/true, &total_violations, &is_infeasible), - ""); + scheduling::NodeID::Nil()); resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( node_ids[51], resource_spec, resource_spec); ASSERT_EQ(resource_scheduler.GetBestSchedulableNode( @@ -1476,8 +1507,8 @@ TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) { "custom_unit_instance_resources": "FPGA" } )"); - ClusterResourceScheduler resource_scheduler("local", {{"CPU", 4}, {"FPGA", 2}}, - *gcs_client_); + ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), + {{"CPU", 4}, {"FPGA", 2}}, *gcs_client_); StringIdMap mock_string_to_int_map; int64_t fpga_resource_id = mock_string_to_int_map.Insert("FPGA"); @@ -1509,7 +1540,7 @@ TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) { TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesSerializedStringTest) { ClusterResourceScheduler resource_scheduler( - "local", {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_); + scheduling::NodeID("local"), {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_); std::shared_ptr cluster_resources = std::make_shared(); addTaskResourceInstances(true, {2.}, 0, cluster_resources.get()); @@ -1534,7 +1565,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesSerializedStringTest) addTaskResourceInstances(true, {4.}, 1, cluster_instance_resources.get()); addTaskResourceInstances(true, {1., 1.}, 2, cluster_instance_resources.get()); ClusterResourceScheduler resource_scheduler_cpu_instance( - "local", {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_); + scheduling::NodeID("local"), {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_); std::string instance_serialized_string = resource_scheduler_cpu_instance.GetLocalResourceManager() .SerializedTaskResourceInstances(cluster_instance_resources); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 6349f0603..c06eb2aa3 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -77,21 +77,21 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() { RayTask task = work->task; RAY_LOG(DEBUG) << "Scheduling pending task " << task.GetTaskSpecification().TaskId(); - std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( + auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( task.GetTaskSpecification(), work->PrioritizeLocalNode(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, &is_infeasible); // There is no node that has available resources to run the request. // Move on to the next shape. - if (node_id_string.empty()) { + if (scheduling_node_id.IsNil()) { RAY_LOG(DEBUG) << "No node found to schedule a task " << task.GetTaskSpecification().TaskId() << " is infeasible?" << is_infeasible; break; } - NodeID node_id = NodeID::FromBinary(node_id_string); + NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary()); ScheduleOnNode(node_id, work); work_it = work_queue.erase(work_it); } @@ -129,7 +129,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() { RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. 
task_id:" << task.GetTaskSpecification().TaskId(); bool is_infeasible; - std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( + cluster_resource_scheduler_->GetBestSchedulableNode( task.GetTaskSpecification(), work->PrioritizeLocalNode(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, &is_infeasible); @@ -293,7 +293,8 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to, RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; if (!cluster_resource_scheduler_->AllocateRemoteTaskResources( - spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap())) { + scheduling::NodeID(spillback_to.Binary()), + task_spec.GetRequiredResources().GetResourceMap())) { RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId() << " on a remote node that are no longer available"; } diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index ae74baa40..1beeee389 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -129,8 +129,8 @@ std::shared_ptr CreateSingleNodeScheduler( local_node_resources[ray::kGPU_ResourceLabel] = num_gpus; local_node_resources[ray::kMemory_ResourceLabel] = 128; - auto scheduler = - std::make_shared(id, local_node_resources, gcs_client); + auto scheduler = std::make_shared( + scheduling::NodeID(id), local_node_resources, gcs_client); return scheduler; } @@ -285,8 +285,8 @@ class ClusterTaskManagerTest : public ::testing::Test { node_resources[ray::kCPU_ResourceLabel] = num_cpus; node_resources[ray::kGPU_ResourceLabel] = num_gpus; node_resources[ray::kMemory_ResourceLabel] = memory; - scheduler_->GetClusterResourceManager().AddOrUpdateNode(id.Binary(), node_resources, - node_resources); + scheduler_->GetClusterResourceManager().AddOrUpdateNode( + scheduling::NodeID(id.Binary()), node_resources, node_resources); rpc::GcsNodeInfo info; node_info_[id] = info; @@ -1480,7 +1480,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) { // Delete cpu resource of local node, then task 2 should be turned into // infeasible. 
- scheduler_->GetLocalResourceManager().DeleteLocalResource(ray::kCPU_ResourceLabel); + scheduler_->GetLocalResourceManager().DeleteLocalResource( + scheduling::ResourceID(ray::kCPU_ResourceLabel)); RayTask task2 = CreateTask({{ray::kCPU_ResourceLabel, 4}}); rpc::RequestWorkerLeaseReply reply2; @@ -1506,7 +1507,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) { TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) { const NodeResources &node_resources = - scheduler_->GetClusterResourceManager().GetNodeResources(id_.Binary()); + scheduler_->GetClusterResourceManager().GetNodeResources( + scheduling::NodeID(id_.Binary())); ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8); ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4); @@ -1831,8 +1833,8 @@ TEST_F(ClusterTaskManagerTest, PopWorkerExactlyOnce) { } TEST_F(ClusterTaskManagerTest, CapRunningOnDispatchQueue) { - scheduler_->GetLocalResourceManager().AddLocalResourceInstances(ray::kGPU_ResourceLabel, - {1, 1, 1}); + scheduler_->GetLocalResourceManager().AddLocalResourceInstances( + scheduling::ResourceID(ray::kGPU_ResourceLabel), {1, 1, 1}); RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 4}, {ray::kGPU_ResourceLabel, 1}}, /*num_args=*/0, /*args=*/{}); RayTask task2 = CreateTask({{ray::kCPU_ResourceLabel, 4}, {ray::kGPU_ResourceLabel, 1}}, @@ -1883,8 +1885,8 @@ TEST_F(ClusterTaskManagerTest, CapRunningOnDispatchQueue) { } TEST_F(ClusterTaskManagerTest, ZeroCPUTasks) { - scheduler_->GetLocalResourceManager().AddLocalResourceInstances(ray::kGPU_ResourceLabel, - {1, 1, 1}); + scheduler_->GetLocalResourceManager().AddLocalResourceInstances( + scheduling::ResourceID(ray::kGPU_ResourceLabel), {1, 1, 1}); RayTask task = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{}); RayTask task2 = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{}); RayTask task3 = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{}); diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index b64da23cb..e7cfb41bc 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -22,20 +22,17 @@ namespace ray { LocalResourceManager::LocalResourceManager( - int64_t local_node_id, StringIdMap &resource_name_to_id, - const NodeResources &node_resources, + scheduling::NodeID local_node_id, const NodeResources &node_resources, std::function get_used_object_store_memory, std::function get_pull_manager_at_capacity, std::function resource_change_subscriber) : local_node_id_(local_node_id), - resource_name_to_id_(resource_name_to_id), get_used_object_store_memory_(get_used_object_store_memory), get_pull_manager_at_capacity_(get_pull_manager_at_capacity), resource_change_subscriber_(resource_change_subscriber) { InitResourceUnitInstanceInfo(); InitLocalResources(node_resources); - RAY_LOG(DEBUG) << "local resources: " - << local_resources_.DebugString(resource_name_to_id); + RAY_LOG(DEBUG) << "local resources: " << local_resources_.DebugString(); } void LocalResourceManager::InitResourceUnitInstanceInfo() { @@ -57,14 +54,15 @@ void LocalResourceManager::InitResourceUnitInstanceInfo() { std::vector results; boost::split(results, custom_unit_instance_resources, boost::is_any_of(",")); for (std::string &result : results) { - int64_t resource_id = resource_name_to_id_.Insert(result); + int64_t resource_id = scheduling::ResourceID(result).ToInt(); 
custom_unit_instance_resources_.emplace(resource_id); } } } void LocalResourceManager::AddLocalResourceInstances( - const std::string &resource_name, const std::vector &instances) { + scheduling::ResourceID resource_id, const std::vector &instances) { + auto resource_name = resource_id.Binary(); ResourceInstanceCapacities *node_instances; local_resources_.predefined_resources.resize(PredefinedResources_MAX); if (kCPU_ResourceLabel == resource_name) { @@ -76,9 +74,7 @@ void LocalResourceManager::AddLocalResourceInstances( } else if (kMemory_ResourceLabel == resource_name) { node_instances = &local_resources_.predefined_resources[MEM]; } else { - resource_name_to_id_.Insert(resource_name); - int64_t resource_id = resource_name_to_id_.Get(resource_name); - node_instances = &local_resources_.custom_resources[resource_id]; + node_instances = &local_resources_.custom_resources[resource_id.ToInt()]; } if (node_instances->total.size() < instances.size()) { @@ -93,8 +89,8 @@ void LocalResourceManager::AddLocalResourceInstances( OnResourceChanged(); } -void LocalResourceManager::DeleteLocalResource(const std::string &resource_name) { - int idx = GetPredefinedResourceIndex(resource_name); +void LocalResourceManager::DeleteLocalResource(scheduling::ResourceID resource_id) { + int idx = GetPredefinedResourceIndex(resource_id); if (idx != -1) { for (auto &total : local_resources_.predefined_resources[idx].total) { total = 0; @@ -103,26 +99,23 @@ void LocalResourceManager::DeleteLocalResource(const std::string &resource_name) available = 0; } } else { - int64_t resource_id = resource_name_to_id_.Get(resource_name); - auto c_itr = local_resources_.custom_resources.find(resource_id); + auto c_itr = local_resources_.custom_resources.find(resource_id.ToInt()); if (c_itr != local_resources_.custom_resources.end()) { - local_resources_.custom_resources[resource_id].total.clear(); - local_resources_.custom_resources[resource_id].available.clear(); + local_resources_.custom_resources[resource_id.ToInt()].total.clear(); + local_resources_.custom_resources[resource_id.ToInt()].available.clear(); local_resources_.custom_resources.erase(c_itr); } } OnResourceChanged(); } -bool LocalResourceManager::IsAvailableResourceEmpty(const std::string &resource_name) { - int idx = GetPredefinedResourceIndex(resource_name); +bool LocalResourceManager::IsAvailableResourceEmpty(scheduling::ResourceID resource_id) { + int idx = GetPredefinedResourceIndex(resource_id); if (idx != -1) { return FixedPoint::Sum(local_resources_.predefined_resources[idx].available) <= 0; } - resource_name_to_id_.Insert(resource_name); - int64_t resource_id = resource_name_to_id_.Get(resource_name); - auto itr = local_resources_.custom_resources.find(resource_id); + auto itr = local_resources_.custom_resources.find(resource_id.ToInt()); if (itr != local_resources_.custom_resources.end()) { return FixedPoint::Sum(itr->second.available) <= 0; } else { @@ -132,7 +125,7 @@ bool LocalResourceManager::IsAvailableResourceEmpty(const std::string &resource_ std::string LocalResourceManager::DebugString(void) const { std::stringstream buffer; - buffer << local_resources_.DebugString(resource_name_to_id_); + buffer << local_resources_.DebugString(); return buffer.str(); } @@ -441,7 +434,7 @@ bool LocalResourceManager::AllocateLocalTaskResources( RAY_CHECK(task_allocation != nullptr); // We don't track object store memory demands so no need to allocate them. 
ResourceRequest resource_request = ResourceMapToResourceRequest( - resource_name_to_id_, task_resources, /*requires_object_store_memory=*/false); + task_resources, /*requires_object_store_memory=*/false); return AllocateLocalTaskResources(resource_request, task_allocation); } @@ -510,8 +503,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) // Initialize if last report resources is empty. if (!last_report_resources_) { - NodeResources node_resources = - ResourceMapToNodeResources(resource_name_to_id_, {{}}, {{}}); + NodeResources node_resources = ResourceMapToNodeResources({{}}, {{}}); last_report_resources_.reset(new NodeResources(node_resources)); } @@ -533,7 +525,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) uint64_t custom_id = it.first; const auto &capacity = it.second; const auto &last_capacity = last_report_resources_->custom_resources[custom_id]; - const auto &label = resource_name_to_id_.Get(custom_id); + auto label = scheduling::ResourceID(custom_id).Binary(); // Note: available may be negative, but only report positive to GCS. if (capacity.available != last_capacity.available && capacity.available > 0) { resources_data.set_resources_available_changed(true); @@ -541,7 +533,8 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data) capacity.available.Double(); } if (capacity.total != last_capacity.total) { - (*resources_data.mutable_resources_total())[label] = capacity.total.Double(); + (*resources_data.mutable_resources_total())[std::move(label)] = + capacity.total.Double(); } } @@ -586,7 +579,7 @@ ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourc } for (auto entry : local_resources_.custom_resources) { - std::string resource_name = resource_name_to_id_.Get(entry.first); + std::string resource_name = scheduling::ResourceID(entry.first).Binary(); double resource_total = FixedPoint::Sum(entry.second.total).Double(); if (!resource_map_filter.contains(resource_name)) { continue; @@ -646,36 +639,28 @@ std::string LocalResourceManager::SerializedTaskResourceInstances( void LocalResourceManager::ResetLastReportResourceUsage( const SchedulingResources &replacement) { - last_report_resources_ = std::make_unique(ResourceMapToNodeResources( - resource_name_to_id_, replacement.GetTotalResources().GetResourceMap(), - replacement.GetAvailableResources().GetResourceMap())); + last_report_resources_ = std::make_unique( + ResourceMapToNodeResources(replacement.GetTotalResources().GetResourceMap(), + replacement.GetAvailableResources().GetResourceMap())); } -bool LocalResourceManager::ResourcesExist(const std::string &resource_name) { - int idx = GetPredefinedResourceIndex(resource_name); +bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) { + int idx = GetPredefinedResourceIndex(resource_id); if (idx != -1) { // Return true directly for predefined resources as we always initialize this kind of // resources at the beginning. 
return true; } else { - int64_t resource_id = resource_name_to_id_.Get(resource_name); - const auto &it = local_resources_.custom_resources.find(resource_id); + const auto &it = local_resources_.custom_resources.find(resource_id.ToInt()); return it != local_resources_.custom_resources.end(); } } -int GetPredefinedResourceIndex(const std::string &resource_name) { - int idx = -1; - if (resource_name == ray::kCPU_ResourceLabel) { - idx = (int)ray::CPU; - } else if (resource_name == ray::kGPU_ResourceLabel) { - idx = (int)ray::GPU; - } else if (resource_name == ray::kObjectStoreMemory_ResourceLabel) { - idx = (int)ray::OBJECT_STORE_MEM; - } else if (resource_name == ray::kMemory_ResourceLabel) { - idx = (int)ray::MEM; - }; - return idx; +int GetPredefinedResourceIndex(scheduling::ResourceID resource_id) { + if (resource_id.ToInt() >= 0 && resource_id.ToInt() < PredefinedResources_MAX) { + return resource_id.ToInt(); + } + return -1; } } // namespace ray diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index 750972c8c..5bd6275be 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -40,30 +40,29 @@ namespace ray { class LocalResourceManager { public: LocalResourceManager( - int64_t local_node_id, StringIdMap &resource_name_to_id, - const NodeResources &node_resources, + scheduling::NodeID local_node_id, const NodeResources &node_resources, std::function get_used_object_store_memory, std::function get_pull_manager_at_capacity, std::function resource_change_subscriber); - int64_t GetNodeId() const { return local_node_id_; } + scheduling::NodeID GetNodeId() const { return local_node_id_; } /// Add a local resource that is available. /// /// \param resource_name: Resource which we want to update. /// \param resource_total: New capacity of the resource. - void AddLocalResourceInstances(const std::string &resource_name, + void AddLocalResourceInstances(scheduling::ResourceID resource_id, const std::vector &instances); /// Delete a given resource from the local node. /// /// \param resource_name: Resource we want to delete - void DeleteLocalResource(const std::string &resource_name); + void DeleteLocalResource(scheduling::ResourceID resource_id); /// Check whether the available resources are empty. /// /// \param resource_name: Resource which we want to check. - bool IsAvailableResourceEmpty(const std::string &resource_name); + bool IsAvailableResourceEmpty(scheduling::ResourceID resource_id); /// Return local resources. NodeResourceInstances GetLocalResources() const { return local_resources_; } @@ -164,7 +163,7 @@ class LocalResourceManager { /// \param resource_name: the specific resource name. /// /// \return true, if exist. otherwise, false. - bool ResourcesExist(const std::string &resource_name); + bool ResourcesExist(scheduling::ResourceID resource_id); private: /// Create instances for each resource associated with the local node, given @@ -270,10 +269,7 @@ class LocalResourceManager { void UpdateAvailableObjectStoreMemResource(); /// Identifier of local node. - int64_t local_node_id_; - /// Keep the mapping between node and resource IDs in string representation - /// to integer representation. Used for improving map performance. - StringIdMap &resource_name_to_id_; + scheduling::NodeID local_node_id_; /// Resources of local node. NodeResourceInstances local_resources_; /// Cached resources, used to compare with newest one in light heartbeat mode. 
@@ -301,6 +297,6 @@ class LocalResourceManager { FRIEND_TEST(ClusterResourceSchedulerTest, CustomResourceInstanceTest); }; -int GetPredefinedResourceIndex(const std::string &resource_name); +int GetPredefinedResourceIndex(scheduling::ResourceID resource_id); } // end namespace ray diff --git a/src/ray/raylet/scheduling/local_task_manager.cc b/src/ray/raylet/scheduling/local_task_manager.cc index 2db52e8e9..9b698e139 100644 --- a/src/ray/raylet/scheduling/local_task_manager.cc +++ b/src/ray/raylet/scheduling/local_task_manager.cc @@ -315,13 +315,14 @@ void LocalTaskManager::SpillWaitingTasks() { // TODO(swang): The policy currently does not account for the amount of // object store memory availability. Ideally, we should pick the node with // the most memory availability. - std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( + auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( (*it)->task.GetTaskSpecification(), /*prioritize_local_node*/ true, /*exclude_local_node*/ force_spillback, /*requires_object_store_memory*/ true, &is_infeasible); - if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) { - NodeID node_id = NodeID::FromBinary(node_id_string); + if (!scheduling_node_id.IsNil() && + scheduling_node_id.Binary() != self_node_id_.Binary()) { + NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary()); Spillback(node_id, *it); if (!task.GetTaskSpecification().GetDependencies().empty()) { task_dependency_manager_.RemoveTaskDependencies( @@ -330,7 +331,7 @@ void LocalTaskManager::SpillWaitingTasks() { waiting_tasks_index_.erase(task_id); it = waiting_task_queue_.erase(it); } else { - if (node_id_string.empty()) { + if (scheduling_node_id.IsNil()) { RAY_LOG(DEBUG) << "RayTask " << task_id << " has blocked dependencies, but no other node has resources, " "keeping the task local"; @@ -347,17 +348,17 @@ void LocalTaskManager::SpillWaitingTasks() { bool LocalTaskManager::TrySpillback(const std::shared_ptr &work, bool &is_infeasible) { - std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( + auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( work->task.GetTaskSpecification(), work->PrioritizeLocalNode(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, &is_infeasible); - if (is_infeasible || node_id_string == self_node_id_.Binary() || - node_id_string.empty()) { + if (is_infeasible || scheduling_node_id.IsNil() || + scheduling_node_id.Binary() == self_node_id_.Binary()) { return false; } - NodeID node_id = NodeID::FromBinary(node_id_string); + NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary()); Spillback(node_id, work); return true; } @@ -387,7 +388,7 @@ bool LocalTaskManager::PoppedWorkerHandler( task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); for (auto &entry : required_resource) { if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist( - entry.first)) { + scheduling::ResourceID(entry.first))) { RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first != PlacementGroupID::Nil()); RAY_LOG(DEBUG) << "The placement group: " @@ -522,7 +523,8 @@ void LocalTaskManager::Spillback(const NodeID &spillback_to, RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; if (!cluster_resource_scheduler_->AllocateRemoteTaskResources( - spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap())) { + scheduling::NodeID(spillback_to.Binary()), 
+ task_spec.GetRequiredResources().GetResourceMap())) { RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId() << " on a remote node that are no longer available"; } @@ -984,7 +986,6 @@ bool LocalTaskManager::ReturnCpuResourcesToBlockedWorker( ResourceSet LocalTaskManager::CalcNormalTaskResources() const { absl::flat_hash_map total_normal_task_resources; - const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap(); for (auto &entry : leased_workers_) { std::shared_ptr worker = entry.second; auto &task_spec = worker->GetAssignedTask().GetTaskSpecification(); @@ -1009,7 +1010,8 @@ ResourceSet LocalTaskManager::CalcNormalTaskResources() const { } for (auto &entry : resource_request.custom_resources) { if (entry.second > 0) { - total_normal_task_resources[string_id_map.Get(entry.first)] += entry.second; + total_normal_task_resources[scheduling::ResourceID(entry.first).Binary()] += + entry.second; } } } diff --git a/src/ray/raylet/scheduling/scheduling_ids.cc b/src/ray/raylet/scheduling/scheduling_ids.cc index a965dec1d..39ce1c944 100644 --- a/src/ray/raylet/scheduling/scheduling_ids.cc +++ b/src/ray/raylet/scheduling/scheduling_ids.cc @@ -14,6 +14,8 @@ #include "ray/raylet/scheduling/scheduling_ids.h" +namespace ray { + int64_t StringIdMap::Get(const std::string &string_id) const { auto it = string_to_int_.find(string_id); if (it == string_to_int_.end()) { @@ -68,3 +70,5 @@ StringIdMap &StringIdMap::InsertOrDie(const std::string &string_id, int64_t valu } int64_t StringIdMap::Count() { return string_to_int_.size(); } + +} // namespace ray diff --git a/src/ray/raylet/scheduling/scheduling_ids.h b/src/ray/raylet/scheduling/scheduling_ids.h index faf79d482..e374040d9 100644 --- a/src/ray/raylet/scheduling/scheduling_ids.h +++ b/src/ray/raylet/scheduling/scheduling_ids.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include "absl/container/flat_hash_map.h" @@ -22,6 +23,17 @@ /// Limit the ID range to test for collisions. #define MAX_ID_TEST 8 +namespace ray { + +/// List of predefined resources. +enum PredefinedResources { CPU, MEM, GPU, OBJECT_STORE_MEM, PredefinedResources_MAX }; + +const std::string kCPU_ResourceLabel = "CPU"; +const std::string kGPU_ResourceLabel = "GPU"; +const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory"; +const std::string kMemory_ResourceLabel = "memory"; +const std::string kBundle_ResourceLabel = "bundle"; + /// Class to map string IDs to unique integer IDs and back. class StringIdMap { absl::flat_hash_map string_to_int_; @@ -62,3 +74,85 @@ class StringIdMap { /// Get number of identifiers. int64_t Count(); }; + +enum class SchedulingIDTag { Node, Resource }; + +/// Represent a string scheduling id. It optimizes the storage by +/// using a singleton StringIdMap, and only store the integer index as +/// its only member. +/// +/// Note: this class is not thread safe! 
+template <SchedulingIDTag T>
+class BaseSchedulingID {
+ public:
+  explicit BaseSchedulingID(const std::string &name) : id_{GetMap().Insert(name)} {}
+
+  explicit BaseSchedulingID(int64_t id) : id_{id} {}
+
+  int64_t ToInt() const { return id_; }
+
+  std::string Binary() const { return GetMap().Get(id_); }
+
+  bool operator==(const BaseSchedulingID &rhs) const { return id_ == rhs.id_; }
+
+  bool operator!=(const BaseSchedulingID &rhs) const { return id_ != rhs.id_; }
+
+  bool operator<(const BaseSchedulingID &rhs) const { return id_ < rhs.id_; }
+
+  bool IsNil() const { return id_ == -1; }
+
+  static BaseSchedulingID Nil() { return BaseSchedulingID(-1); }
+
+ private:
+  /// Meyer's singleton to store the StringIdMap.
+  static StringIdMap &GetMap() {
+    static StringIdMap map;
+    return map;
+  }
+  int64_t id_ = -1;
+};
+
+template <SchedulingIDTag T>
+std::ostream &operator<<(std::ostream &os, const ray::BaseSchedulingID<T> &id) {
+  os << id.ToInt();
+  return os;
+}
+
+template <>
+inline std::ostream &operator<<(
+    std::ostream &os, const ray::BaseSchedulingID<SchedulingIDTag::Resource> &id) {
+  os << id.Binary();
+  return os;
+}
+
+/// Specialization for SchedulingIDTag::Resource. Specifically, we populate
+/// the singleton map with PredefinedResources.
+template <>
+inline StringIdMap &BaseSchedulingID<SchedulingIDTag::Resource>::GetMap() {
+  static StringIdMap map = []() {
+    StringIdMap map;
+    return map.InsertOrDie(kCPU_ResourceLabel, CPU)
+        .InsertOrDie(kGPU_ResourceLabel, GPU)
+        .InsertOrDie(kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM)
+        .InsertOrDie(kMemory_ResourceLabel, MEM);
+  }();
+  return map;
+}
+
+namespace scheduling {
+/// The actual scheduling id definitions which are used in the scheduler.
+using ResourceID = BaseSchedulingID<SchedulingIDTag::Resource>;
+using NodeID = BaseSchedulingID<SchedulingIDTag::Node>;
+}  // namespace scheduling
+
+}  // namespace ray
+
+/// Implements the hash function for BaseSchedulingID.
+namespace std {
+template <ray::SchedulingIDTag T>
+struct hash<ray::BaseSchedulingID<T>> {
+  std::size_t operator()(const ray::BaseSchedulingID<T> &id) const {
+    return std::hash<int64_t>()(id.ToInt());
+  }
+};
+}  // namespace std
diff --git a/src/ray/raylet/scheduling/scheduling_ids_test.cc b/src/ray/raylet/scheduling/scheduling_ids_test.cc
new file mode 100644
index 000000000..6bff8092a
--- /dev/null
+++ b/src/ray/raylet/scheduling/scheduling_ids_test.cc
@@ -0,0 +1,52 @@
+// Copyright 2021 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
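(Reviewer orientation, not part of the diff; the new scheduling_ids_test.cc continues right below.) The typed IDs above intern a string into a per-tag singleton StringIdMap and keep only the integer handle, so string-to-int and int-to-string conversion is a map lookup, and the Resource map comes pre-populated with the predefined labels. A minimal usage sketch, where the function name and the "node-A" strings are purely illustrative and the behavior in the comments mirrors what scheduling_ids_test.cc asserts:

#include "ray/raylet/scheduling/scheduling_ids.h"

// Illustrative only; not part of this change.
void SchedulingIdSketch() {
  using ray::scheduling::NodeID;
  using ray::scheduling::ResourceID;

  // Same string within the same process => same integer handle, and
  // Binary() round-trips back to the original string.
  NodeID a("node-A");
  NodeID b("node-A");
  // a == b, a.Binary() == "node-A", NodeID(a.ToInt()) == a

  // Only the Resource tag is pre-registered with the predefined labels,
  // so a predefined label maps onto its PredefinedResources enum value.
  ResourceID cpu(ray::kCPU_ResourceLabel);
  // cpu.ToInt() == ray::CPU, cpu.Binary() == ray::kCPU_ResourceLabel

  // Nil() is the sentinel that replaces the old -1 / empty-string checks
  // (see the IsNil() call sites in local_task_manager.cc above).
  NodeID none = NodeID::Nil();
  // none.IsNil() == true
}

Because the maps are Meyer's singletons, constructing an ID from a string is a cheap lookup after first use, but, as the header comment notes, the class is not thread safe.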
+ +#include "ray/raylet/scheduling/scheduling_ids.h" + +#include "gtest/gtest.h" + +namespace ray { +using namespace ray::scheduling; + +struct SchedulingIDsTest : public ::testing::Test {}; + +TEST_F(SchedulingIDsTest, BasicTest) { + std::vector string_ids = {"hello", "whaaat", "yes"}; + std::vector node_ids; + for (auto &string_id : string_ids) { + node_ids.emplace_back(NodeID(string_id)); + ASSERT_EQ(node_ids.back().Binary(), string_id); + } + ASSERT_EQ(node_ids[0], NodeID(string_ids[0])); + ASSERT_EQ(node_ids[0], NodeID(node_ids[0].ToInt())); + + ASSERT_TRUE(NodeID::Nil().IsNil()); + ASSERT_EQ(NodeID::Nil().ToInt(), -1); + ASSERT_EQ(NodeID::Nil().Binary(), "-1"); + + ASSERT_EQ(NodeID(13), NodeID(13)); + ASSERT_NE(NodeID(1), NodeID(2)); + ASSERT_TRUE(NodeID(1) < NodeID(2)); +} + +TEST_F(SchedulingIDsTest, PrepopulateResourceIDTest) { + ASSERT_EQ(kCPU_ResourceLabel, ResourceID(CPU).Binary()); + ASSERT_EQ(kGPU_ResourceLabel, ResourceID(GPU).Binary()); + ASSERT_EQ(kObjectStoreMemory_ResourceLabel, ResourceID(OBJECT_STORE_MEM).Binary()); + ASSERT_EQ(kMemory_ResourceLabel, ResourceID(MEM).Binary()); + + // mean while NodeID is not populated. + ASSERT_NE(kCPU_ResourceLabel, NodeID(CPU).Binary()); +} +} // namespace ray diff --git a/src/ray/raylet/scheduling/scheduling_policy.cc b/src/ray/raylet/scheduling/scheduling_policy.cc index 4b5dbd4cc..f17dd0f78 100644 --- a/src/ray/raylet/scheduling/scheduling_policy.cc +++ b/src/ray/raylet/scheduling/scheduling_policy.cc @@ -38,10 +38,10 @@ bool DoesNodeHaveGPUs(const NodeResources &resources) { } } // namespace -int64_t SchedulingPolicy::SpreadPolicy(const ResourceRequest &resource_request, - bool force_spillback, bool require_available, - std::function is_node_available) { - std::vector round; +scheduling::NodeID SchedulingPolicy::SpreadPolicy( + const ResourceRequest &resource_request, bool force_spillback, bool require_available, + std::function is_node_available) { + std::vector round; round.reserve(nodes_.size()); for (const auto &pair : nodes_) { round.emplace_back(pair.first); @@ -69,19 +69,19 @@ int64_t SchedulingPolicy::SpreadPolicy(const ResourceRequest &resource_request, is_node_available); } -int64_t SchedulingPolicy::HybridPolicyWithFilter( +scheduling::NodeID SchedulingPolicy::HybridPolicyWithFilter( const ResourceRequest &resource_request, float spread_threshold, bool force_spillback, - bool require_available, std::function is_node_available, + bool require_available, std::function is_node_available, NodeFilter node_filter) { // Step 1: Generate the traversal order. We guarantee that the first node is local, to // encourage local scheduling. The rest of the traversal order should be globally // consistent, to encourage using "warm" workers. - std::vector round; + std::vector round; round.reserve(nodes_.size()); const auto local_it = nodes_.find(local_node_id_); RAY_CHECK(local_it != nodes_.end()); auto predicate = [node_filter, &is_node_available]( - int64_t node_id, const NodeResources &node_resources) { + scheduling::NodeID node_id, const NodeResources &node_resources) { if (!is_node_available(node_id)) { return false; } @@ -116,7 +116,7 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter( // place. 
std::sort(round.begin() + start_index, round.end()); - int64_t best_node_id = -1; + scheduling::NodeID best_node_id = scheduling::NodeID::Nil(); float best_utilization_score = INFINITY; bool best_is_available = false; @@ -143,12 +143,12 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter( ignore_pull_manager_at_capacity); float critical_resource_utilization = node.GetLocalView().CalculateCriticalResourceUtilization(); - RAY_LOG(DEBUG) << "Node " << node_id << " is " + RAY_LOG(DEBUG) << "Node " << node_id.ToInt() << " is " << (is_available ? "available" : "not available") << " for request " << resource_request.DebugString() << " with critical resource utilization " << critical_resource_utilization << " based on local view " - << node.GetLocalView().DebugString(StringIdMap()); + << node.GetLocalView().DebugString(); if (critical_resource_utilization < spread_threshold) { critical_resource_utilization = 0; } @@ -180,11 +180,10 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter( return best_node_id; } -int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request, - float spread_threshold, bool force_spillback, - bool require_available, - std::function is_node_available, - bool scheduler_avoid_gpu_nodes) { +scheduling::NodeID SchedulingPolicy::HybridPolicy( + const ResourceRequest &resource_request, float spread_threshold, bool force_spillback, + bool require_available, std::function is_node_available, + bool scheduler_avoid_gpu_nodes) { if (!scheduler_avoid_gpu_nodes || IsGPURequest(resource_request)) { return HybridPolicyWithFilter(resource_request, spread_threshold, force_spillback, require_available, std::move(is_node_available)); @@ -194,7 +193,7 @@ int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request, auto best_node_id = HybridPolicyWithFilter( resource_request, spread_threshold, force_spillback, /*require_available*/ true, is_node_available, NodeFilter::kNonGpu); - if (best_node_id != -1) { + if (!best_node_id.IsNil()) { return best_node_id; } @@ -204,9 +203,10 @@ int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request, require_available, is_node_available); } -int64_t SchedulingPolicy::RandomPolicy(const ResourceRequest &resource_request, - std::function is_node_available) { - int64_t best_node = -1; +scheduling::NodeID SchedulingPolicy::RandomPolicy( + const ResourceRequest &resource_request, + std::function is_node_available) { + scheduling::NodeID best_node = scheduling::NodeID::Nil(); if (nodes_.empty()) { return best_node; } @@ -230,7 +230,7 @@ int64_t SchedulingPolicy::RandomPolicy(const ResourceRequest &resource_request, iter = nodes_.begin(); } } - RAY_LOG(DEBUG) << "RandomPolicy, best_node = " << best_node + RAY_LOG(DEBUG) << "RandomPolicy, best_node = " << best_node.ToInt() << ", # nodes = " << nodes_.size() << ", resource_request = " << resource_request.DebugString(); return best_node; diff --git a/src/ray/raylet/scheduling/scheduling_policy.h b/src/ray/raylet/scheduling/scheduling_policy.h index 548cd1979..94588fa77 100644 --- a/src/ray/raylet/scheduling/scheduling_policy.h +++ b/src/ray/raylet/scheduling/scheduling_policy.h @@ -25,7 +25,8 @@ namespace raylet_scheduling_policy { class SchedulingPolicy { public: - SchedulingPolicy(int64_t local_node_id, const absl::flat_hash_map &nodes) + SchedulingPolicy(scheduling::NodeID local_node_id, + const absl::flat_hash_map &nodes) : local_node_id_(local_node_id), nodes_(nodes), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {} @@ -62,30 
+63,31 @@ class SchedulingPolicy { /// /// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to /// schedule on. - int64_t HybridPolicy( + scheduling::NodeID HybridPolicy( const ResourceRequest &resource_request, float spread_threshold, bool force_spillback, bool require_available, - std::function is_node_available, + std::function is_node_available, bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes()); /// Round robin among available nodes. /// If there are no available nodes, fallback to hybrid policy. - int64_t SpreadPolicy(const ResourceRequest &resource_request, bool force_spillback, - bool require_available, - std::function is_node_available); + scheduling::NodeID SpreadPolicy( + const ResourceRequest &resource_request, bool force_spillback, + bool require_available, std::function is_node_available); /// Policy that "randomly" picks a node that could fulfil the request. /// TODO(scv119): if there are a lot of nodes died or can't fulfill the resource /// requirement, the distribution might not be even. - int64_t RandomPolicy(const ResourceRequest &resource_request, - std::function is_node_available); + scheduling::NodeID RandomPolicy( + const ResourceRequest &resource_request, + std::function is_node_available); private: /// Identifier of local node. - const int64_t local_node_id_; + const scheduling::NodeID local_node_id_; /// List of nodes in the clusters and their resources organized as a map. /// The key of the map is the node ID. - const absl::flat_hash_map &nodes_; + const absl::flat_hash_map &nodes_; // The node to start round robin if it's spread scheduling. // The index may be inaccurate when nodes are added or removed dynamically, // but it should still be better than always scanning from 0 for spread scheduling. @@ -111,11 +113,11 @@ class SchedulingPolicy { /// /// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to /// schedule on. 
- int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request, - float spread_threshold, bool force_spillback, - bool require_available, - std::function is_node_available, - NodeFilter node_filter = NodeFilter::kAny); + scheduling::NodeID HybridPolicyWithFilter( + const ResourceRequest &resource_request, float spread_threshold, + bool force_spillback, bool require_available, + std::function is_node_available, + NodeFilter node_filter = NodeFilter::kAny); }; } // namespace raylet_scheduling_policy } // namespace ray diff --git a/src/ray/raylet/scheduling/scheduling_policy_test.cc b/src/ray/raylet/scheduling/scheduling_policy_test.cc index 6f7381903..1e9ca45c1 100644 --- a/src/ray/raylet/scheduling/scheduling_policy_test.cc +++ b/src/ray/raylet/scheduling/scheduling_policy_test.cc @@ -33,27 +33,28 @@ NodeResources CreateNodeResources(double available_cpu, double total_cpu, return resources; } -class SchedulingPolicyTest : public ::testing::Test {}; +class SchedulingPolicyTest : public ::testing::Test { + public: + scheduling::NodeID local_node = scheduling::NodeID(0); + scheduling::NodeID remote_node = scheduling::NodeID(1); + scheduling::NodeID remote_node_2 = scheduling::NodeID(2); + scheduling::NodeID remote_node_3 = scheduling::NodeID(3); + absl::flat_hash_map nodes; +}; TEST_F(SchedulingPolicyTest, SpreadPolicyTest) { - StringIdMap map; - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node_1 = 1; - int64_t remote_node_2 = 2; - int64_t remote_node_3 = 3; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0)); // Unavailable node - nodes.emplace(remote_node_1, CreateNodeResources(0, 20, 0, 0, 0, 0)); + nodes.emplace(remote_node, CreateNodeResources(0, 20, 0, 0, 0, 0)); // Infeasible node nodes.emplace(remote_node_2, CreateNodeResources(0, 0, 0, 0, 0, 0)); nodes.emplace(remote_node_3, CreateNodeResources(20, 20, 0, 0, 0, 0)); raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes); - int64_t to_schedule = + auto to_schedule = scheduling_policy.SpreadPolicy(req, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, local_node); @@ -67,17 +68,10 @@ TEST_F(SchedulingPolicyTest, SpreadPolicyTest) { } TEST_F(SchedulingPolicyTest, RandomPolicyTest) { - StringIdMap map; - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node_1 = 1; - int64_t remote_node_2 = 2; - int64_t remote_node_3 = 3; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0)); - nodes.emplace(remote_node_1, CreateNodeResources(20, 20, 0, 0, 0, 0)); + nodes.emplace(remote_node, CreateNodeResources(20, 20, 0, 0, 0, 0)); // Unavailable node nodes.emplace(remote_node_2, CreateNodeResources(0, 20, 0, 0, 0, 0)); // Infeasible node @@ -85,14 +79,14 @@ TEST_F(SchedulingPolicyTest, RandomPolicyTest) { raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes); - std::map decisions; + std::map decisions; size_t num_node_0_picks = 0; size_t num_node_1_picks = 0; for (int i = 0; i < 1000; i++) { - int64_t to_schedule = scheduling_policy.RandomPolicy(req, [](auto) { return true; }); - ASSERT_TRUE(to_schedule >= 0); - ASSERT_TRUE(to_schedule <= 1); - if (to_schedule == 0) { + auto to_schedule = 
scheduling_policy.RandomPolicy(req, [](auto) { return true; }); + ASSERT_TRUE(to_schedule.ToInt() >= 0); + ASSERT_TRUE(to_schedule.ToInt() <= 1); + if (to_schedule.ToInt() == 0) { num_node_0_picks++; } else { num_node_1_picks++; @@ -104,10 +98,9 @@ TEST_F(SchedulingPolicyTest, RandomPolicyTest) { } TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) { - StringIdMap map; auto task_req1 = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false); - auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); + ResourceMapToResourceRequest({{"CPU", 1}, {"object_store_memory", 1}}, false); + auto task_req2 = ResourceMapToResourceRequest({{"CPU", 1}}, false); { // Don't break with a non-resized predefined resources array. NodeResources resources; @@ -127,10 +120,9 @@ TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) { } TEST_F(SchedulingPolicyTest, AvailableDefinitionTest) { - StringIdMap map; auto task_req1 = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false); - auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); + ResourceMapToResourceRequest({{"CPU", 1}, {"object_store_memory", 1}}, false); + auto task_req2 = ResourceMapToResourceRequest({{"CPU", 1}}, false); { // Don't break with a non-resized predefined resources array. NodeResources resources; @@ -192,34 +184,28 @@ TEST_F(SchedulingPolicyTest, AvailableTruncationTest) { // In this test, the local node and a remote node are both available. The remote node // has a lower critical resource utilization, but they're both truncated to 0, so we // should still pick the local node (due to traversal order). - StringIdMap map; - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 0)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.51, false, false, [](auto) { return true; }); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.51, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, local_node); } TEST_F(SchedulingPolicyTest, AvailableTieBreakTest) { // In this test, the local node and a remote node are both available. The remote node // has a lower critical resource utilization so we schedule on it. 
- StringIdMap map; - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(1.5, 2, 0, 0, 0, 0)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, remote_node); } @@ -227,96 +213,67 @@ TEST_F(SchedulingPolicyTest, AvailableOverFeasibleTest) { // In this test, the local node is feasible and has a lower critical resource // utilization, but the remote node can run the task immediately, so we pick the remote // node. - StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 1)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 1)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, remote_node); } TEST_F(SchedulingPolicyTest, InfeasibleTest) { // All the nodes are infeasible, so we return -1. - StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 0, 0)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); - ASSERT_EQ(to_schedule, -1); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); + ASSERT_TRUE(to_schedule.IsNil()); } TEST_F(SchedulingPolicyTest, BarelyFeasibleTest) { // Test the edge case where a task requires all of a node's resources, and the node is // fully utilized. 
- StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(0, 1, 0, 0, 0, 1)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.50, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, local_node); } TEST_F(SchedulingPolicyTest, TruncationAcrossFeasibleNodesTest) { // Same as AvailableTruncationTest except now none of the nodes are available, but the // tie break logic should apply to feasible nodes too. - StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 1)); nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 1)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.51, false, false, [](auto) { return true; }); + auto to_schedule = + raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.51, false, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, local_node); } TEST_F(SchedulingPolicyTest, ForceSpillbackIfAvailableTest) { // The local node is better, but we force spillback, so we'll schedule on a non-local // node anyways. - StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 10)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.51, true, true, [](auto) { return true; }); + auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.51, true, true, [](auto) { return true; }); ASSERT_EQ(to_schedule, remote_node); } TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) { - StringIdMap map; - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(1, 2, 0, 0, 0, 0)); @@ -324,18 +281,17 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) { // The local node is better, but it has GPUs, the request is // non GPU, and the remote node does not have GPUs, thus // we should schedule on remote node. 
- const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - const int to_schedule = - raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy( - ResourceMapToResourceRequest(map, {{"CPU", 1}}, false), 0.51, false, true, - [](auto) { return true; }, true); + const ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); + const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy( + ResourceMapToResourceRequest({{"CPU", 1}}, false), + 0.51, false, true, [](auto) { return true; }, true); ASSERT_EQ(to_schedule, remote_node); } { // A GPU request should be scheduled on a GPU node. - const ResourceRequest req = ResourceMapToResourceRequest(map, {{"GPU", 1}}, false); - const int to_schedule = + const ResourceRequest req = ResourceMapToResourceRequest({{"GPU", 1}}, false); + const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, true); @@ -343,8 +299,8 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) { } { // A CPU request can be be scheduled on a CPU node. - const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - const int to_schedule = + const ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); + const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, true); @@ -353,8 +309,8 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) { { // A mixed CPU/GPU request should be scheduled on a GPU node. const ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - const int to_schedule = + ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); + const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, true); @@ -365,16 +321,11 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) { TEST_F(SchedulingPolicyTest, SchedulenCPURequestsOnGPUNodeAsALastResort) { // Schedule on remote node, even though the request is CPU only, because // we can not schedule on CPU nodes. - StringIdMap map; - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; - - absl::flat_hash_map nodes; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); nodes.emplace(local_node, CreateNodeResources(0, 10, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(1, 1, 0, 0, 1, 1)); - const int to_schedule = + const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, true); @@ -383,81 +334,66 @@ TEST_F(SchedulingPolicyTest, SchedulenCPURequestsOnGPUNodeAsALastResort) { TEST_F(SchedulingPolicyTest, ForceSpillbackTest) { // The local node is available but disqualified. 
- StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 1)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.51, true, false, [](auto) { return true; }); + auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.51, true, false, [](auto) { return true; }); ASSERT_EQ(to_schedule, remote_node); } TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) { // The local node is better, but we force spillback, so we'll schedule on a non-local // node anyways. - StringIdMap map; - ResourceRequest req = - ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); - int64_t local_node = 0; - int64_t remote_node = 1; + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 0)); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy(req, 0.51, true, false, [](auto) { return true; }); - ASSERT_EQ(to_schedule, -1); + auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy(req, 0.51, true, false, [](auto) { return true; }); + ASSERT_TRUE(to_schedule.IsNil()); } TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) { // Prefer to schedule on CPU nodes first. // GPU nodes should be preferred as a last resort. 
- StringIdMap map; - int64_t local_node = 0; - int64_t remote_node_1 = 1; - int64_t remote_node_2 = 2; // local {CPU:2, GPU:1} // Remote {CPU: 2} - absl::flat_hash_map nodes; nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); - nodes.emplace(remote_node_1, CreateNodeResources(2, 2, 0, 0, 0, 0)); + nodes.emplace(remote_node, CreateNodeResources(2, 2, 0, 0, 0, 0)); nodes.emplace(remote_node_2, CreateNodeResources(3, 3, 0, 0, 0, 0)); - ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); - int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) - .HybridPolicy( - req, 0.51, false, true, [](auto) { return true; }, - /*gpu_avoid_scheduling*/ true); - ASSERT_EQ(to_schedule, remote_node_1); + ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false); + auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) + .HybridPolicy( + req, 0.51, false, true, [](auto) { return true; }, + /*gpu_avoid_scheduling*/ true); + ASSERT_EQ(to_schedule, remote_node); - req = ResourceMapToResourceRequest(map, {{"CPU", 3}}, false); + req = ResourceMapToResourceRequest({{"CPU", 3}}, false); to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, /*gpu_avoid_scheduling*/ true); ASSERT_EQ(to_schedule, remote_node_2); - req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); + req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false); to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, /*gpu_avoid_scheduling*/ true); ASSERT_EQ(to_schedule, local_node); - req = ResourceMapToResourceRequest(map, {{"CPU", 2}}, false); + req = ResourceMapToResourceRequest({{"CPU", 2}}, false); to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes) .HybridPolicy( req, 0.51, false, true, [](auto) { return true; }, /*gpu_avoid_scheduling*/ true); - ASSERT_EQ(to_schedule, remote_node_1); + ASSERT_EQ(to_schedule, remote_node); } int main(int argc, char **argv) {