[core] Fixing of actor creation failure (#15411)

* Fix

* fix

* format

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* format

* fix comments
This commit is contained in:
Yi Cheng 2021-04-20 15:27:45 -07:00 committed by GitHub
parent d7e31c0d13
commit dbba3a456f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 41 deletions

View file

@ -2759,13 +2759,16 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *rep
rpc::SendReplyCallback send_reply_callback) {
bool own_objects = reference_counter_->OwnObjects();
// Fail the request if it owns any object.
if (own_objects) {
reply->set_success(false);
} else {
reply->set_success(true);
Exit(rpc::WorkerExitType::INTENDED_EXIT);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
reply->set_success(!own_objects);
send_reply_callback(Status::OK(),
[own_objects, this]() {
// If it doesn't own objects, we'll exit it
if (!own_objects) {
Exit(rpc::WorkerExitType::INTENDED_EXIT);
}
},
// We need to kill it if grpc failed.
[this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
}
void CoreWorker::YieldCurrentFiber(FiberEvent &event) {

View file

@ -742,45 +742,49 @@ void WorkerPool::TryKillingIdleWorkers() {
<< " has been idle for a a while. Kill it.";
// To avoid object lost issue caused by forcibly killing, send an RPC request to the
// worker to allow it to do cleanup before exiting.
auto rpc_client = worker->rpc_client();
RAY_CHECK(rpc_client);
rpc::ExitRequest request;
rpc_client->Exit(request, [this, worker](const ray::Status &status,
const rpc::ExitReply &r) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString();
}
if (status.ok() && r.success()) {
auto &worker_state = GetStateForLanguage(worker->GetLanguage());
// If we could kill the worker properly, we remove them from the idle pool.
if (RemoveWorker(worker_state.idle, worker)) {
// If the worker is not idle at this moment, we don't mark them dead.
// In this case, the core worker will exit the process after
// finishing the assigned task, and DisconnectWorker will handle this
// part.
if (!worker->IsDead()) {
worker->MarkDead();
}
}
} else {
// We re-insert the idle worker to the back of the queue if it fails to kill the
// worker (e.g., when the worker owns the object). Without this, if the first N
// workers own objects, it can't kill idle workers that are >= N+1.
const auto &idle_pair = idle_of_all_languages_.front();
idle_of_all_languages_.push_back(idle_pair);
idle_of_all_languages_.pop_front();
RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());
}
RAY_CHECK(pending_exit_idle_workers_.count(worker->WorkerId()));
RAY_CHECK(pending_exit_idle_workers_.erase(worker->WorkerId()));
});
if (!worker->IsDead()) {
// Register the worker to pending exit so that we can correctly calculate the
// running_size.
pending_exit_idle_workers_.emplace(worker->WorkerId(), worker);
auto rpc_client = worker->rpc_client();
RAY_CHECK(rpc_client);
RAY_CHECK(running_size > 0);
running_size--;
rpc::ExitRequest request;
rpc_client->Exit(request, [this, worker](const ray::Status &status,
const rpc::ExitReply &r) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString();
}
// In case of failed to send request, we remove it from pool as well
// TODO (iycheng): We should handle the grpc failure in better way.
if (!status.ok() || r.success()) {
auto &worker_state = GetStateForLanguage(worker->GetLanguage());
// If we could kill the worker properly, we remove them from the idle pool.
RemoveWorker(worker_state.idle, worker);
// We always mark the worker as dead.
// If the worker is not idle at this moment, we'd want to mark it as dead
// so it won't be reused later.
if (!worker->IsDead()) {
worker->MarkDead();
}
} else {
// We re-insert the idle worker to the back of the queue if it fails to kill
// the worker (e.g., when the worker owns the object). Without this, if the
// first N workers own objects, it can't kill idle workers that are >= N+1.
const auto &idle_pair = idle_of_all_languages_.front();
idle_of_all_languages_.push_back(idle_pair);
idle_of_all_languages_.pop_front();
RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());
}
RAY_CHECK(pending_exit_idle_workers_.count(worker->WorkerId()));
RAY_CHECK(pending_exit_idle_workers_.erase(worker->WorkerId()));
});
} else {
// Even it's a dead worker, we still need to remove them from the pool.
RemoveWorker(worker_state.idle, worker);
}
}
}
@ -845,7 +849,12 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
it++) {
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
it->first->GetAssignedJobId() != task_spec.JobId() ||
state.pending_disconnection_workers.count(it->first) > 0) {
state.pending_disconnection_workers.count(it->first) > 0 ||
it->first->IsDead()) {
continue;
}
// These workers are exiting. So skip them.
if (pending_exit_idle_workers_.count(it->first->WorkerId())) {
continue;
}
state.idle.erase(it->first);