diff --git a/java/test/src/main/java/io/ray/test/KillActorTest.java b/java/test/src/main/java/io/ray/test/KillActorTest.java
index fd92b9711..753b00a9c 100644
--- a/java/test/src/main/java/io/ray/test/KillActorTest.java
+++ b/java/test/src/main/java/io/ray/test/KillActorTest.java
@@ -59,6 +59,8 @@ public class KillActorTest extends BaseTest {
   private void testKillActor(
       BiConsumer<ActorHandle<HangActor>, Boolean> kill, boolean noRestart) {
     ActorHandle<HangActor> actor = Ray.actor(HangActor::new).setMaxRestarts(1).remote();
+    // Wait for the actor to be created.
+    actor.task(HangActor::ping).remote().get();
     ObjectRef<Boolean> result = actor.task(HangActor::hang).remote();
     // The actor will hang in this task.
     Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size());
diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py
index 1913decf8..496e977fe 100644
--- a/python/ray/tests/test_actor_advanced.py
+++ b/python/ray/tests/test_actor_advanced.py
@@ -1093,6 +1093,90 @@ def test_actor_resource_demand(shutdown_only):
     global_state_accessor.disconnect()
 
 
+def test_kill_pending_actor_with_no_restart_true():
+    cluster = ray.init()
+    global_state_accessor = GlobalStateAccessor(
+        cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
+    global_state_accessor.connect()
+
+    @ray.remote(resources={"WORKER": 1.0})
+    class PendingActor:
+        pass
+
+    # Kill actor with `no_restart=True`.
+    actor = PendingActor.remote()
+    # TODO(ffbin): The raylet doesn't guarantee the order when dealing with
+    # RequestWorkerLease and CancelWorkerLease. If we kill the actor
+    # immediately after creating the actor, we may not be able to clean up
+    # the request cached by the raylet.
+    # See https://github.com/ray-project/ray/issues/13545 for details.
+    time.sleep(1)
+    ray.kill(actor, no_restart=True)
+
+    def condition1():
+        message = global_state_accessor.get_all_resource_usage()
+        resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
+            message)
+        if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
+            return True
+        return False
+
+    # Actor is dead, so the infeasible task queue length is 0.
+    wait_for_condition(condition1, timeout=10)
+
+    global_state_accessor.disconnect()
+    ray.shutdown()
+
+
+def test_kill_pending_actor_with_no_restart_false():
+    cluster = ray.init()
+    global_state_accessor = GlobalStateAccessor(
+        cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
+    global_state_accessor.connect()
+
+    @ray.remote(resources={"WORKER": 1.0}, max_restarts=1)
+    class PendingActor:
+        pass
+
+    # Kill actor with `no_restart=False`.
+    actor = PendingActor.remote()
+    # TODO(ffbin): The raylet doesn't guarantee the order when dealing with
+    # RequestWorkerLease and CancelWorkerLease. If we kill the actor
+    # immediately after creating the actor, we may not be able to clean up
+    # the request cached by the raylet.
+    # See https://github.com/ray-project/ray/issues/13545 for details.
+    time.sleep(1)
+    ray.kill(actor, no_restart=False)
+
+    def condition1():
+        message = global_state_accessor.get_all_resource_usage()
+        resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
+            message)
+        if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
+            return False
+        return True
+
+    # Actor restarts, so the infeasible task queue length is 1.
+    wait_for_condition(condition1, timeout=10)
+
+    # Kill the actor again; now the actor is dead,
+    # so the infeasible task queue length is 0.
+    ray.kill(actor, no_restart=False)
+
+    def condition2():
+        message = global_state_accessor.get_all_resource_usage()
+        resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
+            message)
+        if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
+            return True
+        return False
+
+    wait_for_condition(condition2, timeout=10)
+
+    global_state_accessor.disconnect()
+    ray.shutdown()
+
+
 if __name__ == "__main__":
     import pytest
     # Test suite is timing out. Disable on windows for now.
diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py
index 024ff6c55..92ef90ca4 100644
--- a/python/ray/tests/test_placement_group.py
+++ b/python/ray/tests/test_placement_group.py
@@ -902,8 +902,10 @@ def test_capture_child_actors(ray_start_cluster):
 
     # Kill an actor and wait until it is killed.
     ray.kill(a)
-    with pytest.raises(ray.exceptions.RayActorError):
+    try:
         ray.get(a.ready.remote())
+    except ray.exceptions.RayActorError:
+        pass
 
     # Now create an actor, but do not capture the current tasks
     a = Actor.options(
@@ -925,8 +927,10 @@ def test_capture_child_actors(ray_start_cluster):
 
     # Kill an actor and wait until it is killed.
     ray.kill(a)
-    with pytest.raises(ray.exceptions.RayActorError):
+    try:
         ray.get(a.ready.remote())
+    except ray.exceptions.RayActorError:
+        pass
 
     # Lastly, make sure when None is specified, actors are not scheduled
     # on the same placement group.
@@ -1416,8 +1420,10 @@ ray.shutdown()
 
     # Kill an actor and wait until it is killed.
     ray.kill(a)
-    with pytest.raises(ray.exceptions.RayActorError):
+    try:
         ray.get(a.ready.remote())
+    except ray.exceptions.RayActorError:
+        pass
 
     # We should have 2 alive pgs and 4 alive actors.
     assert assert_alive_num_pg(2)
diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py
index 6c2fb5cf0..88cf6d7b6 100644
--- a/python/ray/tests/test_queue.py
+++ b/python/ray/tests/test_queue.py
@@ -199,17 +199,19 @@ def test_custom_resources(ray_start_regular_shared):
     assert current_resources["CPU"] == 1.0
 
     # By default an actor should not reserve any resources.
-    Queue()
+    q = Queue()
     current_resources = ray.available_resources()
     assert current_resources["CPU"] == 1.0
+    q.shutdown()
 
     # Specify resource requirement. The queue should now reserve 1 CPU.
-    Queue(actor_options={"num_cpus": 1})
+    q = Queue(actor_options={"num_cpus": 1})
 
     def no_cpu_in_resources():
         return "CPU" not in ray.available_resources()
 
     wait_for_condition(no_cpu_in_resources)
+    q.shutdown()
 
 
 if __name__ == "__main__":
diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py
index 02638ed3d..9fcd3c25f 100644
--- a/python/ray/tests/test_reference_counting.py
+++ b/python/ray/tests/test_reference_counting.py
@@ -470,8 +470,10 @@ def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
         # Test that the actor exiting stops the reference from being pinned.
         ray.kill(actor)
         # Wait for the actor to exit.
-        with pytest.raises(ray.exceptions.RayActorError):
+        try:
             ray.get(actor.delete_ref1.remote())
+        except ray.exceptions.RayActorError:
+            pass
     else:
         # Test that deleting the second reference stops it from being pinned.
         ray.get(actor.delete_ref2.remote())
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 6c8287c15..f7c663b50 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -1629,7 +1629,9 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r
     stream << "Failed to find a corresponding actor handle for " << actor_id;
     return Status::Invalid(stream.str());
   }
-  direct_actor_submitter_->KillActor(actor_id, force_kill, no_restart);
+
+  RAY_CHECK_OK(
+      gcs_client_->Actors().AsyncKillActor(actor_id, force_kill, no_restart, nullptr));
   return Status::OK();
 }
 
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 6fa24c29e..83242c000 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -728,6 +728,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// Tell an actor to exit immediately, without completing outstanding work.
   ///
   /// \param[in] actor_id ID of the actor to kill.
+  /// \param[in] force_kill Whether to force kill an actor by killing the worker.
   /// \param[in] no_restart If set to true, the killed actor will not be
   /// restarted anymore.
   /// \param[out] Status
diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h
index be929ec3f..db240b411 100644
--- a/src/ray/gcs/accessor.h
+++ b/src/ray/gcs/accessor.h
@@ -64,6 +64,16 @@ class ActorInfoAccessor {
   virtual Status AsyncRegisterActor(const TaskSpecification &task_spec,
                                     const StatusCallback &callback) = 0;
 
+  /// Kill actor via GCS asynchronously.
+  ///
+  /// \param actor_id The ID of the actor to destroy.
+  /// \param force_kill Whether to force kill an actor by killing the worker.
+  /// \param no_restart If set to true, the killed actor will not be restarted anymore.
+  /// \param callback Callback that will be called after the actor is destroyed.
+  /// \return Status
+  virtual Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart,
+                                const StatusCallback &callback) = 0;
+
   /// Asynchronously request GCS to create the actor.
   ///
   /// This should be called after the worker has resolved the actor dependencies.
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc
index a82e0ab6b..5905966cb 100644
--- a/src/ray/gcs/gcs_client/service_based_accessor.cc
+++ b/src/ray/gcs/gcs_client/service_based_accessor.cc
@@ -200,6 +200,26 @@ Status ServiceBasedActorInfoAccessor::AsyncRegisterActor(
   return Status::OK();
 }
+
+Status ServiceBasedActorInfoAccessor::AsyncKillActor(
+    const ActorID &actor_id, bool force_kill, bool no_restart,
+    const ray::gcs::StatusCallback &callback) {
+  rpc::KillActorViaGcsRequest request;
+  request.set_actor_id(actor_id.Binary());
+  request.set_force_kill(force_kill);
+  request.set_no_restart(no_restart);
+  client_impl_->GetGcsRpcClient().KillActorViaGcs(
+      request, [callback](const Status &, const rpc::KillActorViaGcsReply &reply) {
+        if (callback) {
+          auto status =
+              reply.status().code() == (int)StatusCode::OK
+                  ? Status()
+                  : Status(StatusCode(reply.status().code()), reply.status().message());
+          callback(status);
+        }
+      });
+  return Status::OK();
+}
 
 Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
     const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) {
   RAY_CHECK(task_spec.IsActorCreationTask() && callback);
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h
index c883e7b62..8aab5198f 100644
--- a/src/ray/gcs/gcs_client/service_based_accessor.h
+++ b/src/ray/gcs/gcs_client/service_based_accessor.h
@@ -85,6 +85,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
   Status AsyncCreateActor(const TaskSpecification &task_spec,
                           const StatusCallback &callback) override;
 
+  Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart,
+                        const StatusCallback &callback) override;
+
   Status AsyncSubscribeAll(
       const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
       const StatusCallback &done) override;
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
index 2f3740654..338fc149c 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -214,6 +214,25 @@ void GcsActorManager::HandleGetNamedActorInfo(
   ++counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST];
 }
 
+void GcsActorManager::HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request,
+                                            rpc::KillActorViaGcsReply *reply,
+                                            rpc::SendReplyCallback send_reply_callback) {
+  const auto &actor_id = ActorID::FromBinary(request.actor_id());
+  bool force_kill = request.force_kill();
+  bool no_restart = request.no_restart();
+  if (no_restart) {
+    DestroyActor(actor_id);
+  } else {
+    KillActor(actor_id, force_kill, no_restart);
+  }
+
+  GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
+  RAY_LOG(DEBUG) << "Finished killing actor, job id = " << actor_id.JobId()
+                 << ", actor id = " << actor_id << ", force_kill = " << force_kill
+                 << ", no_restart = " << no_restart;
+  ++counts_[CountType::KILL_ACTOR_REQUEST];
+}
+
 Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request,
                                       RegisterActorCallback success_callback) {
   // NOTE: After the abnormal recovery of the network between GCS client and GCS server or
@@ -417,8 +436,11 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
   actor_to_register_callbacks_.erase(actor_id);
   actor_to_create_callbacks_.erase(actor_id);
   auto it = registered_actors_.find(actor_id);
-  RAY_CHECK(it != registered_actors_.end())
-      << "Tried to destroy actor that does not exist " << actor_id;
+  if (it == registered_actors_.end()) {
+    RAY_LOG(INFO) << "Tried to destroy actor that does not exist " << actor_id;
+    return;
+  }
+
+  const auto &task_id = it->second->GetCreationTaskSpecification().TaskId();
   it->second->GetMutableActorTableData()->mutable_task_spec()->Clear();
   it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms());
   AddDestroyedActorToCache(it->second);
@@ -456,38 +478,13 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
   if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
     // The actor has already been created. Destroy the process by force-killing
     // it.
-    KillActor(actor);
+    NotifyCoreWorkerToKillActor(actor);
     RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
     if (node_it->second.empty()) {
       created_actors_.erase(node_it);
     }
   } else {
-    // The actor has not been created yet. It is either being scheduled or is
-    // pending scheduling.
-    auto canceled_actor_id =
-        gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID());
-    if (!canceled_actor_id.IsNil()) {
-      // The actor was being scheduled and has now been canceled.
-      RAY_CHECK(canceled_actor_id == actor_id);
-    } else {
-      auto pending_it =
-          std::find_if(pending_actors_.begin(), pending_actors_.end(),
-                       [actor_id](const std::shared_ptr<GcsActor> &actor) {
-                         return actor->GetActorID() == actor_id;
-                       });
-
-      // The actor was pending scheduling. Remove it from the queue.
-      if (pending_it != pending_actors_.end()) {
-        pending_actors_.erase(pending_it);
-      } else {
-        // When actor creation request of this actor id is pending in raylet,
-        // it doesn't responds, and the actor should be still in leasing state.
-        // NOTE: Raylet will cancel the lease request once it receives the
-        // actor state notification. So this method doesn't have to cancel
-        // outstanding lease request by calling raylet_client->CancelWorkerLease
-        gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id);
-      }
-    }
+    CancelActorInScheduling(actor, task_id);
   }
 }
 
@@ -706,7 +703,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
   RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
       actor_id, *mutable_actor_table_data,
       [this, actor, actor_id, mutable_actor_table_data](Status status) {
-        // if actor was an detached actor, make sure to destroy it.
+        // If the actor was a detached actor, make sure to destroy it.
         // We need to do this because detached actors are not destroyed
         // when its owners are dead because it doesn't have owners.
         if (actor->IsDetached()) {
@@ -934,15 +931,47 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
   }
 }
 
-void GcsActorManager::KillActor(const std::shared_ptr<GcsActor> &actor) {
+void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
+                                                  bool force_kill, bool no_restart) {
   auto actor_client = worker_client_factory_(actor->GetAddress());
   rpc::KillActorRequest request;
   request.set_intended_actor_id(actor->GetActorID().Binary());
-  request.set_force_kill(true);
-  request.set_no_restart(true);
+  request.set_force_kill(force_kill);
+  request.set_no_restart(no_restart);
   RAY_UNUSED(actor_client->KillActor(request, nullptr));
 }
 
+void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill,
+                                bool no_restart) {
+  RAY_LOG(DEBUG) << "Killing actor, job id = " << actor_id.JobId()
+                 << ", actor id = " << actor_id << ", force_kill = " << force_kill;
+  const auto &it = registered_actors_.find(actor_id);
+  if (it == registered_actors_.end()) {
+    RAY_LOG(INFO) << "Tried to kill actor that does not exist " << actor_id;
+    return;
+  }
+
+  const auto &actor = it->second;
+  if (actor->GetState() == rpc::ActorTableData::DEAD ||
+      actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) {
+    return;
+  }
+
+  // The actor is still alive or pending creation.
+  const auto &node_id = actor->GetNodeID();
+  const auto &worker_id = actor->GetWorkerID();
+  auto node_it = created_actors_.find(node_id);
+  if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
+    // The actor has already been created. Destroy the process by force-killing
+    // it.
+    NotifyCoreWorkerToKillActor(actor, force_kill, no_restart);
+  } else {
+    const auto &task_id = actor->GetCreationTaskSpecification().TaskId();
+    CancelActorInScheduling(actor, task_id);
+    ReconstructActor(actor_id, /*need_reschedule=*/true);
+  }
+}
+
 void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor) {
   if (destroyed_actors_.size() >=
       RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) {
@@ -956,6 +985,36 @@ void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &
       actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
 }
 
+void GcsActorManager::CancelActorInScheduling(const std::shared_ptr<GcsActor> &actor,
+                                              const TaskID &task_id) {
+  const auto &actor_id = actor->GetActorID();
+  const auto &node_id = actor->GetNodeID();
+  // The actor has not been created yet. It is either being scheduled or is
+  // pending scheduling.
+  auto canceled_actor_id =
+      gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID());
+  if (!canceled_actor_id.IsNil()) {
+    // The actor was being scheduled and has now been canceled.
+    RAY_CHECK(canceled_actor_id == actor_id);
+  } else {
+    auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(),
+                                   [actor_id](const std::shared_ptr<GcsActor> &actor) {
+                                     return actor->GetActorID() == actor_id;
+                                   });
+
+    // The actor was pending scheduling. Remove it from the queue.
+    if (pending_it != pending_actors_.end()) {
+      pending_actors_.erase(pending_it);
+    } else {
+      // When the actor creation request of this actor id is pending in the raylet,
+      // it doesn't respond, and the actor should still be in the leasing state.
+      // NOTE: We will cancel the outstanding lease request by calling
+      // `raylet_client->CancelWorkerLease`.
+      gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id, task_id);
+    }
+  }
+}
+
 std::string GcsActorManager::DebugString() const {
   std::ostringstream stream;
   stream << "GcsActorManager: {RegisterActor request count: "
@@ -964,6 +1023,7 @@ std::string GcsActorManager::DebugString() const {
          << ", GetActorInfo request count: " << counts_[CountType::GET_ACTOR_INFO_REQUEST]
         << ", GetNamedActorInfo request count: "
         << counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST]
+        << ", KillActor request count: " << counts_[CountType::KILL_ACTOR_REQUEST]
         << ", Registered actors count: " << registered_actors_.size()
         << ", Destroyed actors count: " << destroyed_actors_.size()
         << ", Named actors count: " << named_actors_.size()
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h
index d3ffc3097..f2db9345f 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h
@@ -190,6 +190,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
                              rpc::GetAllActorInfoReply *reply,
                              rpc::SendReplyCallback send_reply_callback) override;
 
+  void HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request,
+                             rpc::KillActorViaGcsReply *reply,
+                             rpc::SendReplyCallback send_reply_callback) override;
+
   /// Register actor asynchronously.
   ///
   /// \param request Contains the meta info to create the actor.
@@ -336,8 +340,18 @@ class GcsActorManager : public rpc::ActorInfoHandler {
   /// Kill the specified actor.
   ///
+  /// \param actor_id ID of the actor to kill.
+  /// \param force_kill Whether to force kill an actor by killing the worker.
+  /// \param no_restart If set to true, the killed actor will not be restarted anymore.
+  void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
+
+  /// Notify CoreWorker to kill the specified actor.
+  ///
   /// \param actor The actor to be killed.
-  void KillActor(const std::shared_ptr<GcsActor> &actor);
+  /// \param force_kill Whether to force kill an actor by killing the worker.
+  /// \param no_restart If set to true, the killed actor will not be restarted anymore.
+  void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
+                                   bool force_kill = true, bool no_restart = true);
 
   /// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
   /// evicted.
@@ -356,6 +370,13 @@ class GcsActorManager : public rpc::ActorInfoHandler {
     return actor_delta;
   }
 
+  /// Cancel actor which is either being scheduled or is pending scheduling.
+  ///
+  /// \param actor The actor to be cancelled.
+  /// \param task_id The id of actor creation task to be cancelled.
+  void CancelActorInScheduling(const std::shared_ptr<GcsActor> &actor,
+                               const TaskID &task_id);
+
   /// Callbacks of pending `RegisterActor` requests.
   /// Maps actor ID to actor registration callbacks, which is used to filter duplicated
   /// messages from a driver/worker caused by some network problems.
@@ -413,7 +434,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
     GET_ACTOR_INFO_REQUEST = 2,
     GET_NAMED_ACTOR_INFO_REQUEST = 3,
     GET_ALL_ACTOR_INFO_REQUEST = 4,
-    CountType_MAX = 10,
+    KILL_ACTOR_REQUEST = 5,
+    CountType_MAX = 6,
   };
   uint64_t counts_[CountType::CountType_MAX] = {0};
 };
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
index 9c81c8c0e..1b4201c4f 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -127,13 +127,27 @@ std::vector<ActorID> GcsActorScheduler::CancelOnNode(const NodeID &node_id) {
   return actor_ids;
 }
 
-void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) {
-  // NOTE: This method does not currently cancel the outstanding lease request.
-  // It only removes leasing information from the internal state so that
-  // RequestWorkerLease ignores the response from raylet.
+void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
+                                        const TaskID &task_id) {
+  // NOTE: This method will cancel the outstanding lease request and remove leasing
+  // information from the internal state.
   auto node_it = node_to_actors_when_leasing_.find(node_id);
-  RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
-  node_it->second.erase(actor_id);
+  if (node_it != node_to_actors_when_leasing_.end()) {
+    node_it->second.erase(actor_id);
+  }
+
+  const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
+  const auto &iter = alive_nodes.find(node_id);
+  if (iter != alive_nodes.end()) {
+    const auto &node_info = iter->second;
+    rpc::Address address;
+    address.set_raylet_id(node_info->node_id());
+    address.set_ip_address(node_info->node_manager_address());
+    address.set_port(node_info->node_manager_port());
+    auto lease_client = GetOrConnectLeaseClient(address);
+    lease_client->CancelWorkerLease(
+        task_id, [](const Status &status, const rpc::CancelWorkerLeaseReply &reply) {});
+  }
 }
 
 ActorID GcsActorScheduler::CancelOnWorker(const NodeID &node_id,
@@ -238,6 +252,16 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
        }
 
        if (status.ok()) {
+          if (reply.worker_address().raylet_id().empty() &&
+              reply.retry_at_raylet_address().raylet_id().empty()) {
+            // Actor creation task has been cancelled. It is triggered by `ray.kill`. If
+            // the number of remaining restarts of the actor is not equal to 0, GCS will
+            // reschedule the actor, so we return directly here.
+            RAY_LOG(DEBUG) << "Actor " << actor->GetActorID()
+                           << " creation task has been cancelled.";
+            return;
+          }
+
          // Remove the actor from the leasing map as the reply is returned from the
          // remote node.
          iter->second.erase(actor_iter);
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
index 71dd35108..c0e3d430e 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
@@ -59,7 +59,8 @@ class GcsActorSchedulerInterface {
   ///
   /// \param node_id ID of the node where the actor leasing request has been sent.
   /// \param actor_id ID of an actor.
-  virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) = 0;
+  virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
+                               const TaskID &task_id) = 0;
 
   /// Cancel the actor that is being scheduled to the specified worker.
   ///
@@ -130,7 +131,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
   ///
   /// \param node_id ID of the node where the actor leasing request has been sent.
   /// \param actor_id ID of an actor.
-  void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) override;
+  void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
+                       const TaskID &task_id) override;
 
   /// Cancel the actor that is being scheduled to the specified worker.
   ///
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
index b88c6702b..b8edb6e82 100644
--- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
@@ -35,7 +35,8 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
   MOCK_METHOD1(CancelOnNode, std::vector<ActorID>(const NodeID &node_id));
   MOCK_METHOD2(CancelOnWorker,
                ActorID(const NodeID &node_id, const WorkerID &worker_id));
-  MOCK_METHOD2(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id));
+  MOCK_METHOD3(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id,
+                                     const TaskID &task_id));
 
   std::vector<std::shared_ptr<gcs::GcsActor>> actors;
 };
@@ -735,8 +736,10 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
   address.set_raylet_id(node_id.Binary());
   address.set_worker_id(worker_id.Binary());
   actor->UpdateAddress(address);
-  const auto actor_id = actor->GetActorID();
-  EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
+  const auto &actor_id = actor->GetActorID();
+  const auto &task_id =
+      TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id());
+  EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id));
   gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
 }
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
index d84f99b3f..bd98d65ef 100644
--- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
@@ -262,7 +262,8 @@ TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) {
   ASSERT_EQ(1, raylet_client_->callbacks.size());
 
   // Cancel the lease request.
-  gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID());
+  const auto &task_id = TaskID::FromBinary(create_actor_request.task_spec().task_id());
+  gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID(), task_id);
   ASSERT_EQ(1, raylet_client_->num_workers_requested);
   ASSERT_EQ(1, raylet_client_->callbacks.size());
 
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto
index ed5ca92e2..6e2c450dd 100644
--- a/src/ray/protobuf/gcs_service.proto
+++ b/src/ray/protobuf/gcs_service.proto
@@ -92,6 +92,22 @@ message GetAllActorInfoReply {
   repeated ActorTableData actor_table_data = 2;
 }
 
+// `KillActorViaGcsRequest` is sent to the GCS Service to ask it to kill an actor.
+// `KillActorViaGcsRequest` is different from `KillActorRequest`.
+// `KillActorRequest` is sent to the core worker to ask it to kill an actor.
+message KillActorViaGcsRequest {
+  // ID of this actor.
+  bytes actor_id = 1;
+  // Whether to force kill the actor.
+  bool force_kill = 2;
+  // If set to true, the killed actor will not be restarted anymore.
+  bool no_restart = 3;
+}
+
+message KillActorViaGcsReply {
+  GcsStatus status = 1;
+}
+
 // Service for actor info access.
 service ActorInfoGcsService {
   // Register actor to gcs service.
@@ -104,6 +120,8 @@ service ActorInfoGcsService {
   rpc GetNamedActorInfo(GetNamedActorInfoRequest) returns (GetNamedActorInfoReply);
   // Get information of all actor from GCS Service.
   rpc GetAllActorInfo(GetAllActorInfoRequest) returns (GetAllActorInfoReply);
+  // Kill actor via GCS Service.
+  rpc KillActorViaGcs(KillActorViaGcsRequest) returns (KillActorViaGcsReply);
 }
 
 message RegisterNodeRequest {
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h
index bf9a72bed..bae0e56bd 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_client.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h
@@ -144,6 +144,10 @@ class GcsRpcClient {
   VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetAllActorInfo,
                              actor_info_grpc_client_, )
 
+  /// Kill actor via GCS Service.
+  VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, KillActorViaGcs,
+                             actor_info_grpc_client_, )
+
   /// Register a node to GCS Service.
   VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, )
 
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h
index 328aa5f73..246a5ee9e 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_server.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h
@@ -125,6 +125,10 @@ class ActorInfoGcsServiceHandler {
   virtual void HandleGetAllActorInfo(const GetAllActorInfoRequest &request,
                                      GetAllActorInfoReply *reply,
                                      SendReplyCallback send_reply_callback) = 0;
+
+  virtual void HandleKillActorViaGcs(const KillActorViaGcsRequest &request,
+                                     KillActorViaGcsReply *reply,
+                                     SendReplyCallback send_reply_callback) = 0;
 };
 
 /// The `GrpcService` for `ActorInfoGcsService`.
@@ -148,6 +152,7 @@ class ActorInfoGrpcService : public GrpcService {
     ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
     ACTOR_INFO_SERVICE_RPC_HANDLER(GetNamedActorInfo);
     ACTOR_INFO_SERVICE_RPC_HANDLER(GetAllActorInfo);
+    ACTOR_INFO_SERVICE_RPC_HANDLER(KillActorViaGcs);
   }
 
 private:
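
For reference, the user-facing behavior that this patch changes is roughly the sketch below. It is an illustrative example only (not part of the patch); the "WORKER" custom resource is borrowed from the new tests in test_actor_advanced.py purely to keep the actor pending, since no node offers that resource.

    import ray

    ray.init()

    # No node provides a "WORKER" resource, so this actor can never be scheduled
    # and its creation task sits in the raylet as an outstanding lease request.
    @ray.remote(resources={"WORKER": 1.0}, max_restarts=1)
    class PendingActor:
        pass

    actor = PendingActor.remote()

    # ray.kill now routes through the GCS (AsyncKillActor -> KillActorViaGcs).
    # With no_restart=True the GCS destroys the actor and cancels the pending
    # worker lease, so the infeasible resource demand disappears from the
    # resource load report. With no_restart=False and restarts remaining, the
    # GCS instead cancels the in-flight lease and reschedules the actor, so the
    # demand reappears until the actor is killed again.
    ray.kill(actor, no_restart=True)

    ray.shutdown()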