[GCS]Fix GCS actor manager idempotent bug (#11003)

* [GCS]Fix GCS actor manager idempotent bug

* fix review comment

* fix review comment

* fix review comments

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin 2020-09-28 12:12:42 +08:00 committed by GitHub
parent 1e39c40370
commit 86e5db4d59
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -387,25 +387,28 @@ void GcsActorManager::HandleGetActorCheckpointID(
Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request,
RegisterActorCallback success_callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
// the GCS server is restarted, it is required to continue to register actor
// successfully.
RAY_CHECK(success_callback);
const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec();
auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id());
auto iter = registered_actors_.find(actor_id);
if (iter != registered_actors_.end() &&
iter->second->GetState() == rpc::ActorTableData::ALIVE) {
// In case of temporary network failures, workers will re-send multiple duplicate
// requests to GCS server.
// In this case, we can just reply.
success_callback(iter->second);
return Status::OK();
}
auto pending_register_iter = actor_to_register_callbacks_.find(actor_id);
if (pending_register_iter != actor_to_register_callbacks_.end()) {
// It is a duplicate message, just mark the callback as pending and invoke it after
// the actor has been flushed to the storage.
pending_register_iter->second.emplace_back(std::move(success_callback));
if (iter != registered_actors_.end()) {
auto pending_register_iter = actor_to_register_callbacks_.find(actor_id);
if (pending_register_iter != actor_to_register_callbacks_.end()) {
// 1. The GCS client sends the `RegisterActor` request to the GCS server.
// 2. The GCS client receives some network errors.
// 3. The GCS client resends the `RegisterActor` request to the GCS server.
pending_register_iter->second.emplace_back(std::move(success_callback));
} else {
// 1. The GCS client sends the `RegisterActor` request to the GCS server.
// 2. The GCS server flushes the actor to the storage and restarts before replying
// to the GCS client.
// 3. The GCS client resends the `RegisterActor` request to the GCS server.
success_callback(iter->second);
}
return Status::OK();
}
@ -422,7 +425,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
}
actor_to_register_callbacks_[actor_id].emplace_back(std::move(success_callback));
RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second);
registered_actors_.emplace(actor->GetActorID(), actor);
const auto &owner_address = actor->GetOwnerAddress();
auto node_id = NodeID::FromBinary(owner_address.raylet_id());
@ -469,13 +472,20 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
CreateActorCallback callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
// the GCS server is restarted, it is required to continue to create actor
// successfully.
RAY_CHECK(callback);
const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec();
auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id());
auto iter = registered_actors_.find(actor_id);
if (iter != registered_actors_.end() &&
iter->second->GetState() == rpc::ActorTableData::ALIVE) {
if (iter == registered_actors_.end()) {
RAY_LOG(WARNING) << "Actor " << actor_id << " may be already destroyed.";
return Status::Invalid("Actor may be already destroyed.");
}
if (iter->second->GetState() == rpc::ActorTableData::ALIVE) {
// In case of temporary network failures, workers will re-send multiple duplicate
// requests to GCS server.
// In this case, we can just reply.
@ -494,6 +504,15 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
// created.
actor_to_create_callbacks_[actor_id].emplace_back(std::move(callback));
// If GCS restarts while processing `CreateActor` request, GCS client will resend the
// `CreateActor` request.
// After GCS restarts, the state of the actor may not be `DEPENDENCIES_UNREADY`.
if (iter->second->GetState() != rpc::ActorTableData::DEPENDENCIES_UNREADY) {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is already in the process of creation. Skip it directly.";
return Status::OK();
}
// Remove the actor from the unresolved actor map.
auto actor = std::make_shared<GcsActor>(request.task_spec());
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION);