mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Placement Group] Fix the high load bug from the placement group (#19277)
This commit is contained in:
parent
6ca3c02041
commit
4360b99803
7 changed files with 37 additions and 9 deletions
|
@ -38,6 +38,7 @@ def test_placement_group_remove_stress(ray_start_cluster, execution_number):
|
|||
num_gpus=resource_quantity,
|
||||
resources=custom_resources))
|
||||
cluster.wait_for_nodes()
|
||||
num_nodes = len(nodes)
|
||||
|
||||
ray.init(address=cluster.address)
|
||||
while not ray.is_initialized():
|
||||
|
@ -70,7 +71,11 @@ def test_placement_group_remove_stress(ray_start_cluster, execution_number):
|
|||
# Randomly schedule tasks or actors on placement groups that
|
||||
# are not removed.
|
||||
for pg in pgs_unremoved:
|
||||
tasks.append(mock_task.options(placement_group=pg).remote())
|
||||
for i in range(num_nodes):
|
||||
tasks.append(
|
||||
mock_task.options(
|
||||
placement_group=pg,
|
||||
placement_group_bundle_index=i).remote())
|
||||
# Remove the rest of placement groups.
|
||||
for pg in pgs_removed:
|
||||
remove_placement_group(pg)
|
||||
|
|
|
@ -30,7 +30,8 @@ class MockClusterResourceScheduler : public ClusterResourceScheduler {
|
|||
(override));
|
||||
MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override));
|
||||
MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & resources_data), (override));
|
||||
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, (),
|
||||
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals,
|
||||
(const std::unordered_map<std::string, double> &resource_map_filter),
|
||||
(const, override));
|
||||
MOCK_METHOD(void, UpdateLastResourceUsage,
|
||||
(const std::shared_ptr<SchedulingResources> gcs_resources), (override));
|
||||
|
|
|
@ -32,7 +32,8 @@ class MockClusterResourceSchedulerInterface : public ClusterResourceSchedulerInt
|
|||
(const std::shared_ptr<SchedulingResources> gcs_resources), (override));
|
||||
MOCK_METHOD(void, FillResourceUsage, (rpc::ResourcesData & data), (override));
|
||||
MOCK_METHOD(double, GetLocalAvailableCpus, (), (const, override));
|
||||
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals, (),
|
||||
MOCK_METHOD(ray::gcs::NodeResourceInfoAccessor::ResourceMap, GetResourceTotals,
|
||||
(const std::unordered_map<std::string, double> &resource_map_filter),
|
||||
(const, override));
|
||||
MOCK_METHOD(std::string, GetLocalResourceViewString, (), (const, override));
|
||||
};
|
||||
|
|
|
@ -102,7 +102,8 @@ void NewPlacementGroupResourceManager::CommitBundle(
|
|||
const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap();
|
||||
const auto &task_resource_instances = *bundle_state->resources_;
|
||||
|
||||
for (const auto &resource : bundle_spec.GetFormattedResources()) {
|
||||
const auto &resources = bundle_spec.GetFormattedResources();
|
||||
for (const auto &resource : resources) {
|
||||
const auto &resource_name = resource.first;
|
||||
const auto &original_resource_name = GetOriginalResourceName(resource_name);
|
||||
if (original_resource_name != kBundle_ResourceLabel) {
|
||||
|
@ -115,7 +116,8 @@ void NewPlacementGroupResourceManager::CommitBundle(
|
|||
}
|
||||
}
|
||||
cluster_resource_scheduler_->UpdateLocalAvailableResourcesFromResourceInstances();
|
||||
update_resources_(cluster_resource_scheduler_->GetResourceTotals());
|
||||
update_resources_(
|
||||
cluster_resource_scheduler_->GetResourceTotals(/*resource_name_filter*/ resources));
|
||||
}
|
||||
|
||||
void NewPlacementGroupResourceManager::ReturnBundle(
|
||||
|
|
|
@ -1120,7 +1120,8 @@ double ClusterResourceScheduler::GetLocalAvailableCpus() const {
|
|||
}
|
||||
|
||||
ray::gcs::NodeResourceInfoAccessor::ResourceMap
|
||||
ClusterResourceScheduler::GetResourceTotals() const {
|
||||
ClusterResourceScheduler::GetResourceTotals(
|
||||
const std::unordered_map<std::string, double> &resource_map_filter) const {
|
||||
ray::gcs::NodeResourceInfoAccessor::ResourceMap map;
|
||||
auto it = nodes_.find(local_node_id_);
|
||||
RAY_CHECK(it != nodes_.end());
|
||||
|
@ -1128,6 +1129,10 @@ ClusterResourceScheduler::GetResourceTotals() const {
|
|||
for (size_t i = 0; i < local_resources.predefined_resources.size(); i++) {
|
||||
std::string resource_name = ResourceEnumToString(static_cast<PredefinedResources>(i));
|
||||
double resource_total = local_resources.predefined_resources[i].total.Double();
|
||||
if (resource_map_filter.count(resource_name) == 0u) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (resource_total > 0) {
|
||||
auto data = std::make_shared<rpc::ResourceTableData>();
|
||||
data->set_resource_capacity(resource_total);
|
||||
|
@ -1138,6 +1143,10 @@ ClusterResourceScheduler::GetResourceTotals() const {
|
|||
for (auto entry : local_resources.custom_resources) {
|
||||
std::string resource_name = string_to_int_map_.Get(entry.first);
|
||||
double resource_total = entry.second.total.Double();
|
||||
if (resource_map_filter.count(resource_name) == 0u) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (resource_total > 0) {
|
||||
auto data = std::make_shared<rpc::ResourceTableData>();
|
||||
data->set_resource_capacity(resource_total);
|
||||
|
|
|
@ -404,8 +404,15 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
|
|||
/// fields used.
|
||||
void FillResourceUsage(rpc::ResourcesData &resources_data) override;
|
||||
|
||||
/// Populate a UpdateResourcesRequest. This is inteneded to update the
|
||||
/// resource totals on a node when a custom resource is created or deleted
|
||||
/// (e.g. during the placement group lifecycle).
|
||||
///
|
||||
/// \param resource_map_filter When returning the resource map, the returned result will
|
||||
/// only contain the keys in the filter. Note that only the key of the map is used.
|
||||
/// \return The total resource capacity of the node.
|
||||
ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals() const override;
|
||||
ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals(
|
||||
const std::unordered_map<std::string, double> &resource_map_filter) const override;
|
||||
|
||||
/// Update last report resources local cache from gcs cache,
|
||||
/// this is needed when gcs fo.
|
||||
|
|
|
@ -78,8 +78,11 @@ class ClusterResourceSchedulerInterface {
|
|||
/// resource totals on a node when a custom resource is created or deleted
|
||||
/// (e.g. during the placement group lifecycle).
|
||||
///
|
||||
/// \param Output parameter. Fills out all fields.
|
||||
virtual ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals() const = 0;
|
||||
/// \param resource_map_filter When returning the resource map, the returned result will
|
||||
/// only contain the keys in the filter. Note that only the key of the map is used.
|
||||
/// \return The total resource capacity of the node.
|
||||
virtual ray::gcs::NodeResourceInfoAccessor::ResourceMap GetResourceTotals(
|
||||
const std::unordered_map<std::string, double> &resource_map_filter) const = 0;
|
||||
|
||||
/// Return local resources in human-readable string form.
|
||||
virtual std::string GetLocalResourceViewString() const = 0;
|
||||
|
|
Loading…
Add table
Reference in a new issue