mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core] fix erase iterator while iterating over a map. (#17204)
(cherry picked from commit 055a90374c
)
This commit is contained in:
parent
c2d70bdb12
commit
f960048631
3 changed files with 24 additions and 6 deletions
|
@ -572,6 +572,8 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
|
||||||
|
|
||||||
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
|
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
|
||||||
const JobID &job_id) {
|
const JobID &job_id) {
|
||||||
|
std::vector<PlacementGroupID> groups_to_remove;
|
||||||
|
|
||||||
for (const auto &it : registered_placement_groups_) {
|
for (const auto &it : registered_placement_groups_) {
|
||||||
auto &placement_group = it.second;
|
auto &placement_group = it.second;
|
||||||
if (placement_group->GetCreatorJobId() != job_id) {
|
if (placement_group->GetCreatorJobId() != job_id) {
|
||||||
|
@ -579,13 +581,19 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
|
||||||
}
|
}
|
||||||
placement_group->MarkCreatorJobDead();
|
placement_group->MarkCreatorJobDead();
|
||||||
if (placement_group->IsPlacementGroupLifetimeDone()) {
|
if (placement_group->IsPlacementGroupLifetimeDone()) {
|
||||||
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
|
groups_to_remove.push_back(placement_group->GetPlacementGroupID());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const auto &group : groups_to_remove) {
|
||||||
|
RemovePlacementGroup(group, [](Status status) {});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
|
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
|
||||||
const ActorID &actor_id) {
|
const ActorID &actor_id) {
|
||||||
|
std::vector<PlacementGroupID> groups_to_remove;
|
||||||
|
|
||||||
for (const auto &it : registered_placement_groups_) {
|
for (const auto &it : registered_placement_groups_) {
|
||||||
auto &placement_group = it.second;
|
auto &placement_group = it.second;
|
||||||
if (placement_group->GetCreatorActorId() != actor_id) {
|
if (placement_group->GetCreatorActorId() != actor_id) {
|
||||||
|
@ -593,9 +601,13 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
|
||||||
}
|
}
|
||||||
placement_group->MarkCreatorActorDead();
|
placement_group->MarkCreatorActorDead();
|
||||||
if (placement_group->IsPlacementGroupLifetimeDone()) {
|
if (placement_group->IsPlacementGroupLifetimeDone()) {
|
||||||
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
|
groups_to_remove.push_back(placement_group->GetPlacementGroupID());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const auto &group : groups_to_remove) {
|
||||||
|
RemovePlacementGroup(group, [](Status status) {});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void GcsPlacementGroupManager::CollectStats() const {
|
void GcsPlacementGroupManager::CollectStats() const {
|
||||||
|
|
|
@ -190,10 +190,11 @@ void CreateRequestQueue::RemoveDisconnectedClientRequests(
|
||||||
|
|
||||||
for (auto it = fulfilled_requests_.begin(); it != fulfilled_requests_.end();) {
|
for (auto it = fulfilled_requests_.begin(); it != fulfilled_requests_.end();) {
|
||||||
if (it->second && it->second->client == client) {
|
if (it->second && it->second->client == client) {
|
||||||
fulfilled_requests_.erase(it);
|
fulfilled_requests_.erase(it++);
|
||||||
}
|
} else {
|
||||||
it++;
|
it++;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace plasma
|
} // namespace plasma
|
||||||
|
|
|
@ -300,6 +300,8 @@ bool Publisher::UnregisterSubscriberInternal(const SubscriberID &subscriber_id)
|
||||||
|
|
||||||
void Publisher::CheckDeadSubscribers() {
|
void Publisher::CheckDeadSubscribers() {
|
||||||
absl::MutexLock lock(&mutex_);
|
absl::MutexLock lock(&mutex_);
|
||||||
|
std::vector<SubscriberID> dead_subscribers;
|
||||||
|
|
||||||
for (const auto &it : subscribers_) {
|
for (const auto &it : subscribers_) {
|
||||||
const auto &subscriber = it.second;
|
const auto &subscriber = it.second;
|
||||||
|
|
||||||
|
@ -308,13 +310,16 @@ void Publisher::CheckDeadSubscribers() {
|
||||||
RAY_CHECK(!(disconnected && active_connection_timed_out));
|
RAY_CHECK(!(disconnected && active_connection_timed_out));
|
||||||
|
|
||||||
if (disconnected) {
|
if (disconnected) {
|
||||||
const auto &subscriber_id = it.first;
|
dead_subscribers.push_back(it.first);
|
||||||
UnregisterSubscriberInternal(subscriber_id);
|
|
||||||
} else if (active_connection_timed_out) {
|
} else if (active_connection_timed_out) {
|
||||||
// Refresh the long polling connection. The subscriber will send it again.
|
// Refresh the long polling connection. The subscriber will send it again.
|
||||||
subscriber->PublishIfPossible(/*force*/ true);
|
subscriber->PublishIfPossible(/*force*/ true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const auto &subscriber_id : dead_subscribers) {
|
||||||
|
UnregisterSubscriberInternal(subscriber_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Publisher::CheckNoLeaks() const {
|
bool Publisher::CheckNoLeaks() const {
|
||||||
|
|
Loading…
Add table
Reference in a new issue