Ensure job registered first before return. (#19307)

## Why are these changes needed?
Before this PR, there is a race condition where:
- job register starts
- driver start to launch actor
- gcs register actor ===> crash
- job register ends

Actor registration should be forced to be after driver registration. This PR enforces that.

## Related issue number
Closes #19172
This commit is contained in:
Yi Cheng 2021-10-12 11:26:58 -07:00 committed by GitHub
parent 0ab6749602
commit bce6a498f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1101,12 +1101,21 @@ void NodeManager::ProcessRegisterClientRequestMessage(
worker->AssignTaskId(driver_task_id);
rpc::JobConfig job_config;
job_config.ParseFromString(message->serialized_job_config()->str());
Status status = worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
// Send the reply callback only after registration fully completes at the GCS.
auto cb = [this, worker_ip_address, pid, job_id, job_config,
send_reply_callback = std::move(send_reply_callback)](const Status &status,
int assigned_port) {
if (status.ok()) {
auto job_data_ptr = gcs::CreateJobTableData(job_id, /*is_dead*/ false,
worker_ip_address, pid, job_config);
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr));
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(
job_data_ptr,
[send_reply_callback = std::move(send_reply_callback), assigned_port](
Status status) { send_reply_callback(status, assigned_port); }));
}
};
RAY_UNUSED(worker_pool_.RegisterDriver(worker, job_config, std::move(cb)));
}
}