diff --git a/python/ray/tests/test_placement_group_mini_integration.py b/python/ray/tests/test_placement_group_mini_integration.py index d748dc0ee..d1358e74a 100644 --- a/python/ray/tests/test_placement_group_mini_integration.py +++ b/python/ray/tests/test_placement_group_mini_integration.py @@ -38,6 +38,7 @@ def test_placement_group_remove_stress(ray_start_cluster, execution_number): num_gpus=resource_quantity, resources=custom_resources)) cluster.wait_for_nodes() + num_nodes = len(nodes) ray.init(address=cluster.address) while not ray.is_initialized(): @@ -70,7 +71,11 @@ def test_placement_group_remove_stress(ray_start_cluster, execution_number): # Randomly schedule tasks or actors on placement groups that # are not removed. for pg in pgs_unremoved: - tasks.append(mock_task.options(placement_group=pg).remote()) + for i in range(num_nodes): + tasks.append( + mock_task.options( + placement_group=pg, + placement_group_bundle_index=i).remote()) # Remove the rest of placement groups. for pg in pgs_removed: remove_placement_group(pg) diff --git a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h index fd94fb591..8c5231124 100644 --- a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -30,7 +30,8 @@ class MockClusterResourceScheduler : public ClusterResourceScheduler { (override)); MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override)); MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & resources_data), (override)); - MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, (), + MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, + (const std::unordered_map &resource_map_filter), (const, override)); MOCK_METHOD(void, UpdateLastResourceUsage, (const std::shared_ptr gcs_resources), (override)); diff --git 
a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h index 9cad875fd..79f95c7df 100644 --- a/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h +++ b/src/mock/ray/raylet/scheduling/cluster_resource_scheduler_interface.h @@ -32,7 +32,8 @@ class MockClusterResourceSchedulerInterface : public ClusterResourceSchedulerInt (const std::shared_ptr gcs_resources), (override)); MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & data), (override)); MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override)); - MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, (), + MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, + (const std::unordered_map &resource_map_filter), (const, override)); MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override)); }; diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index d9ccfd1ac..9103143c8 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -102,7 +102,8 @@ void NewPlacementGroupResourceManager::CommitBundle( const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap(); const auto &task_resource_instances = *bundle_state->resources_; - for (const auto &resource : bundle_spec.GetFormattedResources()) { + const auto &resources = bundle_spec.GetFormattedResources(); + for (const auto &resource : resources) { const auto &resource_name = resource.first; const auto &original_resource_name = GetOriginalResourceName(resource_name); if (original_resource_name != kBundle_ResourceLabel) { @@ -115,7 +116,8 @@ void NewPlacementGroupResourceManager::CommitBundle( } } cluster_resource_scheduler_->UpdateLocalAvailableResourcesFromResourceInstances(); - 
update_resources_(cluster_resource_scheduler_->GetResourceTotals()); + update_resources_( + cluster_resource_scheduler_->GetResourceTotals(/*resource_name_filter*/ resources)); } void NewPlacementGroupResourceManager::ReturnBundle( diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 73d3d5d07..2f837ec01 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -1120,7 +1120,8 @@ double ClusterResourceScheduler::GetLocalAvailableCpus() const { } ray::gcs::NodeResourceInfoAccessor::ResourceMap -ClusterResourceScheduler::GetResourceTotals() const { +ClusterResourceScheduler::GetResourceTotals( + const std::unordered_map &resource_map_filter) const { ray::gcs::NodeResourceInfoAccessor::ResourceMap map; auto it = nodes_.find(local_node_id_); RAY_CHECK(it != nodes_.end()); @@ -1128,6 +1129,10 @@ ClusterResourceScheduler::GetResourceTotals() const { for (size_t i = 0; i < local_resources.predefined_resources.size(); i++) { std::string resource_name = ResourceEnumToString(static_cast(i)); double resource_total = local_resources.predefined_resources[i].total.Double(); + if (resource_map_filter.count(resource_name) == 0u) { + continue; + } + if (resource_total > 0) { auto data = std::make_shared(); data->set_resource_capacity(resource_total); @@ -1138,6 +1143,10 @@ ClusterResourceScheduler::GetResourceTotals() const { for (auto entry : local_resources.custom_resources) { std::string resource_name = string_to_int_map_.Get(entry.first); double resource_total = entry.second.total.Double(); + if (resource_map_filter.count(resource_name) == 0u) { + continue; + } + if (resource_total > 0) { auto data = std::make_shared(); data->set_resource_capacity(resource_total); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 8fd316d8e..c5233b367 100644 --- 
a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -404,8 +404,15 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { /// fields used. void FillResourceUsage(rpc::ResourcesData &resources_data) override; + /// Populate a UpdateResourcesRequest. This is intended to update the + /// resource totals on a node when a custom resource is created or deleted + /// (e.g. during the placement group lifecycle). + /// + /// \param resource_map_filter When returning the resource map, the returned result will + /// only contain the keys in the filter. Note that only the key of the map is used. /// \return The total resource capacity of the node. - ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals() const override; + ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals( + const std::unordered_map &resource_map_filter) const override; /// Update last report resources local cache from gcs cache, /// this is needed when gcs fo. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h b/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h index 2d78bb4a7..1d842d922 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h @@ -78,8 +78,11 @@ class ClusterResourceSchedulerInterface { /// resource totals on a node when a custom resource is created or deleted /// (e.g. during the placement group lifecycle). /// - /// \param Output parameter. Fills out all fields. - virtual ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals() const = 0; + /// \param resource_map_filter When returning the resource map, the returned result will + /// only contain the keys in the filter. Note that only the key of the map is used. + /// \return The total resource capacity of the node. 
+ virtual ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals( + const std::unordered_map &resource_map_filter) const = 0; /// Return local resources in human-readable string form. virtual std::string GetLocalResourceViewString() const = 0;