Revert "[Core]Fix ray.kill doesn't cancel pending actor bug (#14025)" (#14146)

This reverts commit 1754359281.
architkulkarni 2021-02-16 17:22:25 -08:00 committed by GitHub
parent 04d2df40cd
commit d9124e9329
20 changed files with 56 additions and 330 deletions
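For context before the per-file hunks: the change being reverted made `ray.kill` also cancel the creation of an actor that is still pending (because its resource demand can never be satisfied), which is exactly the scenario exercised by the Python tests removed below. A minimal sketch of that scenario, with an illustrative custom resource name:

import ray

ray.init(num_cpus=1)

# No node provides the custom "WORKER" resource, so actor creation stays pending/infeasible.
@ray.remote(resources={"WORKER": 1.0})
class PendingActor:
    pass

actor = PendingActor.remote()

# The reverted fix made this also cancel the pending creation request (and its
# infeasible resource demand); with the revert, the pending request may again be
# left queued, which is the original bug.
ray.kill(actor, no_restart=True)

ray.shutdown()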


@@ -59,8 +59,6 @@ public class KillActorTest extends BaseTest {
private void testKillActor(BiConsumer<ActorHandle<?>, Boolean> kill, boolean noRestart) {
ActorHandle<HangActor> actor = Ray.actor(HangActor::new).setMaxRestarts(1).remote();
// Wait for the actor to be created.
actor.task(HangActor::ping).remote().get();
ObjectRef<Boolean> result = actor.task(HangActor::hang).remote();
// The actor will hang in this task.
Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size());


@@ -1093,90 +1093,6 @@ def test_actor_resource_demand(shutdown_only):
global_state_accessor.disconnect()
def test_kill_pending_actor_with_no_restart_true():
cluster = ray.init()
global_state_accessor = GlobalStateAccessor(
cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
global_state_accessor.connect()
@ray.remote(resources={"WORKER": 1.0})
class PendingActor:
pass
# Kill actor with `no_restart=True`.
actor = PendingActor.remote()
# TODO(ffbin): The raylet doesn't guarantee the order when dealing with
# RequestWorkerLease and CancelWorkerLease. If we kill the actor
# immediately after creating the actor, we may not be able to clean up
# the request cached by the raylet.
# See https://github.com/ray-project/ray/issues/13545 for details.
time.sleep(1)
ray.kill(actor, no_restart=True)
def condition1():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return True
return False
# Actor is dead, so the infeasible task queue length is 0.
wait_for_condition(condition1, timeout=10)
global_state_accessor.disconnect()
ray.shutdown()
def test_kill_pending_actor_with_no_restart_false():
cluster = ray.init()
global_state_accessor = GlobalStateAccessor(
cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
global_state_accessor.connect()
@ray.remote(resources={"WORKER": 1.0}, max_restarts=1)
class PendingActor:
pass
# Kill actor with `no_restart=False`.
actor = PendingActor.remote()
# TODO(ffbin): The raylet doesn't guarantee the order when dealing with
# RequestWorkerLease and CancelWorkerLease. If we kill the actor
# immediately after creating the actor, we may not be able to clean up
# the request cached by the raylet.
# See https://github.com/ray-project/ray/issues/13545 for details.
time.sleep(1)
ray.kill(actor, no_restart=False)
def condition1():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return False
return True
# Actor restarts, so the infeasible task queue length is 1.
wait_for_condition(condition1, timeout=10)
# Kill the actor again; the actor is now dead,
# so the infeasible task queue length is 0.
ray.kill(actor, no_restart=False)
def condition2():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return True
return False
wait_for_condition(condition2, timeout=10)
global_state_accessor.disconnect()
ray.shutdown()
if __name__ == "__main__":
import pytest
# Test suite is timing out. Disable on windows for now.
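The tests removed above wait for the GCS-reported resource demand to change by polling with `wait_for_condition`. A self-contained sketch of that polling pattern, using a local stand-in for Ray's test helper of the same name:

import time

def wait_for_condition(condition, timeout=10, retry_interval_ms=100):
    """Poll `condition` until it returns True, or raise after `timeout` seconds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if condition():
            return
        time.sleep(retry_interval_ms / 1000.0)
    raise TimeoutError("The condition was not met before the timeout expired.")

# Usage mirrors the removed tests: after ray.kill on the pending actor, poll
# until the infeasible resource demand reported by the GCS drops to zero.
# wait_for_condition(condition1, timeout=10)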


@@ -754,15 +754,12 @@ def test_warning_for_too_many_actors(shutdown_only):
def __init__(self):
time.sleep(1000)
# NOTE: We should save the actor handles; otherwise they go out of scope.
actors = [Foo.remote() for _ in range(num_cpus * 3)]
assert len(actors) == num_cpus * 3
[Foo.remote() for _ in range(num_cpus * 3)]
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
actors = [Foo.remote() for _ in range(num_cpus)]
assert len(actors) == num_cpus
[Foo.remote() for _ in range(num_cpus)]
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR


@@ -902,10 +902,8 @@ def test_capture_child_actors(ray_start_cluster):
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# Now create an actor, but do not capture the current tasks
a = Actor.options(
@@ -927,10 +925,8 @@ def test_capture_child_actors(ray_start_cluster):
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# Lastly, make sure when None is specified, actors are not scheduled
# on the same placement group.
@@ -1420,10 +1416,8 @@ ray.shutdown()
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# We should have 2 alive pgs and 4 alive actors.
assert assert_alive_num_pg(2)
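The hunks above switch between two ways of waiting for a killed actor to die, and the difference matters for test strictness. An illustrative comparison (the `ready` method comes from the test's actor class):

import pytest
import ray

def wait_for_actor_death_strict(actor):
    # Fails the test if the actor is still alive: pytest.raises requires that
    # RayActorError is actually raised.
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(actor.ready.remote())

def wait_for_actor_death_lenient(actor):
    # Passes whether or not the actor is dead: a successful call is silently accepted.
    try:
        ray.get(actor.ready.remote())
    except ray.exceptions.RayActorError:
        pass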


@@ -199,19 +199,17 @@ def test_custom_resources(ray_start_regular_shared):
assert current_resources["CPU"] == 1.0
# By default an actor should not reserve any resources.
q = Queue()
Queue()
current_resources = ray.available_resources()
assert current_resources["CPU"] == 1.0
q.shutdown()
# Specify resource requirement. The queue should now reserve 1 CPU.
q = Queue(actor_options={"num_cpus": 1})
Queue(actor_options={"num_cpus": 1})
def no_cpu_in_resources():
return "CPU" not in ray.available_resources()
wait_for_condition(no_cpu_in_resources)
q.shutdown()
if __name__ == "__main__":
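The test above checks the resource accounting of the queue's backing actor. A hedged sketch of the same accounting at the API level, assuming the usual `ray.util.queue` import path (the hunk does not show the test file's own imports); the reservation is reflected asynchronously, which is why the test polls instead of asserting immediately:

import ray
from ray.util.queue import Queue

ray.init(num_cpus=1)

# By default the queue's backing actor requests no CPU, so the CPU stays available.
q = Queue()
assert ray.available_resources().get("CPU", 0) == 1.0

# With actor_options the backing actor reserves the CPU; real tests wait for
# "CPU" to disappear from ray.available_resources() rather than checking at once.
q2 = Queue(actor_options={"num_cpus": 1})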


@@ -468,10 +468,8 @@ def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
# Test that the actor exiting stops the reference from being pinned.
ray.kill(actor)
# Wait for the actor to exit.
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.delete_ref1.remote())
except ray.exceptions.RayActorError:
pass
else:
# Test that deleting the second reference stops it from being pinned.
ray.get(actor.delete_ref2.remote())


@@ -1672,9 +1672,7 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r
stream << "Failed to find a corresponding actor handle for " << actor_id;
return Status::Invalid(stream.str());
}
RAY_CHECK_OK(
gcs_client_->Actors().AsyncKillActor(actor_id, force_kill, no_restart, nullptr));
direct_actor_submitter_->KillActor(actor_id, force_kill, no_restart);
return Status::OK();
}


@@ -732,7 +732,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Tell an actor to exit immediately, without completing outstanding work.
///
/// \param[in] actor_id ID of the actor to kill.
/// \param[in] force_kill Whether to force kill an actor by killing the worker.
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
/// \param[out] Status
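The `no_restart` flag documented above surfaces directly in the Python `ray.kill` API. A brief hedged sketch of the user-visible semantics (class and method names are illustrative):

import ray

ray.init()

@ray.remote(max_restarts=1)
class Worker:
    def ping(self):
        return "pong"

w = Worker.remote()
ray.get(w.ping.remote())

# no_restart=False: the actor process is killed, but since max_restarts=1 the
# GCS may restart it.
ray.kill(w, no_restart=False)

# no_restart=True: the actor is killed and will not be restarted again.
ray.kill(w, no_restart=True)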


@@ -64,16 +64,6 @@ class ActorInfoAccessor {
virtual Status AsyncRegisterActor(const TaskSpecification &task_spec,
const StatusCallback &callback) = 0;
/// Kill actor via GCS asynchronously.
///
/// \param actor_id The ID of actor to destroy.
/// \param force_kill Whether to force kill an actor by killing the worker.
/// \param no_restart If set to true, the killed actor will not be restarted anymore.
/// \param callback Callback that will be called after the actor is destroyed.
/// \return Status
virtual Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart,
const StatusCallback &callback) = 0;
/// Asynchronously request GCS to create the actor.
///
/// This should be called after the worker has resolved the actor dependencies.


@@ -200,26 +200,6 @@ Status ServiceBasedActorInfoAccessor::AsyncRegisterActor(
return Status::OK();
}
Status ServiceBasedActorInfoAccessor::AsyncKillActor(
const ActorID &actor_id, bool force_kill, bool no_restart,
const ray::gcs::StatusCallback &callback) {
rpc::KillActorViaGcsRequest request;
request.set_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
client_impl_->GetGcsRpcClient().KillActorViaGcs(
request, [callback](const Status &, const rpc::KillActorViaGcsReply &reply) {
if (callback) {
auto status =
reply.status().code() == (int)StatusCode::OK
? Status()
: Status(StatusCode(reply.status().code()), reply.status().message());
callback(status);
}
});
return Status::OK();
}
Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) {
RAY_CHECK(task_spec.IsActorCreationTask() && callback);


@@ -85,9 +85,6 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
Status AsyncCreateActor(const TaskSpecification &task_spec,
const StatusCallback &callback) override;
Status AsyncKillActor(const ActorID &actor_id, bool force_kill, bool no_restart,
const StatusCallback &callback) override;
Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) override;


@@ -214,25 +214,6 @@ void GcsActorManager::HandleGetNamedActorInfo(
++counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST];
}
void GcsActorManager::HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request,
rpc::KillActorViaGcsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const auto &actor_id = ActorID::FromBinary(request.actor_id());
bool force_kill = request.force_kill();
bool no_restart = request.no_restart();
if (no_restart) {
DestroyActor(actor_id);
} else {
KillActor(actor_id, force_kill, no_restart);
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
RAY_LOG(DEBUG) << "Finished killing actor, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id << ", force_kill = " << force_kill
<< ", no_restart = " << no_restart;
++counts_[CountType::KILL_ACTOR_REQUEST];
}
Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request,
RegisterActorCallback success_callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
@@ -436,11 +417,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
actor_to_register_callbacks_.erase(actor_id);
actor_to_create_callbacks_.erase(actor_id);
auto it = registered_actors_.find(actor_id);
if (it == registered_actors_.end()) {
RAY_LOG(INFO) << "Tried to destroy actor that does not exist " << actor_id;
return;
}
const auto &task_id = it->second->GetCreationTaskSpecification().TaskId();
RAY_CHECK(it != registered_actors_.end())
<< "Tried to destroy actor that does not exist " << actor_id;
it->second->GetMutableActorTableData()->mutable_task_spec()->Clear();
it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms());
AddDestroyedActorToCache(it->second);
@@ -478,13 +456,38 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
// The actor has already been created. Destroy the process by force-killing
// it.
NotifyCoreWorkerToKillActor(actor);
KillActor(actor);
RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
if (node_it->second.empty()) {
created_actors_.erase(node_it);
}
} else {
CancelActorInScheduling(actor, task_id);
// The actor has not been created yet. It is either being scheduled or is
// pending scheduling.
auto canceled_actor_id =
gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID());
if (!canceled_actor_id.IsNil()) {
// The actor was being scheduled and has now been canceled.
RAY_CHECK(canceled_actor_id == actor_id);
} else {
auto pending_it =
std::find_if(pending_actors_.begin(), pending_actors_.end(),
[actor_id](const std::shared_ptr<GcsActor> &actor) {
return actor->GetActorID() == actor_id;
});
// The actor was pending scheduling. Remove it from the queue.
if (pending_it != pending_actors_.end()) {
pending_actors_.erase(pending_it);
} else {
// The actor creation request for this actor ID is still pending in the
// raylet (the raylet hasn't responded yet), so the actor should still be
// in the leasing state.
// NOTE: The raylet will cancel the lease request once it receives the
// actor state notification, so this method doesn't have to cancel the
// outstanding lease request by calling raylet_client->CancelWorkerLease.
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id);
}
}
}
}
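The branching restored above distinguishes three states for an actor that has not finished creation. A schematic Python model of that decision flow, with hypothetical names, purely for illustration:

def cancel_uncreated_actor(scheduler, pending_actors, node_id, worker_id, actor_id):
    # Case 1: the actor is currently being scheduled onto a worker.
    canceled_id = scheduler.cancel_on_worker(node_id, worker_id)
    if canceled_id is not None:
        assert canceled_id == actor_id
        return

    # Case 2: the actor is still waiting in the pending-scheduling queue.
    for i, actor in enumerate(pending_actors):
        if actor.actor_id == actor_id:
            del pending_actors[i]
            return

    # Case 3: a worker-lease request is still outstanding at the raylet. Only the
    # GCS-side leasing bookkeeping is dropped; the raylet is expected to cancel
    # the lease itself once it sees the actor-state notification.
    scheduler.cancel_on_leasing(node_id, actor_id)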
@@ -703,7 +706,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor, actor_id, mutable_actor_table_data](Status status) {
// If the actor was a detached actor, make sure to destroy it.
// if the actor was a detached actor, make sure to destroy it.
// We need to do this because detached actors are not destroyed
// when their owners die, since detached actors have no owners.
if (actor->IsDetached()) {
@@ -931,47 +934,15 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
}
}
void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
bool force_kill, bool no_restart) {
void GcsActorManager::KillActor(const std::shared_ptr<GcsActor> &actor) {
auto actor_client = worker_client_factory_(actor->GetAddress());
rpc::KillActorRequest request;
request.set_intended_actor_id(actor->GetActorID().Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
request.set_force_kill(true);
request.set_no_restart(true);
RAY_UNUSED(actor_client->KillActor(request, nullptr));
}
void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill,
bool no_restart) {
RAY_LOG(DEBUG) << "Killing actor, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id << ", force_kill = " << force_kill;
const auto &it = registered_actors_.find(actor_id);
if (it == registered_actors_.end()) {
RAY_LOG(INFO) << "Tried to kill actor that does not exist " << actor_id;
return;
}
const auto &actor = it->second;
if (actor->GetState() == rpc::ActorTableData::DEAD ||
actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) {
return;
}
// The actor is still alive or pending creation.
const auto &node_id = actor->GetNodeID();
const auto &worker_id = actor->GetWorkerID();
auto node_it = created_actors_.find(node_id);
if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
// The actor has already been created. Destroy the process by force-killing
// it.
NotifyCoreWorkerToKillActor(actor, force_kill, no_restart);
} else {
const auto &task_id = actor->GetCreationTaskSpecification().TaskId();
CancelActorInScheduling(actor, task_id);
ReconstructActor(actor_id, /*need_reschedule=*/true);
}
}
void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor) {
if (destroyed_actors_.size() >=
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) {
@@ -985,36 +956,6 @@ void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &
actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
}
void GcsActorManager::CancelActorInScheduling(const std::shared_ptr<GcsActor> &actor,
const TaskID &task_id) {
const auto &actor_id = actor->GetActorID();
const auto &node_id = actor->GetNodeID();
// The actor has not been created yet. It is either being scheduled or is
// pending scheduling.
auto canceled_actor_id =
gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID());
if (!canceled_actor_id.IsNil()) {
// The actor was being scheduled and has now been canceled.
RAY_CHECK(canceled_actor_id == actor_id);
} else {
auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(),
[actor_id](const std::shared_ptr<GcsActor> &actor) {
return actor->GetActorID() == actor_id;
});
// The actor was pending scheduling. Remove it from the queue.
if (pending_it != pending_actors_.end()) {
pending_actors_.erase(pending_it);
} else {
// The actor creation request for this actor ID is still pending in the
// raylet (the raylet hasn't responded yet), so the actor should still be
// in the leasing state.
// NOTE: We will cancel the outstanding lease request by calling
// `raylet_client->CancelWorkerLease`.
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id, task_id);
}
}
}
std::string GcsActorManager::DebugString() const {
std::ostringstream stream;
stream << "GcsActorManager: {RegisterActor request count: "
@@ -1023,7 +964,6 @@ std::string GcsActorManager::DebugString() const {
<< ", GetActorInfo request count: " << counts_[CountType::GET_ACTOR_INFO_REQUEST]
<< ", GetNamedActorInfo request count: "
<< counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST]
<< ", KillActor request count: " << counts_[CountType::KILL_ACTOR_REQUEST]
<< ", Registered actors count: " << registered_actors_.size()
<< ", Destroyed actors count: " << destroyed_actors_.size()
<< ", Named actors count: " << named_actors_.size()


@@ -190,10 +190,6 @@ class GcsActorManager : public rpc::ActorInfoHandler {
rpc::GetAllActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleKillActorViaGcs(const rpc::KillActorViaGcsRequest &request,
rpc::KillActorViaGcsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Register actor asynchronously.
///
/// \param request Contains the meta info to create the actor.
@@ -340,18 +336,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// Kill the specified actor.
///
/// \param actor_id ID of the actor to kill.
/// \param force_kill Whether to force kill an actor by killing the worker.
/// \param no_restart If set to true, the killed actor will not be restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
/// Notify CoreWorker to kill the specified actor.
///
/// \param actor The actor to be killed.
/// \param force_kill Whether to force kill an actor by killing the worker.
/// \param no_restart If set to true, the killed actor will not be restarted anymore.
void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
bool force_kill = true, bool no_restart = true);
void KillActor(const std::shared_ptr<GcsActor> &actor);
/// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
/// evicted.
@@ -370,13 +356,6 @@ class GcsActorManager : public rpc::ActorInfoHandler {
return actor_delta;
}
/// Cancel actor which is either being scheduled or is pending scheduling.
///
/// \param actor The actor to be cancelled.
/// \param task_id The id of actor creation task to be cancelled.
void CancelActorInScheduling(const std::shared_ptr<GcsActor> &actor,
const TaskID &task_id);
/// Callbacks of pending `RegisterActor` requests.
/// Maps actor ID to actor registration callbacks, which is used to filter duplicated
/// messages from a driver/worker caused by some network problems.
@@ -434,8 +413,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
GET_ACTOR_INFO_REQUEST = 2,
GET_NAMED_ACTOR_INFO_REQUEST = 3,
GET_ALL_ACTOR_INFO_REQUEST = 4,
KILL_ACTOR_REQUEST = 5,
CountType_MAX = 6,
CountType_MAX = 10,
};
uint64_t counts_[CountType::CountType_MAX] = {0};
};


@@ -127,27 +127,13 @@ std::vector<ActorID> GcsActorScheduler::CancelOnNode(const NodeID &node_id) {
return actor_ids;
}
void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
const TaskID &task_id) {
// NOTE: This method will cancel the outstanding lease request and remove leasing
// information from the internal state.
void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) {
// NOTE: This method does not currently cancel the outstanding lease request.
// It only removes leasing information from the internal state so that
// RequestWorkerLease ignores the response from raylet.
auto node_it = node_to_actors_when_leasing_.find(node_id);
if (node_it != node_to_actors_when_leasing_.end()) {
node_it->second.erase(actor_id);
}
const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
const auto &iter = alive_nodes.find(node_id);
if (iter != alive_nodes.end()) {
const auto &node_info = iter->second;
rpc::Address address;
address.set_raylet_id(node_info->node_id());
address.set_ip_address(node_info->node_manager_address());
address.set_port(node_info->node_manager_port());
auto lease_client = GetOrConnectLeaseClient(address);
lease_client->CancelWorkerLease(
task_id, [](const Status &status, const rpc::CancelWorkerLeaseReply &reply) {});
}
RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
node_it->second.erase(actor_id);
}
ActorID GcsActorScheduler::CancelOnWorker(const NodeID &node_id,
@@ -252,16 +238,6 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
}
if (status.ok()) {
if (reply.worker_address().raylet_id().empty() &&
reply.retry_at_raylet_address().raylet_id().empty()) {
// The actor creation task has been cancelled (triggered by `ray.kill`). If
// the actor's number of remaining restarts is not zero, the GCS will
// reschedule the actor, so we return directly here.
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID()
<< " creation task has been cancelled.";
return;
}
// Remove the actor from the leasing map as the reply is returned from the
// remote node.
iter->second.erase(actor_iter);


@@ -59,8 +59,7 @@ class GcsActorSchedulerInterface {
///
/// \param node_id ID of the node where the actor leasing request has been sent.
/// \param actor_id ID of an actor.
virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
const TaskID &task_id) = 0;
virtual void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) = 0;
/// Cancel the actor that is being scheduled to the specified worker.
///
@@ -131,8 +130,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
///
/// \param node_id ID of the node where the actor leasing request has been sent.
/// \param actor_id ID of an actor.
void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
const TaskID &task_id) override;
void CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id) override;
/// Cancel the actor that is being scheduled to the specified worker.
///


@@ -35,8 +35,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
MOCK_METHOD1(CancelOnNode, std::vector<ActorID>(const NodeID &node_id));
MOCK_METHOD2(CancelOnWorker, ActorID(const NodeID &node_id, const WorkerID &worker_id));
MOCK_METHOD3(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id,
const TaskID &task_id));
MOCK_METHOD2(CancelOnLeasing, void(const NodeID &node_id, const ActorID &actor_id));
std::vector<std::shared_ptr<gcs::GcsActor>> actors;
};
@@ -736,10 +735,8 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
const auto &actor_id = actor->GetActorID();
const auto &task_id =
TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id());
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id));
const auto actor_id = actor->GetActorID();
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
}


@@ -262,8 +262,7 @@ TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) {
ASSERT_EQ(1, raylet_client_->callbacks.size());
// Cancel the lease request.
const auto &task_id = TaskID::FromBinary(create_actor_request.task_spec().task_id());
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID(), task_id);
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID());
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());


@@ -87,22 +87,6 @@ message GetAllActorInfoReply {
repeated ActorTableData actor_table_data = 2;
}
// `KillActorViaGcsRequest` is sent to the GCS service to ask it to kill an actor.
// `KillActorViaGcsRequest` is different from `KillActorRequest`:
// `KillActorRequest` is sent to a core worker to ask it to kill an actor.
message KillActorViaGcsRequest {
// ID of this actor.
bytes actor_id = 1;
// Whether to force kill the actor.
bool force_kill = 2;
// If set to true, the killed actor will not be restarted anymore.
bool no_restart = 3;
}
message KillActorViaGcsReply {
GcsStatus status = 1;
}
// Service for actor info access.
service ActorInfoGcsService {
// Register actor to gcs service.
@@ -115,8 +99,6 @@ service ActorInfoGcsService {
rpc GetNamedActorInfo(GetNamedActorInfoRequest) returns (GetNamedActorInfoReply);
// Get information of all actors from GCS Service.
rpc GetAllActorInfo(GetAllActorInfoRequest) returns (GetAllActorInfoReply);
// Kill actor via GCS Service.
rpc KillActorViaGcs(KillActorViaGcsRequest) returns (KillActorViaGcsReply);
}
message RegisterNodeRequest {


@@ -144,10 +144,6 @@ class GcsRpcClient {
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetAllActorInfo,
actor_info_grpc_client_, )
/// Kill actor via GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, KillActorViaGcs,
actor_info_grpc_client_, )
/// Register a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, )


@@ -125,10 +125,6 @@ class ActorInfoGcsServiceHandler {
virtual void HandleGetAllActorInfo(const GetAllActorInfoRequest &request,
GetAllActorInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleKillActorViaGcs(const KillActorViaGcsRequest &request,
KillActorViaGcsReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `ActorInfoGcsService`.
@@ -152,7 +148,6 @@ class ActorInfoGrpcService : public GrpcService {
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetNamedActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetAllActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(KillActorViaGcs);
}
private: