[Placement Group] Allow scheduling a task on any bundle (-1, default) (#9885)

* wip

* wip

* fix tests

* wip

* wip

* wip

* wip

* wip

* add test

* update

* update

* remov debug

* comments
This commit is contained in:
Eric Liang 2020-08-06 00:05:21 -07:00 committed by GitHub
parent 45597e3158
commit 7d4f204aa8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 296 additions and 101 deletions

View file

@ -398,24 +398,22 @@ class ActorClass:
return ActorOptionWrapper()
def _remote(
self,
args=None,
kwargs=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
is_direct_call=None,
max_concurrency=None,
max_restarts=None,
max_task_retries=None,
name=None,
detached=False,
placement_group_id=None,
# TODO(ekl) set default to -1 once we support -1 as "any index"
placement_group_bundle_index=0):
def _remote(self,
args=None,
kwargs=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
is_direct_call=None,
max_concurrency=None,
max_restarts=None,
max_task_retries=None,
name=None,
detached=False,
placement_group_id=None,
placement_group_bundle_index=-1):
"""Create an actor.
This method allows more flexibility than the remote method because
@ -443,7 +441,8 @@ class ActorClass:
placement_group_id: the placement group this actor belongs to,
or None if it doesn't belong to any group.
placement_group_bundle_index: the index of the bundle
if the actor belongs to a placement group.
if the actor belongs to a placement group, which may be -1 to
specify any available bundle.
Returns:
A handle to the newly created actor.

View file

@ -139,21 +139,19 @@ class RemoteFunction:
return FuncWrapper()
def _remote(
self,
args=None,
kwargs=None,
num_return_vals=None,
is_direct_call=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
max_retries=None,
placement_group_id=None,
# TODO(ekl) set default to -1 once we support -1 as "any index"
placement_group_bundle_index=0):
def _remote(self,
args=None,
kwargs=None,
num_return_vals=None,
is_direct_call=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
max_retries=None,
placement_group_id=None,
placement_group_bundle_index=-1):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker
worker.check_connected()

View file

@ -135,6 +135,17 @@ def test_placement_group_task_resource_ids(ray_start_cluster):
resources = ray.get(o1)
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
assert "CPU_group_0_" not in list(resources.keys())[0], resources
# Now retry with a bundle index constraint.
o1 = f.options(
placement_group_id=g1, placement_group_bundle_index=0).remote()
resources = ray.get(o1)
assert len(resources) == 2, resources
keys = list(resources.keys())
assert "CPU_group_" in keys[0], resources
assert "CPU_group_" in keys[1], resources
assert "CPU_group_0_" in keys[0] or "CPU_group_0_" in keys[1], resources
def test_placement_group_hang(ray_start_cluster):

View file

@ -1774,7 +1774,7 @@ def make_decorator(num_return_vals=None,
max_task_retries=None,
worker=None,
placement_group_id=None,
placement_group_bundle_index=0):
placement_group_bundle_index=-1):
def decorator(function_or_class):
if (inspect.isfunction(function_or_class)
or is_cython(function_or_class)):
@ -1864,7 +1864,8 @@ def remote(*args, **kwargs):
* **placement_group_id**: the placement group this task belongs to,
or None if it doesn't belong to any group.
* **placement_group_bundle_index**: the index of the bundle
if the task belongs to a placement group.
if the task belongs to a placement group, which may be -1 to indicate
any available bundle.
This can be done as follows:

View file

@ -60,10 +60,16 @@ std::string BundleSpecification::DebugString() const {
}
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
PlacementGroupID group_id,
const PlacementGroupID &group_id,
int64_t bundle_index) {
auto str = original_resource_name + "_group_" + group_id.Hex() + "_" +
std::to_string(bundle_index);
std::string str;
if (bundle_index >= 0) {
str = original_resource_name + "_group_" + std::to_string(bundle_index) + "_" +
group_id.Hex();
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
str = original_resource_name + "_group_" + group_id.Hex();
}
RAY_CHECK(GetOriginalResourceName(str) == original_resource_name) << str;
return str;
}
@ -74,6 +80,12 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
original_resource_name, bundle_spec.PlacementGroupId(), bundle_spec.Index());
}
bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id,
const int bundle_index) {
return resource.find("_group_" + std::to_string(bundle_index) + "_" + group_id.Hex()) !=
std::string::npos;
}
std::string GetOriginalResourceName(const std::string &resource) {
auto idx = resource.find("_group_");
RAY_CHECK(idx >= 0) << "This isn't a placement group resource " << resource;

View file

@ -95,12 +95,17 @@ class BundleSpecification : public MessageWrapper<rpc::Bundle> {
/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
PlacementGroupID group_id, int64_t bundle_index);
const PlacementGroupID &group_id,
int64_t bundle_index = -1);
/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const BundleSpecification &bundle_spec);
/// Return whether a formatted resource is a bundle of the given index.
bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id,
const int bundle_index);
/// Return the original resource name of the placement group resource.
std::string GetOriginalResourceName(const std::string &resource);

View file

@ -3,6 +3,8 @@
#include <cmath>
#include <sstream>
#include "absl/container/flat_hash_map.h"
#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"
@ -227,29 +229,48 @@ void ResourceSet::AddResources(const ResourceSet &other) {
}
}
void ResourceSet::AddBundleResources(const PlacementGroupID &bundle_id,
void ResourceSet::AddBundleResources(const PlacementGroupID &group_id,
const int bundle_index, const ResourceSet &other) {
for (const auto &resource_pair : other.GetResourceAmountMap()) {
// With bundle index (e.g., CPU_group_i_zzz).
const std::string &resource_label =
FormatPlacementGroupResource(resource_pair.first, bundle_id, bundle_index);
FormatPlacementGroupResource(resource_pair.first, group_id, bundle_index);
const FractionalResourceQuantity &resource_capacity = resource_pair.second;
resource_capacity_[resource_label] += resource_capacity;
// Without bundle index (e.g., CPU_group_zzz).
const std::string &wildcard_label =
FormatPlacementGroupResource(resource_pair.first, group_id, -1);
resource_capacity_[wildcard_label] += resource_capacity;
}
}
void ResourceSet::ReturnBundleResources(const PlacementGroupID &bundle_id,
void ResourceSet::ReturnBundleResources(const PlacementGroupID &group_id,
const int bundle_index) {
absl::flat_hash_map<std::string, FractionalResourceQuantity> to_restore;
for (auto iter = resource_capacity_.begin(); iter != resource_capacity_.end();) {
const std::string &bundle_resource_label = iter->first;
if (bundle_resource_label.find(bundle_id.Hex()) != std::string::npos) {
// We only consider the indexed resources, ignoring the wildcard resource.
// This is because when multiple bundles are created on one node, the quantity
// of the wildcard resources contains resources from multiple bundles.
if (IsBundleIndex(bundle_resource_label, group_id, bundle_index)) {
const std::string &resource_label = GetOriginalResourceName(bundle_resource_label);
const FractionalResourceQuantity &resource_capacity = iter->second;
resource_capacity_[resource_label] += resource_capacity;
to_restore[resource_label] = resource_capacity;
iter = resource_capacity_.erase(iter);
} else {
iter++;
}
}
// For each matching resource to restore (e.g., key like CPU, GPU).
for (const auto &pair : to_restore) {
resource_capacity_[pair.first] += pair.second;
auto wildcard_resource = FormatPlacementGroupResource(pair.first, group_id, -1);
resource_capacity_[wildcard_resource] -= pair.second;
if (resource_capacity_[wildcard_resource] <= 0) {
resource_capacity_.erase(wildcard_resource);
}
}
}
FractionalResourceQuantity ResourceSet::GetResource(
@ -665,26 +686,41 @@ void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name,
}
}
void ResourceIdSet::AddBundleResource(const std::string &resource_name,
ResourceIds &resource_ids) {
available_resources_[resource_name] = resource_ids;
void ResourceIdSet::AddBundleResourceIds(const PlacementGroupID &group_id,
const int bundle_index,
const std::string &resource_name,
ResourceIds &resource_ids) {
auto index_name = FormatPlacementGroupResource(resource_name, group_id, bundle_index);
auto wildcard_name = FormatPlacementGroupResource(resource_name, group_id, -1);
available_resources_[index_name] = available_resources_[index_name].Plus(resource_ids);
available_resources_[wildcard_name] =
available_resources_[wildcard_name].Plus(resource_ids);
}
void ResourceIdSet::CancelResourceReserve(const std::string &resource_name) {
std::string origin_resource_name = GetOriginalResourceName(resource_name);
auto iter_orig = available_resources_.find(origin_resource_name);
auto iter_bundle = available_resources_.find(resource_name);
if (iter_bundle == available_resources_.end()) {
void ResourceIdSet::ReturnBundleResources(const PlacementGroupID &group_id,
const int bundle_index,
const std::string &original_resource_name) {
auto index_resource_name =
FormatPlacementGroupResource(original_resource_name, group_id, bundle_index);
auto iter_index = available_resources_.find(index_resource_name);
if (iter_index == available_resources_.end()) {
return;
} else {
if (iter_orig == available_resources_.end()) {
available_resources_[origin_resource_name] = iter_bundle->second;
} else {
iter_orig->second.Release(iter_bundle->second);
}
available_resources_.erase(iter_bundle);
}
// Erase and transfer the index bundle resource back to the original.
auto bundle_ids = iter_index->second;
available_resources_.erase(iter_index);
available_resources_[original_resource_name] =
(available_resources_[original_resource_name].Plus(bundle_ids));
// Also erase the the equivalent number of units from the wildcard resource.
auto wildcard_name = FormatPlacementGroupResource(original_resource_name, group_id, -1);
available_resources_[wildcard_name].Acquire(bundle_ids.TotalQuantity());
if (available_resources_[wildcard_name].TotalQuantityIsZero()) {
available_resources_.erase(wildcard_name);
}
}
void ResourceIdSet::DeleteResource(const std::string &resource_name) {
available_resources_.erase(resource_name);
}
@ -837,19 +873,19 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam
}
}
void SchedulingResources::UpdateBundleResource(const PlacementGroupID &group,
const int bundle_index,
const ResourceSet &resource_set) {
void SchedulingResources::TransferToBundleResources(const PlacementGroupID &group,
const int bundle_index,
const ResourceSet &resource_set) {
resources_available_.SubtractResourcesStrict(resource_set);
resources_available_.AddBundleResources(group, bundle_index, resource_set);
resources_total_.SubtractResourcesStrict(resource_set);
resources_total_.AddBundleResources(group, bundle_index, resource_set);
}
void SchedulingResources::ReturnBundleResource(const PlacementGroupID &group,
const int bundle_index) {
resources_available_.ReturnBundleResources(group, bundle_index);
resources_total_.ReturnBundleResources(group, bundle_index);
void SchedulingResources::ReturnBundleResources(const PlacementGroupID &group_id,
const int bundle_index) {
resources_available_.ReturnBundleResources(group_id, bundle_index);
resources_total_.ReturnBundleResources(group_id, bundle_index);
}
void SchedulingResources::DeleteResource(const std::string &resource_name) {

View file

@ -151,20 +151,25 @@ class ResourceSet {
/// \brief Aggregate resources from the other set into this set, adding any missing
/// resource labels to this set.
///
/// \param bundle_id: The placement group id.
/// This adds both the the indexed and wildcard resources (e.g., both
/// CPU_group_i_zzz and CPU_group_zzz).
///
/// \param group_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// \param other: The other resource set to add.
/// \return Void.
void AddBundleResources(const PlacementGroupID &bundle_id, const int bundle_index,
void AddBundleResources(const PlacementGroupID &group_id, const int bundle_index,
const ResourceSet &other);
/// \brief Return back all the bundle resource. Changing the resource name and adding
/// any missing resource labels to this set.
///
/// \param bundle_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// This is the inverse of AddBundleResources().
///
/// \param group_id: The placement group id.
/// \param bundle_index: The bundle index to return resources for.
/// \return Void.
void ReturnBundleResources(const PlacementGroupID &bundle_id, const int bundle_index);
void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index);
/// \brief Subtract a set of resources from the current set of resources and
/// check that the post-subtraction result nonnegative. Assumes other
@ -422,17 +427,28 @@ class ResourceIdSet {
/// \param capacity capacity of the resource being added
void AddOrUpdateResource(const std::string &resource_name, int64_t capacity);
/// \brief Add a Bundle resource in the ResourceIdSet.
/// \brief Add a Bundle resource in the ResourceIdSet.
///
/// \param resource_name the name of the resource to create/update
/// This adds both the the indexed and wildcard resources (e.g., both
/// CPU_group_i_zzz and CPU_group_zzz).
///
/// \param group_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// \param resource_name the name of the resource to create/update (e.g., "CPU").
/// \param resource_ids resource_ids of the resource being added
void AddBundleResource(const std::string &resource_name, ResourceIds &resource_ids);
void AddBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index,
const std::string &resource_name, ResourceIds &resource_ids);
/// \brief remove a Bundle resource in the ResourceIdSet.
/// \brief remove a Bundle resource in the ResourceIdSet.
///
/// \param resource_name the name of the resource to remove.
/// The bundle resources will be returned to their original resource names.
///
/// \param group_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// \param resource_name the name of the resource to remove (e.g., "CPU").
void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index,
const std::string &resource_name);
void CancelResourceReserve(const std::string &resource_name);
/// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception,
/// just deletes the resource. Tasks with acquired resources keep running.
///
@ -556,15 +572,24 @@ class SchedulingResources {
/// \brief Update total, available and load resources with the ResourceIds.
/// Create if not exists.
///
/// The resources will be transfered from their original resource names.
/// This includes both the the indexed and wildcard resources (e.g., both
/// CPU_group_i_zzz and CPU_group_zzz).
///
/// \param resource_name: Name of the resource to be modified
/// \param resource_set: New resource_set of the resource.
void UpdateBundleResource(const PlacementGroupID &group, const int bundle_index,
const ResourceSet &resource_set);
void TransferToBundleResources(const PlacementGroupID &group, const int bundle_index,
const ResourceSet &resource_set);
/// \brief delete total, available and load resources with the ResourceIds.
/// Create if not exists.
/// \param resource_name: Name of the resource to be deleted
void ReturnBundleResource(const PlacementGroupID &group, const int bundle_index);
///
/// The bundle resources will be returned to their original resource names.
/// This is the inverse of TransferToBundleResources().
///
/// \param group_id: Placement group id to delete resources for.
/// \param bundle_index: The bundle index to return resources for.
void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index);
/// \brief Delete resource from total, available and load resources.
///

View file

@ -39,19 +39,24 @@ TEST_F(SchedulingResourcesTest, AddBundleResources) {
ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(group_id, 1, resource);
resource_labels.pop_back();
resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1");
resource_labels.push_back("CPU_group_1_" + group_id.Hex());
resource_labels.push_back("CPU_group_" + group_id.Hex());
resource_capacity.push_back(1.0);
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource));
}
TEST_F(SchedulingResourcesTest, AddBundleResource) {
PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::string name = "CPU_group_" + group_id.Hex() + "_1";
std::string wild_name = "CPU_group_" + group_id.Hex();
std::string index_name = "CPU_group_1_" + group_id.Hex();
std::vector<int64_t> whole_ids = {1, 2, 3};
ResourceIds resource_ids(whole_ids);
resource_id_set->AddBundleResource(name, resource_ids);
ASSERT_EQ(1, resource_id_set->AvailableResources().size());
ASSERT_EQ(name, resource_id_set->AvailableResources().begin()->first);
resource_id_set->AddBundleResourceIds(group_id, 1, "CPU", resource_ids);
ASSERT_EQ(2, resource_id_set->AvailableResources().size());
for (auto res : resource_id_set->AvailableResources()) {
ASSERT_TRUE(res.first == wild_name || res.first == index_name) << res.first;
}
}
TEST_F(SchedulingResourcesTest, ReturnBundleResources) {
@ -61,10 +66,104 @@ TEST_F(SchedulingResourcesTest, ReturnBundleResources) {
ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(group_id, 1, resource);
resource_labels.pop_back();
resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1");
resource_labels.push_back("CPU_group_" + group_id.Hex());
resource_labels.push_back("CPU_group_1_" + group_id.Hex());
resource_capacity.push_back(1.0);
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource));
resource_set->ReturnBundleResources(group_id, 1);
ASSERT_EQ(1, resource_set->IsEqual(resource));
ASSERT_EQ(1, resource_set->IsEqual(resource))
<< resource_set->ToString() << " vs " << resource.ToString();
}
TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemove) {
PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::vector<std::string> resource_labels = {"CPU"};
std::vector<double> resource_capacity = {1.0};
ResourceSet resource(resource_labels, resource_capacity);
// Construct resource set containing two bundles.
resource_set->AddBundleResources(group_id, 1, resource);
resource_set->AddBundleResources(group_id, 2, resource);
resource_labels = {
"CPU_group_" + group_id.Hex(),
"CPU_group_1_" + group_id.Hex(),
"CPU_group_2_" + group_id.Hex(),
};
resource_capacity = {2.0, 1.0, 1.0};
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource))
<< resource_set->ToString() << " vs " << result_resource.ToString();
// Return group 2.
resource_set->ReturnBundleResources(group_id, 2);
resource_labels = {
"CPU",
"CPU_group_" + group_id.Hex(),
"CPU_group_1_" + group_id.Hex(),
};
resource_capacity = {1.0, 1.0, 1.0};
ResourceSet result_resource2(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource2))
<< resource_set->ToString() << " vs " << result_resource2.ToString();
// Return group 1.
resource_set->ReturnBundleResources(group_id, 1);
ASSERT_EQ(1, resource_set->IsEqual(ResourceSet({"CPU"}, {2.0})))
<< resource_set->ToString() << " vs " << resource.ToString();
}
TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemoveIdSet) {
PlacementGroupID group_id = PlacementGroupID::FromRandom();
ResourceIdSet resource_ids;
// Construct resource set containing two bundles.
auto rid1 = ResourceIds({1, 2});
auto rid2 = ResourceIds({3, 4});
resource_ids.AddBundleResourceIds(group_id, 1, "CPU", rid1);
resource_ids.AddBundleResourceIds(group_id, 2, "CPU", rid2);
resource_ids.AddBundleResourceIds(group_id, 1, "GPU", rid1);
resource_ids.AddBundleResourceIds(group_id, 2, "GPU", rid2);
auto result = ResourceSet(
{
"CPU_group_" + group_id.Hex(),
"CPU_group_1_" + group_id.Hex(),
"CPU_group_2_" + group_id.Hex(),
"GPU_group_" + group_id.Hex(),
"GPU_group_1_" + group_id.Hex(),
"GPU_group_2_" + group_id.Hex(),
},
{4.0, 2.0, 2.0, 4.0, 2.0, 2.0});
ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result))
<< resource_ids.ToString() << " vs " << result.ToString();
// Remove the first bundle.
resource_ids.ReturnBundleResources(group_id, 1, "CPU");
resource_ids.ReturnBundleResources(group_id, 1, "GPU");
result = ResourceSet(
{
"CPU_group_" + group_id.Hex(),
"CPU",
"CPU_group_2_" + group_id.Hex(),
"GPU_group_" + group_id.Hex(),
"GPU",
"GPU_group_2_" + group_id.Hex(),
},
{2.0, 2.0, 2.0, 2.0, 2.0, 2.0});
ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result))
<< resource_ids.ToString() << " vs " << result.ToString();
// Remove the second bundle.
resource_ids.ReturnBundleResources(group_id, 2, "CPU");
resource_ids.ReturnBundleResources(group_id, 2, "GPU");
result = ResourceSet(
{
"CPU",
"GPU",
},
{4.0, 4.0});
ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result))
<< resource_ids.ToString() << " vs " << result.ToString();
}
} // namespace ray

View file

@ -1173,12 +1173,19 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca
std::unordered_map<std::string, double> AddPlacementGroupConstraint(
const std::unordered_map<std::string, double> &resources,
PlacementGroupID placement_group_id, int64_t bundle_index) {
if (bundle_index < 0) {
RAY_CHECK(bundle_index == -1) << "Invalid bundle index " << bundle_index;
}
std::unordered_map<std::string, double> new_resources;
if (placement_group_id != PlacementGroupID::Nil()) {
for (auto iter = resources.begin(); iter != resources.end(); iter++) {
auto new_name =
FormatPlacementGroupResource(iter->first, placement_group_id, bundle_index);
auto new_name = FormatPlacementGroupResource(iter->first, placement_group_id, -1);
new_resources[new_name] = iter->second;
if (bundle_index >= 0) {
auto index_name =
FormatPlacementGroupResource(iter->first, placement_group_id, bundle_index);
new_resources[index_name] = iter->second;
}
}
return new_resources;
}

View file

@ -1835,11 +1835,13 @@ void NodeManager::HandleCancelResourceReserve(
RAY_LOG(DEBUG) << "bundle return resource request " << bundle_spec.BundleId().first
<< bundle_spec.BundleId().second;
auto resource_set = bundle_spec.GetRequiredResources();
// TODO(ekl) doesn't this not return in-use resources? We need to be able to
// reclaim those somehow (i.e., destroy the workers allocated in the bundle).
for (auto resource : resource_set.GetResourceMap()) {
std::string resource_name = FormatPlacementGroupResource(resource.first, bundle_spec);
local_available_resources_.CancelResourceReserve(resource_name);
local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(),
bundle_spec.Index(), resource.first);
}
cluster_resource_map_[self_node_id_].ReturnBundleResource(
cluster_resource_map_[self_node_id_].ReturnBundleResources(
bundle_spec.PlacementGroupId(), bundle_spec.Index());
send_reply_callback(Status::OK(), nullptr, nullptr);
// Call task dispatch to assign work to the released resources.
@ -2010,13 +2012,13 @@ ResourceIdSet NodeManager::ScheduleBundle(
acquired_resources =
local_available_resources_.Acquire(bundle_spec.GetRequiredResources());
for (auto resource : acquired_resources.AvailableResources()) {
std::string resource_name =
FormatPlacementGroupResource(resource.first, bundle_spec);
local_available_resources_.AddBundleResource(resource_name, resource.second);
local_available_resources_.AddBundleResourceIds(bundle_spec.PlacementGroupId(),
bundle_spec.Index(), resource.first,
resource.second);
}
resource_map[self_node_id_].UpdateBundleResource(bundle_spec.PlacementGroupId(),
bundle_spec.Index(),
bundle_spec.GetRequiredResources());
resource_map[self_node_id_].TransferToBundleResources(
bundle_spec.PlacementGroupId(), bundle_spec.Index(),
bundle_spec.GetRequiredResources());
}
return acquired_resources;
}