[gcs] Fix actor killing race condition (#17456)

Author: Yi Cheng, 2021-08-04 10:37:56 -07:00 (committed by GitHub)
parent a2b0d2f99f
commit 521457b51b
3 changed files with 59 additions and 2 deletions
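
For context, here is a minimal sketch of the user-facing pattern this commit addresses, mirroring the test added below (the method name and num_cpus values are only illustrative). Judging from the scheduler change, the problem is a worker-lease reply that arrives after the actor has already been marked DEAD, leaving the leased worker occupied:

import ray

ray.init(num_cpus=1)


@ray.remote(num_cpus=1)
class Actor:
    def foo(self):
        return None


# Kill the actor immediately, likely while the GCS is still leasing a worker
# for its creation task.
a = Actor.remote()
ray.kill(a)

# With a single CPU, this actor can only be scheduled if the worker leased
# for the killed actor was cleaned up rather than leaked.
b = Actor.remote()
assert ray.get(b.foo.remote()) is None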

@@ -184,5 +184,47 @@ def test_fair_queueing(shutdown_only):
    assert len(ready) == 1000, len(ready)


def test_actor_killing(shutdown_only):
    # This tests creating an actor and then killing it immediately.
    import ray
    ray.init(num_cpus=1)

    @ray.remote(num_cpus=1)
    class Actor:
        def foo(self):
            return None

    # Kill the first actor right after creation; the worker leased for it
    # must be cleaned up so the second actor can use the single CPU.
    worker_1 = Actor.remote()
    ray.kill(worker_1)

    worker_2 = Actor.remote()
    assert ray.get(worker_2.foo.remote()) is None
    ray.kill(worker_2)

    # Same pattern with a restartable actor: the first kill triggers a
    # restart, the second kill exhausts max_restarts and frees the worker.
    worker_1 = Actor.options(max_restarts=1).remote()
    ray.kill(worker_1, no_restart=False)
    assert ray.get(worker_1.foo.remote()) is None
    ray.kill(worker_1, no_restart=False)

    worker_2 = Actor.remote()
    assert ray.get(worker_2.foo.remote()) is None


def test_actor_scheduling(shutdown_only):
    ray.init()

    @ray.remote
    class A:
        def run_fail(self):
            ray.actor.exit_actor()

        def get(self):
            return 1

    a = A.remote()
    a.run_fail.remote()
    with pytest.raises(Exception):
        ray.get([a.get.remote()])


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))

@@ -586,12 +586,10 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
      }
    }
  }

  // The actor is already dead, most likely due to process or node failure.
  if (actor->GetState() == rpc::ActorTableData::DEAD) {
    return;
  }

  if (actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) {
    // The actor creation task still has unresolved dependencies. Remove from the
    // unresolved actors map.

@@ -440,6 +440,18 @@ void RayletBasedActorScheduler::HandleWorkerLeaseReply(
  // gcs_actor_manager will reconstruct it again.
  auto node_id = NodeID::FromBinary(node->node_id());
  auto iter = node_to_actors_when_leasing_.find(node_id);
  // Force-kill the worker that was just leased. This is used when the actor
  // has already been destroyed (marked DEAD) while the lease was in flight.
  auto kill_worker = [&reply, this]() {
    auto &worker_address = reply.worker_address();
    if (!worker_address.raylet_id().empty()) {
      auto cli = core_worker_clients_.GetOrConnect(worker_address);
      rpc::KillActorRequest request;
      // Set it to Nil() since the actor id hasn't been set up on the worker yet.
      request.set_intended_actor_id(ActorID::Nil().Binary());
      request.set_force_kill(true);
      request.set_no_restart(true);
      RAY_UNUSED(cli->KillActor(request, nullptr));
    }
  };
  if (iter != node_to_actors_when_leasing_.end()) {
    auto actor_iter = iter->second.find(actor->GetActorID());
    if (actor_iter == iter->second.end()) {
@@ -450,6 +462,9 @@ void RayletBasedActorScheduler::HandleWorkerLeaseReply(
          << actor->GetActorID()
          << " has been already cancelled. The response will be ignored. Job id = "
          << actor->GetActorID().JobId();
      if (actor->GetState() == rpc::ActorTableData::DEAD) {
        kill_worker();
      }
      return;
    }
@@ -477,6 +492,8 @@ void RayletBasedActorScheduler::HandleWorkerLeaseReply(
    } else {
      RetryLeasingWorkerFromNode(actor, node);
    }
  } else if (actor->GetState() == rpc::ActorTableData::DEAD) {
    kill_worker();
  }
}