From f9600486313797df12d941984d4fee9db11b8615 Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Tue, 20 Jul 2021 11:02:55 -0700 Subject: [PATCH] [Core] fix erase iterator while iterating over a map. (#17204) (cherry picked from commit 055a90374c4bba887d0240abefe8ec8d4494847a) --- .../gcs_server/gcs_placement_group_manager.cc | 16 ++++++++++++++-- .../plasma/create_request_queue.cc | 5 +++-- src/ray/pubsub/publisher.cc | 9 +++++++-- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index a9c306966..49ce37b0e 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -572,6 +572,8 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) { void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead( const JobID &job_id) { + std::vector groups_to_remove; + for (const auto &it : registered_placement_groups_) { auto &placement_group = it.second; if (placement_group->GetCreatorJobId() != job_id) { @@ -579,13 +581,19 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead( } placement_group->MarkCreatorJobDead(); if (placement_group->IsPlacementGroupLifetimeDone()) { - RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {}); + groups_to_remove.push_back(placement_group->GetPlacementGroupID()); } } + + for (const auto &group : groups_to_remove) { + RemovePlacementGroup(group, [](Status status) {}); + } } void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead( const ActorID &actor_id) { + std::vector groups_to_remove; + for (const auto &it : registered_placement_groups_) { auto &placement_group = it.second; if (placement_group->GetCreatorActorId() != actor_id) { @@ -593,9 +601,13 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead( } placement_group->MarkCreatorActorDead(); if (placement_group->IsPlacementGroupLifetimeDone()) { - RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {}); + groups_to_remove.push_back(placement_group->GetPlacementGroupID()); } } + + for (const auto &group : groups_to_remove) { + RemovePlacementGroup(group, [](Status status) {}); + } } void GcsPlacementGroupManager::CollectStats() const { diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc index ca086a54a..28aca0a0e 100644 --- a/src/ray/object_manager/plasma/create_request_queue.cc +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -190,9 +190,10 @@ void CreateRequestQueue::RemoveDisconnectedClientRequests( for (auto it = fulfilled_requests_.begin(); it != fulfilled_requests_.end();) { if (it->second && it->second->client == client) { - fulfilled_requests_.erase(it); + fulfilled_requests_.erase(it++); + } else { + it++; } - it++; } } diff --git a/src/ray/pubsub/publisher.cc b/src/ray/pubsub/publisher.cc index ac6b9fa9f..1d9c4f18d 100644 --- a/src/ray/pubsub/publisher.cc +++ b/src/ray/pubsub/publisher.cc @@ -300,6 +300,8 @@ bool Publisher::UnregisterSubscriberInternal(const SubscriberID &subscriber_id) void Publisher::CheckDeadSubscribers() { absl::MutexLock lock(&mutex_); + std::vector dead_subscribers; + for (const auto &it : subscribers_) { const auto &subscriber = it.second; @@ -308,13 +310,16 @@ void Publisher::CheckDeadSubscribers() { RAY_CHECK(!(disconnected && active_connection_timed_out)); if (disconnected) { - const auto &subscriber_id = it.first; - UnregisterSubscriberInternal(subscriber_id); + dead_subscribers.push_back(it.first); } else if (active_connection_timed_out) { // Refresh the long polling connection. The subscriber will send it again. subscriber->PublishIfPossible(/*force*/ true); } } + + for (const auto &subscriber_id : dead_subscribers) { + UnregisterSubscriberInternal(subscriber_id); + } } bool Publisher::CheckNoLeaks() const {