[PlacementGroup]Fix node manager release unused bundles bug (#12346)

This commit is contained in:
fangfengbin 2020-11-25 11:02:43 +08:00 committed by GitHub
parent 5934b20b96
commit 1d909321c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -577,20 +577,28 @@ void NodeManager::HandleReleaseUnusedBundles(
} }
// Kill all workers that are currently associated with the unused bundles. // Kill all workers that are currently associated with the unused bundles.
// NOTE: We can't destroy workers while iterating over `leased_workers_`, because
// `DestroyWorker` removes the worker's element from `leased_workers_`. So we first
// collect `workers_associated_with_unused_bundles` into a separate list.
std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_unused_bundles;
for (const auto &worker_it : leased_workers_) { for (const auto &worker_it : leased_workers_) {
auto &worker = worker_it.second; auto &worker = worker_it.second;
if (0 == in_use_bundles.count(worker->GetBundleId())) { if (0 == in_use_bundles.count(worker->GetBundleId())) {
RAY_LOG(DEBUG) workers_associated_with_unused_bundles.emplace_back(worker);
<< "Destroying worker since its bundle was unused. Placement group id: "
<< worker->GetBundleId().first
<< ", bundle index: " << worker->GetBundleId().second
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
} }
} }
for (const auto &worker : workers_associated_with_unused_bundles) {
RAY_LOG(DEBUG)
<< "Destroying worker since its bundle was unused. Placement group id: "
<< worker->GetBundleId().first
<< ", bundle index: " << worker->GetBundleId().second
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
}
// Return unused bundle resources. // Return unused bundle resources.
for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) {
if (0 == in_use_bundles.count(iter->first)) { if (0 == in_use_bundles.count(iter->first)) {
@ -1819,11 +1827,14 @@ void NodeManager::HandleCancelResourceReserve(
<< bundle_spec.DebugString(); << bundle_spec.DebugString();
// Kill all workers that are currently associated with the placement group. // Kill all workers that are currently associated with the placement group.
// NOTE: We can't destroy workers while iterating over `leased_workers_`, because
// `DestroyWorker` removes the worker's element from `leased_workers_`. So we first
// collect `workers_associated_with_pg` into a separate list.
std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_pg; std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_pg;
for (const auto &worker_it : leased_workers_) { for (const auto &worker_it : leased_workers_) {
auto &worker = worker_it.second; auto &worker = worker_it.second;
if (worker->GetBundleId().first == bundle_spec.PlacementGroupId()) { if (worker->GetBundleId().first == bundle_spec.PlacementGroupId()) {
workers_associated_with_pg.push_back(worker); workers_associated_with_pg.emplace_back(worker);
} }
} }
for (const auto &worker : workers_associated_with_pg) { for (const auto &worker : workers_associated_with_pg) {