Add actor.__ray_kill__() to terminate actors immediately (#6523)

Edward Oakes 2019-12-23 23:12:57 -06:00 committed by GitHub
parent 49b0ebf724
commit 6b1a57542e
12 changed files with 173 additions and 5 deletions

View file

@ -143,7 +143,13 @@ If necessary, you can manually terminate an actor by calling
``ray.actor.exit_actor()`` from within one of the actor methods. This will kill
the actor process and release the resources associated with the actor. This
approach should generally not be necessary as actors are automatically garbage
collected.
collected. The ``ObjectID`` resulting from the task can be waited on to block
until the actor exits (calling ``ray.get()`` on it will raise a ``RayActorError``).
Note that this method of termination will wait until any previously submitted
tasks finish executing. If you want to terminate an actor immediately, you can
call ``actor_handle.__ray_kill__()``. This will cause the actor to exit immediately
and any pending tasks to fail. Any exit handlers installed in the actor using
``atexit`` will be called.
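For example, here is a minimal sketch of both termination paths (the ``Actor``
class is hypothetical):

.. code-block:: python

    import ray

    ray.init()

    @ray.remote
    class Actor(object):
        def exit(self):
            ray.actor.exit_actor()

    actor = Actor.remote()
    # Graceful: runs after previously submitted tasks finish executing.
    exit_id = actor.exit.remote()
    ray.wait([exit_id])  # Returns once the actor has exited.

    actor = Actor.remote()
    # Immediate: pending tasks fail; ``atexit`` handlers still run.
    actor.__ray_kill__()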
Passing Around Actor Handles
----------------------------

View file

@ -1034,6 +1034,14 @@ cdef class CoreWorker:
return VectorToObjectIDs(return_ids)
def kill_actor(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
with nogil:
check_status(self.core_worker.get().KillActor(
c_actor_id))
def resource_ids(self):
cdef:
ResourceMappingType resource_mapping = (

View file

@ -642,7 +642,7 @@ class ActorHandle(object):
self._actor_id.hex())
def __del__(self):
"""Kill the worker that is running this actor."""
"""Terminate the worker that is running this actor."""
# TODO(swang): Also clean up forked actor handles.
# Kill the worker if this is the original actor handle, created
# with Class.remote(). TODO(rkn): Even without passing handles around,
@ -671,6 +671,20 @@ class ActorHandle(object):
finally:
self.__ray_terminate__._actor_hard_ref = None
def __ray_kill__(self):
"""Kill the actor that this actor handle refers to immediately.
This will cause any outstanding tasks submitted to the actor to fail
and the actor to exit in the same way as if it crashed. In general,
you should prefer to simply delete the actor handle and let the actor clean up
gracefully.
Returns:
None.
"""
worker = ray.worker.get_global_worker()
worker.core_worker.kill_actor(self._ray_actor_id)
@property
def _actor_id(self):
return self._ray_actor_id
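A hedged usage sketch for ``__ray_kill__`` (the ``Logger`` actor is
hypothetical; per the documentation above, exit handlers installed with
``atexit`` still run on a force kill):

import atexit
import ray

ray.init()

@ray.remote
class Logger(object):
    def __init__(self):
        # Runs even when the actor is force-killed.
        atexit.register(lambda: print("actor exiting"))

    def ping(self):
        return "pong"

logger = Logger.remote()
assert ray.get(logger.ping.remote()) == "pong"
logger.__ray_kill__()  # The actor exits immediately; the handler fires.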

View file

@ -98,6 +98,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CActorID &actor_id, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
CRayStatus KillActor(const CActorID &actor_id)
unique_ptr[CProfileEvent] CreateProfileEvent(
const c_string &event_type)

View file

@ -1431,6 +1431,22 @@ ray.get(actor.ping.remote())
assert ray.get(detached_actor.ping.remote()) == "pong"
def test_kill(ray_start_regular):
@ray.remote
class Actor(object):
def hang(self):
# Never returns.
ray.get(ray.ObjectID.from_random())
actor = Actor.remote()
result = actor.hang.remote()
ready, _ = ray.wait([result], timeout=0.1)
assert len(ready) == 0
actor.__ray_kill__()
with pytest.raises(ray.exceptions.RayActorError):
ray.get(result, timeout=1)
if __name__ == "__main__":
import pytest
import sys

View file

@ -1746,6 +1746,28 @@ def remote(*args, **kwargs):
class Foo(object):
def method(self):
return 1
Remote task and actor objects returned by @ray.remote can also be
dynamically modified with the same arguments as above using
``.options()`` as follows:
.. code-block:: python
@ray.remote(num_gpus=1, max_calls=1, num_return_vals=2)
def f():
return 1, 2
g = f.options(num_gpus=2, max_calls=None)
@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo(object):
def method(self):
return 1
Bar = Foo.options(num_cpus=1, resources=None)
Running actors will be terminated when the Python actor handle that
refers to them is deleted. This causes them to complete any outstanding
work and then shut down. If you want to kill them immediately, you can
also call ``actor_handle.__ray_kill__()``.
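For example (a minimal sketch reusing the ``Foo`` and ``Bar`` classes
defined above):

.. code-block:: python

    foo = Foo.remote()
    del foo  # Graceful: completes outstanding work, then shuts down.

    bar = Bar.remote()
    bar.__ray_kill__()  # Immediate: outstanding tasks will fail.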
"""
worker = get_global_worker()

View file

@ -96,6 +96,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, log_dir_);
RayLog::InstallFailureSignalHandler();
}
RAY_LOG(INFO) << "Initializing worker " << worker_context_.GetWorkerID();
// Initialize gcs client.
gcs_client_ = std::make_shared<gcs::RedisGcsClient>(gcs_options);
@ -715,6 +716,13 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
return status;
}
Status CoreWorker::KillActor(const ActorID &actor_id) {
ActorHandle *actor_handle = nullptr;
RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle));
RAY_CHECK(actor_handle->IsDirectCallActor());
return direct_actor_submitter_->KillActor(actor_id);
}
ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized) {
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(serialized));
const ActorID actor_id = actor_handle->GetActorID();
@ -853,6 +861,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
return_ids.pop_back();
task_type = TaskType::ACTOR_CREATION_TASK;
SetActorId(task_spec.ActorCreationId());
RAY_LOG(INFO) << "Creating actor: " << actor_id_;
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
@ -1032,6 +1041,24 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques
}
}
void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
rpc::KillActorReply *reply,
rpc::SendReplyCallback send_reply_callback) {
ActorID intended_actor_id = ActorID::FromBinary(request.intended_actor_id());
if (intended_actor_id != worker_context_.GetCurrentActorID()) {
std::ostringstream stream;
stream << "Mismatched ActorID: ignoring KillActor for previous actor "
<< intended_actor_id
<< ", current actor ID: " << worker_context_.GetCurrentActorID();
auto msg = stream.str();
RAY_LOG(ERROR) << msg;
send_reply_callback(Status::Invalid(msg), nullptr, nullptr);
return;
}
RAY_LOG(INFO) << "Got KillActor, shutting down...";
Shutdown();
}
void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request,
rpc::GetCoreWorkerStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@ -33,6 +33,7 @@
RAY_CORE_WORKER_RPC_HANDLER(PushTask, 9999) \
RAY_CORE_WORKER_RPC_HANDLER(DirectActorCallArgWaitComplete, 100) \
RAY_CORE_WORKER_RPC_HANDLER(GetObjectStatus, 9999) \
RAY_CORE_WORKER_RPC_HANDLER(KillActor, 9999) \
RAY_CORE_WORKER_RPC_HANDLER(GetCoreWorkerStats, 100)
namespace ray {
@ -324,6 +325,12 @@ class CoreWorker {
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
/// Tell an actor to exit immediately, without completing outstanding work.
///
/// \param[in] actor_id ID of the actor to kill.
/// \return Status
Status KillActor(const ActorID &actor_id);
/// Add an actor handle from a serialized string.
///
/// This should be called when an actor handle is given to us by another task
@ -406,6 +413,10 @@ class CoreWorker {
rpc::GetObjectStatusReply *reply,
rpc::SendReplyCallback send_reply_callback);
/// Implements gRPC server handler.
void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply,
rpc::SendReplyCallback send_reply_callback);
/// Get statistics from core worker.
void HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request,
rpc::GetCoreWorkerStatsReply *reply,

View file

@ -1,12 +1,31 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
#include <thread>
#include "ray/common/task/task.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
using ray::rpc::ActorTableData;
namespace ray {
Status CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id) {
absl::MutexLock lock(&mu_);
pending_force_kills_.insert(actor_id);
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// The actor is not yet created or is being reconstructed; cache the
// request and send it once the actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g., from unpickling); in that case it might be desirable
// to have a timeout to mark it as invalid if it doesn't show up within the
// specified time.
RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created.";
} else {
SendPendingTasks(actor_id);
}
return Status::OK();
}
Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId();
RAY_CHECK(task_spec.IsActorTask());
@ -101,6 +120,15 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id
void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
auto &client = rpc_clients_[actor_id];
RAY_CHECK(client);
// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
if (pending_force_kills_.find(actor_id) != pending_force_kills_.end()) {
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
RAY_CHECK_OK(client->KillActor(request, nullptr));
pending_force_kills_.erase(actor_id);
}
// Submit all pending requests.
auto &requests = pending_requests_[actor_id];
auto head = requests.begin();

View file

@ -11,6 +11,7 @@
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/common/ray_object.h"
@ -48,6 +49,12 @@ class CoreWorkerDirectActorTaskSubmitter {
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);
/// Tell the given actor to exit immediately.
///
/// \param[in] actor_id The ID of the actor to kill.
/// \return Status::Invalid if the actor could not be killed.
Status KillActor(const ActorID &actor_id);
/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id Actor ID.
@ -107,6 +114,9 @@ class CoreWorkerDirectActorTaskSubmitter {
/// rpc_clients_ map.
absl::flat_hash_map<ActorID, std::string> worker_ids_ GUARDED_BY(mu_);
/// Set of actor ids that should be force killed once a client is available.
absl::flat_hash_set<ActorID> pending_force_kills_ GUARDED_BY(mu_);
/// Map from actor id to the actor's pending requests. Each actor's requests
/// are ordered by the task number in the request.
absl::flat_hash_map<ActorID, std::map<int64_t, std::unique_ptr<rpc::PushTaskRequest>>>

View file

@ -117,6 +117,14 @@ message GetObjectStatusReply {
ObjectStatus status = 1;
}
message KillActorRequest {
// ID of the actor that is intended to be killed.
bytes intended_actor_id = 1;
}
message KillActorReply {
}
message GetCoreWorkerStatsRequest {
// The ID of the worker this message is intended for.
bytes intended_worker_id = 1;
@ -137,6 +145,8 @@ service CoreWorkerService {
returns (DirectActorCallArgWaitCompleteReply);
// Ask the object's owner about the object's current status.
rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply);
// Request that the worker shut down without completing outstanding work.
rpc KillActor(KillActorRequest) returns (KillActorReply);
// Get metrics from core workers.
rpc GetCoreWorkerStats(GetCoreWorkerStatsRequest) returns (GetCoreWorkerStatsReply);
}

View file

@ -1,15 +1,15 @@
#ifndef RAY_RPC_CORE_WORKER_CLIENT_H
#define RAY_RPC_CORE_WORKER_CLIENT_H
#include <grpcpp/grpcpp.h>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "absl/base/thread_annotations.h"
#include "absl/hash/hash.h"
#include "ray/common/status.h"
#include "ray/rpc/client_call.h"
#include "ray/util/logging.h"
@ -121,6 +121,12 @@ class CoreWorkerClientInterface {
return Status::NotImplemented("");
}
/// Tell this actor to exit immediately.
virtual ray::Status KillActor(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback) {
return Status::NotImplemented("");
}
virtual ray::Status GetCoreWorkerStats(
const GetCoreWorkerStatsRequest &request,
const ClientCallback<GetCoreWorkerStatsReply> &callback) {
@ -203,6 +209,15 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
return call->GetStatus();
}
virtual ray::Status KillActor(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback) override {
auto call = client_call_manager_
.CreateCall<CoreWorkerService, KillActorRequest, KillActorReply>(
*stub_, &CoreWorkerService::Stub::PrepareAsyncKillActor, request,
callback);
return call->GetStatus();
}
virtual ray::Status GetCoreWorkerStats(
const GetCoreWorkerStatsRequest &request,
const ClientCallback<GetCoreWorkerStatsReply> &callback) override {