fix testActorRestart failure bug (#8613)

This commit is contained in:
fangfengbin 2020-05-27 11:10:45 +08:00 committed by GitHub
parent 8da00af74e
commit 99dd6a581d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -183,15 +183,17 @@ void GcsServer::InitGcsActorManager() {
// the GCS.
gcs_actor_manager_->OnNodeDead(ClientID::FromBinary(node->node_id()));
});
RAY_CHECK_OK(redis_gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
[this](const WorkerID &id, const rpc::WorkerFailureData &worker_failure_data) {
auto on_subscribe = [this](const std::string &id, const std::string &data) {
rpc::WorkerFailureData worker_failure_data;
worker_failure_data.ParseFromString(data);
auto &worker_address = worker_failure_data.worker_address();
WorkerID worker_id = WorkerID::FromBinary(worker_address.worker_id());
WorkerID worker_id = WorkerID::FromBinary(id);
ClientID node_id = ClientID::FromBinary(worker_address.raylet_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data.intentional_disconnect());
},
/*done_callback=*/nullptr));
};
RAY_CHECK_OK(gcs_pub_sub_->SubscribeAll(WORKER_FAILURE_CHANNEL, on_subscribe, nullptr));
}
std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {