Remove actor handle IDs (#5889)

* Remove actor handle ID from main ActorHandle constructor

* Set the actor caller ID when calling submit task instead of in the actor handle

* Remove ActorHandle::Fork, remove actor handle ID from protobuf

* Make inner actor handle const, remove new_actor_handles

* Move caller ID into the common task spec, start refactoring raylet

* Some fixes for forking actor handles

* Store ActorHandle state in CoreWorker, only expose actor ID to Python

* Remove some unused fields

* lint

* doc

* fix merge

* Remove ActorHandleID from python/cpp

* doc

* Fix core worker test

* Move actor table subscription to CoreWorker, reset actor handles on actor failure

* lint

* Remove GCS client from direct actor

* fix tests

* Fix

* Fix tests for raylet codepath

* Fix local mode

* Fix multithreaded test

* Fix AsyncSubscribe issue...

* doc

* fix serve

* Revert bazel
This commit is contained in:
Stephanie Wang 2019-10-17 12:36:34 -04:00 committed by GitHub
parent d70abcfd70
commit 3ac8592dcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 507 additions and 656 deletions

View file

@ -65,7 +65,6 @@ except ImportError as e:
from ray._raylet import (
ActorCheckpointID,
ActorClassID,
ActorHandleID,
ActorID,
ClientID,
Config as _Config,
@ -154,7 +153,6 @@ __all__ = [
__all__ += [
"ActorCheckpointID",
"ActorClassID",
"ActorHandleID",
"ActorID",
"ClientID",
"JobID",

View file

@ -23,6 +23,7 @@ from libcpp.vector cimport vector as c_vector
from cython.operator import dereference, postincrement
from ray.includes.common cimport (
CActorHandle,
CLanguage,
CRayObject,
CRayStatus,
@ -432,6 +433,13 @@ cdef class CoreWorker:
with nogil:
self.core_worker.get().SetCurrentTaskId(c_task_id)
def set_actor_id(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
with nogil:
self.core_worker.get().SetActorId(c_actor_id)
def get_current_task_id(self):
return TaskID(self.core_worker.get().GetCurrentTaskId().Binary())
@ -622,6 +630,7 @@ cdef class CoreWorker:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
CTaskID caller_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@ -629,9 +638,11 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().SubmitTask(
caller_id,
ray_function, args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
@ -643,12 +654,13 @@ cdef class CoreWorker:
resources,
placement_resources):
cdef:
ActorHandle actor_handle = ActorHandle.__new__(ActorHandle)
unique_ptr[CActorHandle] actor_handle
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[c_string] dynamic_worker_options
unordered_map[c_string, double] c_resources
unordered_map[c_string, double] c_placement_resources
CTaskID caller_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@ -656,30 +668,38 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().CreateActor(
caller_id,
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, False, c_resources,
c_placement_resources, dynamic_worker_options),
&actor_handle.inner))
&actor_handle))
return actor_handle
actor_id = ActorID(actor_handle.get().GetActorID().Binary())
inserted = self.core_worker.get().AddActorHandle(
move(actor_handle))
assert inserted, "Actor {} already exists".format(actor_id)
return actor_id
def submit_actor_task(self,
ActorHandle handle,
ActorID actor_id,
function_descriptor,
args,
int num_return_vals,
resources):
cdef:
CActorID c_actor_id = actor_id.native()
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
CTaskID caller_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@ -687,10 +707,13 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().SubmitActorTask(
handle.inner.get()[0], ray_function,
caller_id,
self.core_worker.get().GetActorHandle(c_actor_id),
ray_function,
args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
@ -702,3 +725,18 @@ cdef class CoreWorker:
return ProfileEvent.make(
self.core_worker.get().CreateProfileEvent(c_event_type),
extra_data)
def deserialize_actor_handle(self, c_string bytes):
cdef:
unique_ptr[CActorHandle] actor_handle
actor_handle.reset(new CActorHandle(bytes))
actor_id = ActorID(actor_handle.get().GetActorID().Binary())
self.core_worker.get().AddActorHandle(move(actor_handle))
return actor_id
def serialize_actor_handle(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
c_string output
self.core_worker.get().GetActorHandle(c_actor_id).Serialize(&output)
return output

View file

@ -16,7 +16,7 @@ import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
from ray import ActorID, ActorHandleID, ActorClassID, profiling
from ray import ActorID, ActorClassID, profiling
logger = logging.getLogger(__name__)
@ -372,9 +372,6 @@ class ActorClass(object):
actor_id = ActorID.from_random()
worker.actors[actor_id] = meta.modified_class(
*copy.deepcopy(args), **copy.deepcopy(kwargs))
core_handle = ray._raylet.ActorHandle(
actor_id, ActorHandleID.nil(), worker.current_job_id,
function_descriptor.get_function_descriptor_list())
else:
# Export the actor.
if (meta.last_export_session_and_job !=
@ -404,13 +401,13 @@ class ActorClass(object):
function_signature = meta.method_signatures[function_name]
creation_args = signature.extend_args(function_signature, args,
kwargs)
core_handle = worker.core_worker.create_actor(
actor_id = worker.core_worker.create_actor(
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources)
actor_handle = ActorHandle(
core_handle,
actor_id,
meta.modified_class.__module__,
meta.class_name,
meta.actor_method_names,
@ -436,7 +433,7 @@ class ActorHandle(object):
cloudpickle).
Attributes:
_ray_core_handle: Core worker actor handle for this actor.
_ray_actor_id: Actor ID.
_ray_module_name: The module name of this actor.
_ray_actor_method_names: The names of the actor methods.
_ray_method_decorators: Optional decorators for the function
@ -454,7 +451,7 @@ class ActorHandle(object):
"""
def __init__(self,
core_handle,
actor_id,
module_name,
class_name,
actor_method_names,
@ -464,7 +461,7 @@ class ActorHandle(object):
actor_method_cpus,
session_and_job,
original_handle=False):
self._ray_core_handle = core_handle
self._ray_actor_id = actor_id
self._ray_module_name = module_name
self._ray_original_handle = original_handle
self._ray_actor_method_names = actor_method_names
@ -518,7 +515,7 @@ class ActorHandle(object):
function, function_descriptor, args, num_return_vals)
else:
object_ids = worker.core_worker.submit_actor_task(
self._ray_core_handle,
self._ray_actor_id,
function_descriptor.get_function_descriptor_list(), args,
num_return_vals, {"CPU": self._ray_actor_method_cpus})
@ -579,8 +576,8 @@ class ActorHandle(object):
# and we don't need to send `__ray_terminate__` again.
logger.warning(
"Actor is garbage collected in the wrong driver." +
" Actor id = %s, class name = %s.",
self._ray_core_handle.actor_id(), self._ray_class_name)
" Actor id = %s, class name = %s.", self._ray_actor_id,
self._ray_class_name)
return
if worker.connected and self._ray_original_handle:
# TODO(rkn): Should we be passing in the actor cursor as a
@ -589,11 +586,7 @@ class ActorHandle(object):
@property
def _actor_id(self):
return self._ray_core_handle.actor_id()
@property
def _actor_handle_id(self):
return self._ray_core_handle.actor_handle_id()
return self._ray_actor_id
def _serialization_helper(self, ray_forking):
"""This is defined in order to make pickling work.
@ -605,8 +598,13 @@ class ActorHandle(object):
Returns:
A dictionary of the information needed to reconstruct the object.
"""
worker = ray.worker.get_global_worker()
worker.check_connected()
state = {
"core_handle": self._ray_core_handle.fork(ray_forking).to_bytes(),
# Local mode just uses the actor ID.
"core_handle": worker.core_worker.serialize_actor_handle(
self._ray_actor_id)
if hasattr(worker, "core_worker") else self._ray_actor_id,
"module_name": self._ray_module_name,
"class_name": self._ray_class_name,
"actor_method_names": self._ray_actor_method_names,
@ -632,8 +630,9 @@ class ActorHandle(object):
self.__init__(
# TODO(swang): Accessing the worker's current task ID is not
# thread-safe.
ray._raylet.ActorHandle.from_bytes(state["core_handle"],
worker.current_task_id),
# Local mode just uses the actor ID.
worker.core_worker.deserialize_actor_handle(state["core_handle"])
if hasattr(worker, "core_worker") else state["core_handle"],
state["module_name"],
state["class_name"],
state["actor_method_names"],

View file

@ -27,11 +27,11 @@ class MetricMonitor:
return True
def add_target(self, target_handle):
hex_id = target_handle._ray_core_handle.actor_id().hex()
hex_id = target_handle._actor_id.hex()
self.actor_handles[hex_id] = target_handle
def remove_target(self, target_handle):
hex_id = target_handle._ray_core_handle.actor_id().hex()
hex_id = target_handle._actor_id.hex()
self.actor_handles.pop(hex_id)
def scrape(self):

View file

@ -8,7 +8,6 @@ from libcpp.vector cimport vector as c_vector
from ray.includes.unique_ids cimport (
CActorID,
CActorHandleID,
CJobID,
CWorkerID,
CObjectID,
@ -199,19 +198,9 @@ cdef extern from "ray/core_worker/task_interface.h" nogil:
const c_vector[c_string] &dynamic_worker_options)
cdef cppclass CActorHandle "ray::ActorHandle":
CActorHandle(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CJobID &job_id, const CObjectID &initial_cursor,
const CLanguage actor_language, c_bool is_direct_call,
const c_vector[c_string] &actor_creation_task_function_descriptor)
CActorHandle(CActorHandle &other, c_bool in_band)
CActorHandle(
const c_string &serialized, const CTaskID &current_task_id)
CActorHandle(const c_string &serialized)
CActorID GetActorID() const
CActorHandleID GetActorHandleID() const
unique_ptr[CActorHandle] Fork()
unique_ptr[CActorHandle] ForkForSerialization()
void Serialize(c_string *output)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:

View file

@ -3,7 +3,6 @@ from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from ray.includes.common cimport (
CActorHandle,
CGcsClientOptions,
)
@ -25,48 +24,3 @@ cdef class GcsClientOptions:
cdef CGcsClientOptions* native(self):
return <CGcsClientOptions*>(self.inner.get())
cdef class ActorHandle:
"""Cython wrapper class of C++ `ray::ActorHandle`."""
cdef:
unique_ptr[CActorHandle] inner
def __init__(self, ActorID actor_id, ActorHandleID actor_handle_id,
JobID job_id, list creation_function_descriptor):
cdef:
c_vector[c_string] c_descriptor
ObjectID cursor = ObjectID.from_random()
c_descriptor = string_vector_from_list(creation_function_descriptor)
self.inner.reset(new CActorHandle(
actor_id.native(), actor_handle_id.native(), job_id.native(),
cursor.native(), LANGUAGE_PYTHON, False, c_descriptor))
def fork(self, c_bool ray_forking):
cdef:
ActorHandle other = ActorHandle.__new__(ActorHandle)
if ray_forking:
other.inner = self.inner.get().Fork()
else:
other.inner = self.inner.get().ForkForSerialization()
return other
@staticmethod
def from_bytes(c_string bytes, TaskID current_task_id):
cdef:
ActorHandle self = ActorHandle.__new__(ActorHandle)
self.inner.reset(new CActorHandle(bytes, current_task_id.native()))
return self
def to_bytes(self):
cdef:
c_string output
self.inner.get().Serialize(&output)
return output
def actor_id(self):
return ActorID(self.inner.get().GetActorID().Binary())
def actor_handle_id(self):
return ActorHandleID(self.inner.get().GetActorHandleID().Binary())

View file

@ -5,6 +5,7 @@ from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from ray.includes.unique_ids cimport (
CActorID,
CJobID,
CTaskID,
CObjectID,
@ -32,13 +33,16 @@ cdef extern from "ray/core_worker/profiling.h" nogil:
cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil:
cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface":
CRayStatus SubmitTask(
const CTaskID &caller_id,
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids)
CRayStatus CreateActor(
const CTaskID &caller_id,
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options,
unique_ptr[CActorHandle] *handle)
CRayStatus SubmitActorTask(
const CTaskID &caller_id,
CActorHandle &handle, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
@ -87,3 +91,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void SetCurrentJobId(const CJobID &job_id)
CTaskID GetCurrentTaskId()
void SetCurrentTaskId(const CTaskID &task_id)
void SetActorId(const CActorID &actor_id)
const CActorID &GetActorId()
CTaskID GetCallerId()
c_bool AddActorHandle(unique_ptr[CActorHandle] handle)
CActorHandle &GetActorHandle(const CActorID &actor_id)

View file

@ -10,7 +10,6 @@ from ray.includes.common cimport (
ResourceSet,
)
from ray.includes.unique_ids cimport (
CActorHandleID,
CActorID,
CJobID,
CObjectID,
@ -71,10 +70,8 @@ cdef extern from "ray/common/task/task_spec.h" nogil:
CObjectID PreviousActorTaskDummyObjectId() const
uint64_t MaxActorReconstructions() const
CActorID ActorId() const
CActorHandleID ActorHandleId() const
uint64_t ActorCounter() const
CObjectID ActorDummyObject() const
c_vector[CActorHandleID] NewActorHandles() const
cdef extern from "ray/common/task/task_execution_spec.h" nogil:

View file

@ -68,11 +68,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CActorID Of(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter)
cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID):
@staticmethod
CActorHandleID FromBinary(const c_string &binary)
cdef cppclass CClientID "ray::ClientID"(CUniqueID):
@staticmethod

View file

@ -11,7 +11,6 @@ import os
from ray.includes.unique_ids cimport (
CActorCheckpointID,
CActorClassID,
CActorHandleID,
CActorID,
CClientID,
CConfigID,
@ -343,16 +342,6 @@ cdef class ActorID(BaseID):
return self.data.Hash()
cdef class ActorHandleID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CActorHandleID.FromBinary(<c_string>id)
cdef CActorHandleID native(self):
return <CActorHandleID>self.data
cdef class ActorCheckpointID(UniqueID):
def __init__(self, id):
@ -385,7 +374,6 @@ cdef class ActorClassID(UniqueID):
_ID_TYPES = [
ActorCheckpointID,
ActorClassID,
ActorHandleID,
ActorID,
ClientID,
JobID,

View file

@ -1447,14 +1447,14 @@ def test_multithreading(ray_start_2_cpus):
time.sleep(delay_ms / 1000.0)
return value
@ray.remote
class Echo(object):
def echo(self, value):
return value
def test_api_in_multi_threads():
"""Test using Ray api in multiple threads."""
@ray.remote
class Echo(object):
def echo(self, value):
return value
# Test calling remote functions in multiple threads.
def test_remote_call():
value = random.randint(0, 1000000)

View file

@ -867,7 +867,9 @@ class Worker(object):
# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if task.is_actor_creation_task():
# TODO: Remove Worker.actor_id and just use CoreWorker.GetActorId.
self.actor_id = task.actor_creation_id()
self.core_worker.set_actor_id(task.actor_creation_id())
self.actor_creation_task_id = task.task_id()
actor_class = self.function_actor_manager.load_actor_class(
job_id, function_descriptor)

View file

@ -346,39 +346,6 @@ ObjectID ObjectID::GenerateObjectId(const std::string &task_id_binary,
return ret;
}
const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id,
int64_t num_forks) {
// Compute hashes.
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx, reinterpret_cast<const BYTE *>(actor_handle_id.Data()),
actor_handle_id.Size());
sha256_update(&ctx, reinterpret_cast<const BYTE *>(&num_forks), sizeof(num_forks));
// Compute the final actor handle ID from the hash.
BYTE buff[DIGEST_SIZE];
sha256_final(&ctx, buff);
RAY_CHECK(DIGEST_SIZE >= ActorHandleID::Size());
return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size()));
}
const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id,
const TaskID &current_task_id) {
// Compute hashes.
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx, reinterpret_cast<const BYTE *>(actor_handle_id.Data()),
actor_handle_id.Size());
sha256_update(&ctx, reinterpret_cast<const BYTE *>(current_task_id.Data()),
current_task_id.Size());
// Compute the final actor handle ID from the hash.
BYTE buff[DIGEST_SIZE];
sha256_final(&ctx, buff);
RAY_CHECK(DIGEST_SIZE >= ActorHandleID::Size());
return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size()));
}
JobID JobID::FromInt(uint32_t value) {
std::vector<uint8_t> data(JobID::Size(), 0);
std::memcpy(data.data(), &value, JobID::Size());

View file

@ -369,23 +369,6 @@ std::ostream &operator<<(std::ostream &os, const ObjectID &id);
// Restore the compiler alignment to default (8 bytes).
#pragma pack(pop)
/// Compute an actor handle ID for a newly forked actor handle.
///
/// \param actor_handle_id The actor handle ID of the existing actor handle.
/// \param num_forks The number of forks of the existing actor handle.
/// \return Generated actor handle ID.
const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id,
int64_t num_forks);
/// Compute an actor handle ID for a new actor handle created by an
/// out-of-band serialization mechanism.
///
/// \param actor_handle_id The actor handle ID of the existing actor handle.
/// \param current_task_id The current task ID.
/// \return Generated actor handle ID.
const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id,
const TaskID &current_task_id);
template <typename T>
BaseID<T>::BaseID() {
// Using const_cast to directly change data is dangerous. The cached

View file

@ -6,7 +6,6 @@
DEFINE_UNIQUE_ID(FunctionID)
DEFINE_UNIQUE_ID(ActorClassID)
DEFINE_UNIQUE_ID(ActorHandleID)
DEFINE_UNIQUE_ID(ActorCheckpointID)
DEFINE_UNIQUE_ID(WorkerID)
DEFINE_UNIQUE_ID(ConfigID)

View file

@ -151,6 +151,10 @@ std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
message_->actor_creation_task_spec().dynamic_worker_options());
}
TaskID TaskSpecification::CallerId() const {
return TaskID::FromBinary(message_->caller_id());
}
// === Below are getter methods specific to actor tasks.
ActorID TaskSpecification::ActorId() const {
@ -158,11 +162,6 @@ ActorID TaskSpecification::ActorId() const {
return ActorID::FromBinary(message_->actor_task_spec().actor_id());
}
ActorHandleID TaskSpecification::ActorHandleId() const {
RAY_CHECK(IsActorTask());
return ActorHandleID::FromBinary(message_->actor_task_spec().actor_handle_id());
}
uint64_t TaskSpecification::ActorCounter() const {
RAY_CHECK(IsActorTask());
return message_->actor_task_spec().actor_counter();
@ -185,12 +184,6 @@ ObjectID TaskSpecification::ActorDummyObject() const {
return ReturnId(NumReturns() - 1);
}
std::vector<ActorHandleID> TaskSpecification::NewActorHandles() const {
RAY_CHECK(IsActorTask());
return IdVectorFromProtobuf<ActorHandleID>(
message_->actor_task_spec().new_actor_handles());
}
bool TaskSpecification::IsDirectCall() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_direct_call();
@ -224,8 +217,8 @@ std::string TaskSpecification::DebugString() const {
} else if (IsActorTask()) {
// Print actor task spec.
stream << ", actor_task_spec={actor_id=" << ActorId()
<< ", actor_handle_id=" << ActorHandleId()
<< ", actor_counter=" << ActorCounter() << "}";
<< ", actor_caller_id=" << CallerId() << ", actor_counter=" << ActorCounter()
<< "}";
}
return stream.str();

View file

@ -132,7 +132,7 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
ActorID ActorId() const;
ActorHandleID ActorHandleId() const;
TaskID CallerId() const;
uint64_t ActorCounter() const;
@ -140,8 +140,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
ObjectID PreviousActorTaskDummyObjectId() const;
std::vector<ActorHandleID> NewActorHandles() const;
bool IsDirectCall() const;
ObjectID ActorDummyObject() const;

View file

@ -26,7 +26,8 @@ class TaskSpecBuilder {
TaskSpecBuilder &SetCommonTaskSpec(
const TaskID &task_id, const Language &language,
const std::vector<std::string> &function_descriptor, const JobID &job_id,
const TaskID &parent_task_id, uint64_t parent_counter, uint64_t num_returns,
const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id,
uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources) {
message_->set_type(TaskType::NORMAL_TASK);
@ -38,6 +39,7 @@ class TaskSpecBuilder {
message_->set_task_id(task_id.Binary());
message_->set_parent_task_id(parent_task_id.Binary());
message_->set_parent_counter(parent_counter);
message_->set_caller_id(caller_id.Binary());
message_->set_num_returns(num_returns);
message_->mutable_required_resources()->insert(required_resources.begin(),
required_resources.end());
@ -107,23 +109,18 @@ class TaskSpecBuilder {
/// See `common.proto` for meaning of the arguments.
///
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetActorTaskSpec(
const ActorID &actor_id, const ActorHandleID &actor_handle_id,
const ObjectID &actor_creation_dummy_object_id,
const ObjectID &previous_actor_task_dummy_object_id, uint64_t actor_counter,
const std::vector<ActorHandleID> &new_handle_ids = {}) {
TaskSpecBuilder &SetActorTaskSpec(const ActorID &actor_id,
const ObjectID &actor_creation_dummy_object_id,
const ObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter) {
message_->set_type(TaskType::ACTOR_TASK);
auto actor_spec = message_->mutable_actor_task_spec();
actor_spec->set_actor_id(actor_id.Binary());
actor_spec->set_actor_handle_id(actor_handle_id.Binary());
actor_spec->set_actor_creation_dummy_object_id(
actor_creation_dummy_object_id.Binary());
actor_spec->set_previous_actor_task_dummy_object_id(
previous_actor_task_dummy_object_id.Binary());
actor_spec->set_actor_counter(actor_counter);
for (const auto &id : new_handle_ids) {
actor_spec->add_new_actor_handles(id.Binary());
}
return *this;
}

View file

@ -2,84 +2,44 @@
#include "ray/core_worker/actor_handle.h"
namespace {
ray::rpc::ActorHandle CreateInnerActorHandle(
const class ActorID &actor_id, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call,
const std::vector<std::string> &actor_creation_task_function_descriptor) {
ray::rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_creation_job_id(job_id.Data(), job_id.Size());
inner.set_actor_language(actor_language);
*inner.mutable_actor_creation_task_function_descriptor() = {
actor_creation_task_function_descriptor.begin(),
actor_creation_task_function_descriptor.end()};
inner.set_actor_cursor(initial_cursor.Binary());
inner.set_is_direct_call(is_direct_call);
return inner;
}
ray::rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) {
ray::rpc::ActorHandle inner;
inner.ParseFromString(serialized);
return inner;
}
} // namespace
namespace ray {
ActorHandle::ActorHandle(
const class ActorID &actor_id, const class ActorHandleID &actor_handle_id,
const class JobID &job_id, const ObjectID &initial_cursor,
const Language actor_language, bool is_direct_call,
const std::vector<std::string> &actor_creation_task_function_descriptor) {
inner_.set_actor_id(actor_id.Data(), actor_id.Size());
inner_.set_actor_handle_id(actor_handle_id.Data(), actor_handle_id.Size());
inner_.set_creation_job_id(job_id.Data(), job_id.Size());
inner_.set_actor_language(actor_language);
*inner_.mutable_actor_creation_task_function_descriptor() = {
actor_creation_task_function_descriptor.begin(),
actor_creation_task_function_descriptor.end()};
inner_.set_actor_cursor(initial_cursor.Binary());
inner_.set_is_direct_call(is_direct_call);
// Increment the task counter to account for the actor creation task.
task_counter_++;
}
const class ActorID &actor_id, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call,
const std::vector<std::string> &actor_creation_task_function_descriptor)
: ActorHandle(CreateInnerActorHandle(actor_id, job_id, initial_cursor, actor_language,
is_direct_call,
actor_creation_task_function_descriptor)) {}
std::unique_ptr<ActorHandle> ActorHandle::Fork() {
std::unique_lock<std::mutex> guard(mutex_);
std::unique_ptr<ActorHandle> child =
std::unique_ptr<ActorHandle>(new ActorHandle(inner_));
child->inner_ = inner_;
const class ActorHandleID new_actor_handle_id =
ComputeForkedActorHandleId(GetActorHandleID(), num_forks_++);
// Notify the backend to expect this new actor handle. The backend will
// not release the cursor for any new handles until the first task for
// each of the new handles is submitted.
// NOTE(swang): There is currently no garbage collection for actor
// handles until the actor itself is removed.
new_actor_handles_.push_back(new_actor_handle_id);
guard.unlock();
child->inner_.set_actor_handle_id(new_actor_handle_id.Data(),
new_actor_handle_id.Size());
return child;
}
std::unique_ptr<ActorHandle> ActorHandle::ForkForSerialization() {
std::unique_lock<std::mutex> guard(mutex_);
std::unique_ptr<ActorHandle> child =
std::unique_ptr<ActorHandle>(new ActorHandle(inner_));
child->inner_ = inner_;
// The execution dependency for a serialized actor handle is never safe
// to release, since it could be deserialized and submit another
// dependent task at any time. Therefore, we notify the backend of a
// random handle ID that will never actually be used.
new_actor_handles_.push_back(ActorHandleID::FromRandom());
guard.unlock();
// We set the actor handle ID to nil to signal that this actor handle was
// created by an out-of-band fork. A new actor handle ID will be computed
// when the handle is deserialized.
const class ActorHandleID new_actor_handle_id = ActorHandleID::Nil();
child->inner_.set_actor_handle_id(new_actor_handle_id.Data(),
new_actor_handle_id.Size());
return child;
}
ActorHandle::ActorHandle(const std::string &serialized, const TaskID &current_task_id) {
inner_.ParseFromString(serialized);
// If the actor handle ID is nil, this serialized handle was created by an out-of-band
// mechanism (see fork constructor above), so we compute a new actor handle ID.
// TODO(pcm): This still leads to a lot of actor handles being
// created, there should be a better way to handle serialized
// actor handles.
// TODO(swang): Deserializing the same actor handle twice in the same
// task will break the application, and deserializing it twice in the
// same actor is likely a performance bug. We should consider
// logging a warning in these cases.
if (ActorHandleID::FromBinary(inner_.actor_handle_id()).IsNil()) {
const class ActorHandleID new_actor_handle_id = ComputeSerializedActorHandleId(
ActorHandleID::FromBinary(inner_.actor_handle_id()), current_task_id);
inner_.set_actor_handle_id(new_actor_handle_id.Data(), new_actor_handle_id.Size());
}
}
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder,
const TaskTransportType transport_type,
@ -90,18 +50,18 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder,
const ObjectID actor_creation_dummy_object_id =
ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1,
/*transport_type=*/static_cast<int>(transport_type));
builder.SetActorTaskSpec(GetActorID(), GetActorHandleID(),
actor_creation_dummy_object_id,
/*previous_actor_task_dummy_object_id=*/ActorCursor(),
task_counter_++, new_actor_handles_);
inner_.set_actor_cursor(new_cursor.Binary());
new_actor_handles_.clear();
builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id,
/*previous_actor_task_dummy_object_id=*/actor_cursor_,
task_counter_++);
actor_cursor_ = new_cursor;
}
void ActorHandle::Serialize(std::string *output) {
void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }
void ActorHandle::Reset() {
std::unique_lock<std::mutex> guard(mutex_);
inner_.SerializeToString(output);
task_counter_ = 0;
actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor());
}
} // namespace ray

View file

@ -13,33 +13,20 @@ namespace ray {
class ActorHandle {
public:
ActorHandle(ray::rpc::ActorHandle inner) : inner_(inner) {}
ActorHandle(ray::rpc::ActorHandle inner)
: inner_(inner), actor_cursor_(ObjectID::FromBinary(inner_.actor_cursor())) {}
// Constructs a new ActorHandle as part of the actor creation process.
ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id,
const JobID &job_id, const ObjectID &initial_cursor,
const Language actor_language, bool is_direct_call,
ActorHandle(const ActorID &actor_id, const JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
bool is_direct_call,
const std::vector<std::string> &actor_creation_task_function_descriptor);
/// Constructs an ActorHandle from a serialized string.
ActorHandle(const std::string &serialized, const TaskID &current_task_id);
/// Forks a child ActorHandle. This will modify the handle to account for the newly
/// forked child handle. This should only be used for forks that are part of a Ray
/// API call (e.g., passing an actor handle into a remote function).
std::unique_ptr<ActorHandle> Fork();
/// Forks a child ActorHandle. This will *not* modify the handle to account for the
/// newly forked child handle. This should be used by application-level code for
/// serialization in order to pass an actor handle for uses not covered by the Ray API.
std::unique_ptr<ActorHandle> ForkForSerialization();
ActorHandle(const std::string &serialized);
ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); };
ActorHandleID GetActorHandleID() const {
return ActorHandleID::FromBinary(inner_.actor_handle_id());
};
/// ID of the job that created the actor (it is possible that the handle
/// exists on a job with a different job ID).
JobID CreationJobID() const { return JobID::FromBinary(inner_.creation_job_id()); };
@ -50,8 +37,6 @@ class ActorHandle {
return VectorFromProtobuf(inner_.actor_creation_task_function_descriptor());
};
ObjectID ActorCursor() const { return ObjectID::FromBinary(inner_.actor_cursor()); }
bool IsDirectCallActor() const { return inner_.is_direct_call(); }
void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type,
@ -59,22 +44,27 @@ class ActorHandle {
void Serialize(std::string *output);
/// Reset the handle state next task submitted.
///
/// This should be called whenever the actor is restarted, since the new
/// instance of the actor does not have the previous sequence number.
/// TODO: We should also move the other actor state (status and IP) inside
/// ActorHandle and reset them in this method.
void Reset();
private:
// Protobuf-defined persistent state of the actor handle.
ray::rpc::ActorHandle inner_;
// Number of times this handle has been forked.
uint64_t num_forks_ = 0;
const ray::rpc::ActorHandle inner_;
/// The unique id of the dummy object returned by the previous task.
/// TODO: This can be removed once we schedule actor tasks by task counter
/// only.
// TODO: Save this state in the core worker.
ObjectID actor_cursor_;
// Number of tasks that have been submitted on this handle.
uint64_t task_counter_ = 0;
/// The new actor handles that were created from this handle
/// since the last task on this handle was submitted. This is
/// used to garbage-collect dummy objects that are no longer
/// necessary in the backend.
std::vector<ray::ActorHandleID> new_actor_handles_;
/// Guards actor_cursor_ and task_counter_.
std::mutex mutex_;
FRIEND_TEST(ZeroNodeTest, TestActorHandle);

View file

@ -49,7 +49,7 @@ CoreWorker::CoreWorker(
std::unique_ptr<CoreWorkerObjectInterface>(new CoreWorkerObjectInterface(
worker_context_, raylet_client_, store_socket, use_memory_store));
task_interface_ = std::unique_ptr<CoreWorkerTaskInterface>(new CoreWorkerTaskInterface(
worker_context_, raylet_client_, *object_interface_, io_service_, *gcs_client_));
worker_context_, raylet_client_, *object_interface_, io_service_));
// Initialize task execution.
int rpc_server_port = 0;
@ -90,7 +90,7 @@ CoreWorker::CoreWorker(
builder.SetCommonTaskSpec(task_id, language_, empty_descriptor,
worker_context_.GetCurrentJobID(),
TaskID::ComputeDriverTaskId(worker_context_.GetWorkerID()),
0, 0, empty_resources, empty_resources);
0, GetCallerId(), 0, empty_resources, empty_resources);
std::shared_ptr<gcs::TaskTableData> data = std::make_shared<gcs::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage());
@ -127,4 +127,71 @@ std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
new worker::ProfileEvent(profiler_, event_type));
}
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
main_thread_task_id_ = task_id;
// Clear all actor handles at the end of each non-actor task.
if (actor_id_.IsNil() && task_id.IsNil()) {
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr));
}
actor_handles_.clear();
}
}
TaskID CoreWorker::GetCallerId() const {
TaskID caller_id;
ActorID actor_id = GetActorId();
if (!actor_id.IsNil()) {
caller_id = TaskID::ForActorCreationTask(actor_id);
} else {
caller_id = main_thread_task_id_;
}
return caller_id;
}
bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
const auto &actor_id = actor_handle->GetActorID();
auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second;
if (inserted) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr));
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors.
}
task_interface_->HandleDirectActorUpdate(actor_id, actor_data);
RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
<< ", ip address: " << actor_data.ip_address()
<< ", port: " << actor_data.port();
};
RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
actor_id, actor_notification_callback, nullptr));
}
return inserted;
}
ActorHandle &CoreWorker::GetActorHandle(const ActorID &actor_id) {
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
return *it->second;
}
} // namespace ray

View file

@ -82,10 +82,42 @@ class CoreWorker {
void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); }
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
void SetCurrentTaskId(const TaskID &task_id);
void SetActorId(const ActorID &actor_id) {
RAY_CHECK(actor_id_.IsNil());
actor_id_ = actor_id;
}
const ActorID &GetActorId() const { return actor_id_; }
/// Get the caller ID used to submit tasks from this worker to an actor.
///
/// \return The caller ID. For non-actor tasks, this is the current task ID.
/// For actors, this is the current actor ID. To make sure that all caller
/// IDs have the same type, we embed the actor ID in a TaskID with the rest
/// of the bytes zeroed out.
TaskID GetCallerId() const;
/// Give this worker a handle to an actor.
///
/// This handle will remain as long as the current actor or task is
/// executing, even if the Python handle goes out of scope. Tasks submitted
/// through this handle are guaranteed to execute in the same order in which
/// they are submitted.
///
/// \param actor_handle The handle to the actor.
/// \return True if the handle was added and False if we already had a handle
/// to the same actor.
bool AddActorHandle(std::unique_ptr<ActorHandle> actor_handle);
/// Get a handle to an actor. This asserts that the worker actually has this
/// handle.
///
/// \param actor_id The actor handle to get.
/// \return A handle to the requested actor.
ActorHandle &GetActorHandle(const ActorID &actor_id);
private:
void StartIOService();
@ -94,6 +126,12 @@ class CoreWorker {
const std::string raylet_socket_;
const std::string log_dir_;
WorkerContext worker_context_;
/// The ID of the current task being executed by the main thread. If there
/// are multiple threads, they will have a thread-local task ID stored in the
/// worker context.
TaskID main_thread_task_id_;
/// Our actor ID. If this is nil, then we execute only stateless tasks.
ActorID actor_id_;
/// Event loop where the IO events are handled. e.g. async GCS operations.
boost::asio::io_service io_service_;
@ -107,6 +145,9 @@ class CoreWorker {
std::unique_ptr<CoreWorkerTaskInterface> task_interface_;
std::unique_ptr<CoreWorkerObjectInterface> object_interface_;
/// Map from actor ID to a handle to that actor.
std::unordered_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_;
/// Only available if it's not a driver.
std::unique_ptr<CoreWorkerTaskExecutionInterface> task_execution_interface_;
};

View file

@ -8,32 +8,30 @@ namespace ray {
CoreWorkerTaskInterface::CoreWorkerTaskInterface(
WorkerContext &worker_context, std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service,
gcs::RedisGcsClient &gcs_client)
CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service)
: worker_context_(worker_context) {
task_submitters_.emplace(TaskTransportType::RAYLET,
std::unique_ptr<CoreWorkerRayletTaskSubmitter>(
new CoreWorkerRayletTaskSubmitter(raylet_client)));
task_submitters_.emplace(
TaskTransportType::DIRECT_ACTOR,
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(
io_service, gcs_client,
object_interface.CreateStoreProvider(StoreProviderType::MEMORY))));
task_submitters_.emplace(TaskTransportType::DIRECT_ACTOR,
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(
io_service, object_interface.CreateStoreProvider(
StoreProviderType::MEMORY))));
}
void CoreWorkerTaskInterface::BuildCommonTaskSpec(
TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
const int task_index, const RayFunction &function, const std::vector<TaskArg> &args,
uint64_t num_returns,
const int task_index, const TaskID &caller_id, const RayFunction &function,
const std::vector<TaskArg> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
TaskTransportType transport_type, std::vector<ObjectID> *return_ids) {
// Build common task spec.
builder.SetCommonTaskSpec(task_id, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id,
worker_context_.GetCurrentTaskID(), task_index, num_returns,
required_resources, required_placement_resources);
builder.SetCommonTaskSpec(
task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
worker_context_.GetCurrentTaskID(), task_index, caller_id, num_returns,
required_resources, required_placement_resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {
@ -52,7 +50,8 @@ void CoreWorkerTaskInterface::BuildCommonTaskSpec(
}
}
Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
Status CoreWorkerTaskInterface::SubmitTask(const TaskID &caller_id,
const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
@ -62,14 +61,15 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
TaskID::ForNormalTask(worker_context_.GetCurrentJobID(),
worker_context_.GetCurrentTaskID(), next_task_index);
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id,
next_task_index, function, args, task_options.num_returns,
task_options.resources, {}, TaskTransportType::RAYLET, return_ids);
next_task_index, caller_id, function, args,
task_options.num_returns, task_options.resources, {},
TaskTransportType::RAYLET, return_ids);
return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build());
}
Status CoreWorkerTaskInterface::CreateActor(
const RayFunction &function, const std::vector<TaskArg> &args,
const ActorCreationOptions &actor_creation_options,
const TaskID &caller_id, const RayFunction &function,
const std::vector<TaskArg> &args, const ActorCreationOptions &actor_creation_options,
std::unique_ptr<ActorHandle> *actor_handle) {
const int next_task_index = worker_context_.GetNextTaskIndex();
const ActorID actor_id =
@ -79,8 +79,8 @@ Status CoreWorkerTaskInterface::CreateActor(
const JobID job_id = worker_context_.GetCurrentJobID();
std::vector<ObjectID> return_ids;
TaskSpecBuilder builder;
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, function,
args, 1, actor_creation_options.resources,
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, caller_id,
function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources,
TaskTransportType::RAYLET, &return_ids);
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions,
@ -88,14 +88,14 @@ Status CoreWorkerTaskInterface::CreateActor(
actor_creation_options.is_direct_call);
*actor_handle = std::unique_ptr<ActorHandle>(new ActorHandle(
actor_id, ActorHandleID::Nil(), job_id, /*actor_cursor=*/return_ids[0],
function.GetLanguage(), actor_creation_options.is_direct_call,
function.GetFunctionDescriptor()));
actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(),
actor_creation_options.is_direct_call, function.GetFunctionDescriptor()));
return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build());
}
Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
Status CoreWorkerTaskInterface::SubmitActorTask(const TaskID &caller_id,
ActorHandle &actor_handle,
const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
@ -114,7 +114,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(),
next_task_index, actor_handle.GetActorID());
BuildCommonTaskSpec(builder, actor_handle.CreationJobID(), actor_task_id,
next_task_index, function, args, num_returns,
next_task_index, caller_id, function, args, num_returns,
task_options.resources, {}, transport_type, return_ids);
const ObjectID new_cursor = return_ids->back();
@ -128,4 +128,12 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
return status;
}
void CoreWorkerTaskInterface::HandleDirectActorUpdate(
const ActorID &actor_id, const gcs::ActorTableData &actor_data) {
auto &submitter = task_submitters_[TaskTransportType::DIRECT_ACTOR];
auto &direct_actor_submitter =
reinterpret_cast<std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> &>(submitter);
direct_actor_submitter->HandleActorUpdate(actor_id, actor_data);
}
} // namespace ray

View file

@ -65,43 +65,54 @@ class CoreWorkerTaskInterface {
CoreWorkerTaskInterface(WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface,
boost::asio::io_service &io_service,
gcs::RedisGcsClient &gcs_client);
boost::asio::io_service &io_service);
/// Submit a normal task.
///
/// \param[in] caller_id ID of the task submitter.
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
/// \return Status.
Status SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids);
Status SubmitTask(const TaskID &caller_id, const RayFunction &function,
const std::vector<TaskArg> &args, const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
/// Create an actor.
///
/// \param[in] caller_id ID of the task submitter.
/// \param[in] function The remote function that generates the actor object.
/// \param[in] args Arguments of this task.
/// \param[in] actor_creation_options Options for this actor creation task.
/// \param[out] actor_handle Handle to the actor.
/// \return Status.
Status CreateActor(const RayFunction &function, const std::vector<TaskArg> &args,
Status CreateActor(const TaskID &caller_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const ActorCreationOptions &actor_creation_options,
std::unique_ptr<ActorHandle> *actor_handle);
/// Submit an actor task.
///
/// \param[in] caller_id ID of the task submitter.
/// \param[in] actor_handle Handle to the actor.
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
/// \return Status.
Status SubmitActorTask(ActorHandle &actor_handle, const RayFunction &function,
const std::vector<TaskArg> &args,
Status SubmitActorTask(const TaskID &caller_id, ActorHandle &actor_handle,
const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
/// Handle an update about an actor.
///
/// \param[in] actor_id The ID of the actor whose status has changed.
/// \param[in] actor_data The actor's new status information.
void HandleDirectActorUpdate(const ActorID &actor_id,
const gcs::ActorTableData &actor_data);
private:
/// Build common attributes of the task spec, and compute return ids.
///
@ -120,8 +131,8 @@ class CoreWorkerTaskInterface {
/// \return Void.
void BuildCommonTaskSpec(
TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
const int task_index, const RayFunction &function, const std::vector<TaskArg> &args,
uint64_t num_returns,
const int task_index, const TaskID &caller_id, const RayFunction &function,
const std::vector<TaskArg> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
TaskTransportType transport_type, std::vector<ObjectID> *return_ids);

View file

@ -48,12 +48,11 @@ std::shared_ptr<Buffer> GenerateRandomBuffer() {
return std::make_shared<LocalMemoryBuffer>(arg1.data(), arg1.size(), true);
}
std::unique_ptr<ActorHandle> CreateActorHelper(
CoreWorker &worker, std::unordered_map<std::string, double> &resources,
bool is_direct_call, uint64_t max_reconstructions) {
ActorHandle &CreateActorHelper(CoreWorker &worker,
std::unordered_map<std::string, double> &resources,
bool is_direct_call, uint64_t max_reconstructions) {
std::unique_ptr<ActorHandle> actor_handle;
// Test creating actor.
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
@ -65,8 +64,11 @@ std::unique_ptr<ActorHandle> CreateActorHelper(
max_reconstructions, is_direct_call, resources, resources, {}};
// Create an actor.
RAY_CHECK_OK(worker.Tasks().CreateActor(func, args, actor_options, &actor_handle));
return actor_handle;
RAY_CHECK_OK(worker.Tasks().CreateActor(worker.GetCallerId(), func, args, actor_options,
&actor_handle));
ActorID actor_id = actor_handle->GetActorID();
RAY_CHECK(worker.AddActorHandle(std::move(actor_handle)));
return worker.GetActorHandle(actor_id);
}
class CoreWorkerTest : public ::testing::Test {
@ -241,7 +243,8 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
TaskOptions options;
std::vector<ObjectID> return_ids;
RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids));
RAY_CHECK_OK(driver.Tasks().SubmitTask(driver.GetCallerId(), func, args, options,
&return_ids));
ASSERT_EQ(return_ids.size(), 1);
@ -265,7 +268,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1",
nullptr);
auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000);
auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000);
// Test submitting some tasks with by-value args for that actor.
{
@ -285,8 +288,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, {});
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options,
&return_ids));
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle,
func, args, options, &return_ids));
ASSERT_EQ(return_ids.size(), 1);
ASSERT_TRUE(return_ids[0].IsReturnObject());
ASSERT_EQ(
@ -326,8 +329,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func,
args, options, &return_ids);
if (is_direct_call) {
// For direct actor call, submitting a task with by-reference arguments
// would fail.
@ -356,10 +359,10 @@ void CoreWorkerTest::TestActorReconstruction(
nullptr);
// creating actor.
auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000);
auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000);
// Wait for actor alive event.
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true,
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true,
30 * 1000 /* 30s */));
RAY_LOG(INFO) << "actor has been created";
@ -374,9 +377,9 @@ void CoreWorkerTest::TestActorReconstruction(
ASSERT_EQ(system("pkill mock_worker"), 0);
// Wait for actor restruction event, and then for alive event.
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), false,
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), false,
30 * 1000 /* 30s */));
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true,
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true,
30 * 1000 /* 30s */));
RAY_LOG(INFO) << "actor has been reconstructed";
@ -394,8 +397,8 @@ void CoreWorkerTest::TestActorReconstruction(
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle,
func, args, options, &return_ids);
RAY_CHECK_OK(status);
ASSERT_EQ(return_ids.size(), 1);
// Verify if it's expected data.
@ -415,7 +418,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map<std::string, double> &r
nullptr);
// creating actor.
auto actor_handle =
auto &actor_handle =
CreateActorHelper(driver, resources, is_direct_call, 0 /* not reconstructable */);
// Test submitting some tasks with by-value args for that actor.
@ -441,8 +444,8 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map<std::string, double> &r
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle,
func, args, options, &return_ids);
if (i < task_index_to_kill_worker) {
RAY_CHECK_OK(status);
}
@ -653,9 +656,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
ActorCreationOptions actor_options{
0, /*is_direct_call*/ true, resources, resources, {}};
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
ActorHandleID::Nil(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), true,
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id,
ObjectID::FromRandom(), function.GetLanguage(), true,
function.GetFunctionDescriptor());
// Manually create `num_tasks` task specs, and for each of them create a
@ -672,7 +674,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
num_returns, resources, resources);
RandomTaskId(), num_returns, resources, resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {
@ -699,23 +701,14 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, "",
"127.0.0.1", nullptr);
std::unique_ptr<ActorHandle> actor_handle;
std::vector<ObjectID> object_ids;
// Test creating actor.
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction func(ray::Language::PYTHON, {});
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
std::unordered_map<std::string, double> resources;
ActorCreationOptions actor_options{
0, /*is_direct_call*/ true, resources, resources, {}};
// Create an actor.
RAY_CHECK_OK(driver.Tasks().CreateActor(func, args, actor_options, &actor_handle));
std::unordered_map<std::string, double> resources;
auto &actor_handle = CreateActorHelper(driver, resources,
/*is_direct_call=*/true,
/*max_reconstructions=*/0);
// wait for actor creation finish.
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true,
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true,
30 * 1000 /* 30s */));
// Test submitting some tasks with by-value args for that actor.
int64_t start_ms = current_time_ms();
@ -733,8 +726,8 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, {});
RAY_CHECK_OK(
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids));
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func,
args, options, &return_ids));
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}
@ -776,75 +769,20 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
}
TEST_F(ZeroNodeTest, TestActorHandle) {
const JobID job_id = NextJobId();
const TaskID task_id = TaskID::ForDriverTask(job_id);
const ActorHandleID actor_handle_id = ActorHandleID::FromRandom();
ActorHandle parent(ActorID::Of(job_id, task_id, 1), actor_handle_id, job_id,
ObjectID::FromRandom(), Language::JAVA, false,
{"org.ray.exampleClass", "exampleMethod", "exampleSignature"});
// Test actor handle serialization and deserialization round trip.
JobID job_id = NextJobId();
ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), job_id,
ObjectID::FromRandom(), Language::PYTHON, /*is_direct_call=*/false,
{});
std::string output;
original.Serialize(&output);
ActorHandle deserialized(output);
ASSERT_EQ(deserialized.GetActorID(), original.GetActorID());
ASSERT_EQ(deserialized.ActorLanguage(), original.ActorLanguage());
ASSERT_EQ(deserialized.ActorCreationTaskFunctionDescriptor(),
original.ActorCreationTaskFunctionDescriptor());
// Test in-band forking logic.
std::unique_ptr<ActorHandle> forkedHandle1 = parent.Fork();
ASSERT_EQ(1, parent.num_forks_);
ASSERT_EQ(parent.GetActorID(), forkedHandle1->GetActorID());
ASSERT_EQ(actor_handle_id, parent.GetActorHandleID());
ASSERT_NE(parent.GetActorHandleID(), forkedHandle1->GetActorHandleID());
ASSERT_EQ(parent.ActorLanguage(), forkedHandle1->ActorLanguage());
ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(),
forkedHandle1->ActorCreationTaskFunctionDescriptor());
ASSERT_EQ(parent.ActorCursor(), forkedHandle1->ActorCursor());
ASSERT_EQ(0, forkedHandle1->task_counter_);
ASSERT_EQ(0, forkedHandle1->num_forks_);
ASSERT_EQ(parent.new_actor_handles_.size(), 1);
ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle1->GetActorHandleID());
parent.new_actor_handles_.clear();
std::unique_ptr<ActorHandle> forkedHandle2 = parent.Fork();
ASSERT_EQ(2, parent.num_forks_);
ASSERT_EQ(0, forkedHandle2->task_counter_);
ASSERT_EQ(0, forkedHandle2->num_forks_);
ASSERT_EQ(parent.new_actor_handles_.size(), 1);
ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle2->GetActorHandleID());
parent.new_actor_handles_.clear();
// Test serialization and deserialization for in-band fork.
std::string buffer1;
forkedHandle2->Serialize(&buffer1);
ActorHandle deserializedHandle1(buffer1, task_id);
ASSERT_EQ(forkedHandle2->GetActorID(), deserializedHandle1.GetActorID());
ASSERT_EQ(forkedHandle2->GetActorHandleID(), deserializedHandle1.GetActorHandleID());
ASSERT_EQ(forkedHandle2->ActorLanguage(), deserializedHandle1.ActorLanguage());
ASSERT_EQ(forkedHandle2->ActorCreationTaskFunctionDescriptor(),
deserializedHandle1.ActorCreationTaskFunctionDescriptor());
ASSERT_EQ(forkedHandle2->ActorCursor(), deserializedHandle1.ActorCursor());
// Test out-of-band forking logic.
std::unique_ptr<ActorHandle> forkedHandle3 = parent.ForkForSerialization();
ASSERT_EQ(2, parent.num_forks_);
ASSERT_EQ(parent.GetActorID(), forkedHandle3->GetActorID());
ASSERT_EQ(actor_handle_id, parent.GetActorHandleID());
ASSERT_NE(parent.GetActorHandleID(), forkedHandle3->GetActorHandleID());
ASSERT_NE(forkedHandle2->GetActorHandleID(), forkedHandle3->GetActorHandleID());
ASSERT_EQ(parent.ActorLanguage(), forkedHandle3->ActorLanguage());
ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(),
forkedHandle3->ActorCreationTaskFunctionDescriptor());
ASSERT_EQ(parent.ActorCursor(), forkedHandle3->ActorCursor());
ASSERT_EQ(0, forkedHandle3->task_counter_);
ASSERT_EQ(0, forkedHandle3->num_forks_);
ASSERT_EQ(parent.new_actor_handles_.size(), 1);
ASSERT_NE(parent.new_actor_handles_.back(), forkedHandle3->GetActorHandleID());
parent.new_actor_handles_.clear();
// Test serialization and deserialization for out-of-band fork.
std::string buffer2;
forkedHandle3->Serialize(&buffer2);
ActorHandle deserializedHandle2(buffer2, task_id);
ASSERT_EQ(forkedHandle3->GetActorID(), deserializedHandle2.GetActorID());
ASSERT_NE(forkedHandle3->GetActorHandleID(), deserializedHandle2.GetActorHandleID());
ASSERT_EQ(forkedHandle3->ActorLanguage(), deserializedHandle2.ActorLanguage());
ASSERT_EQ(forkedHandle3->ActorCreationTaskFunctionDescriptor(),
deserializedHandle2.ActorCreationTaskFunctionDescriptor());
ASSERT_EQ(forkedHandle3->ActorCursor(), deserializedHandle2.ActorCursor());
// TODO: Test submission from different handles.
}
TEST_F(SingleNodeTest, TestMemoryStoreProvider) {

View file

@ -15,10 +15,9 @@ bool HasByReferenceArgs(const TaskSpecification &spec) {
}
CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter(
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client,
boost::asio::io_service &io_service,
std::unique_ptr<CoreWorkerStoreProvider> store_provider)
: io_service_(io_service),
gcs_client_(gcs_client),
client_call_manager_(io_service),
store_provider_(std::move(store_provider)) {}
@ -40,11 +39,6 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(
std::unique_lock<std::mutex> guard(mutex_);
if (subscribed_actors_.find(actor_id) == subscribed_actors_.end()) {
RAY_CHECK_OK(SubscribeActorUpdates(actor_id));
subscribed_actors_.insert(actor_id);
}
auto iter = actor_states_.find(actor_id);
if (iter == actor_states_.end() ||
iter->second.state_ == ActorTableData::RECONSTRUCTING) {
@ -78,61 +72,48 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(
}
}
Status CoreWorkerDirectActorTaskSubmitter::SubscribeActorUpdates(
const ActorID &actor_id) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const ActorTableData &actor_data) {
std::unique_lock<std::mutex> guard(mutex_);
actor_states_.erase(actor_id);
actor_states_.emplace(
actor_id,
ActorStateData(actor_data.state(), actor_data.ip_address(), actor_data.port()));
void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate(
const ActorID &actor_id, const ActorTableData &actor_data) {
std::unique_lock<std::mutex> guard(mutex_);
actor_states_.erase(actor_id);
actor_states_.emplace(
actor_id,
ActorStateData(actor_data.state(), actor_data.ip_address(), actor_data.port()));
if (actor_data.state() == ActorTableData::ALIVE) {
// Check if this actor is the one that we're interested, if we already have
// a connection to the actor, or have pending requests for it, we should
// create a new connection.
if (pending_requests_.count(actor_id) > 0) {
ConnectAndSendPendingTasks(actor_id, actor_data.ip_address(), actor_data.port());
}
} else {
// Remove rpc client if it's dead or being reconstructed.
rpc_clients_.erase(actor_id);
if (actor_data.state() == ActorTableData::ALIVE) {
// Check if this actor is the one that we're interested, if we already have
// a connection to the actor, or have pending requests for it, we should
// create a new connection.
if (pending_requests_.count(actor_id) > 0) {
ConnectAndSendPendingTasks(actor_id, actor_data.ip_address(), actor_data.port());
}
} else {
// Remove rpc client if it's dead or being reconstructed.
rpc_clients_.erase(actor_id);
// For tasks that have been sent and are waiting for replies, treat them
// as failed when the destination actor is dead or reconstructing.
auto iter = waiting_reply_tasks_.find(actor_id);
if (iter != waiting_reply_tasks_.end()) {
for (const auto &entry : iter->second) {
const auto &task_id = entry.first;
const auto num_returns = entry.second;
TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED);
}
waiting_reply_tasks_.erase(actor_id);
}
// If this actor is permanently dead and there are pending requests, treat
// the pending tasks as failed.
if (actor_data.state() == ActorTableData::DEAD &&
pending_requests_.count(actor_id) > 0) {
for (const auto &request : pending_requests_[actor_id]) {
TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()),
request->task_spec().num_returns(),
rpc::ErrorType::ACTOR_DIED);
}
pending_requests_.erase(actor_id);
// For tasks that have been sent and are waiting for replies, treat them
// as failed when the destination actor is dead or reconstructing.
auto iter = waiting_reply_tasks_.find(actor_id);
if (iter != waiting_reply_tasks_.end()) {
for (const auto &entry : iter->second) {
const auto &task_id = entry.first;
const auto num_returns = entry.second;
TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED);
}
waiting_reply_tasks_.erase(actor_id);
}
RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
<< ", ip address: " << actor_data.ip_address()
<< ", port: " << actor_data.port();
};
return gcs_client_.Actors().AsyncSubscribe(actor_id, actor_notification_callback,
nullptr);
// If this actor is permanently dead and there are pending requests, treat
// the pending tasks as failed.
if (actor_data.state() == ActorTableData::DEAD &&
pending_requests_.count(actor_id) > 0) {
for (const auto &request : pending_requests_[actor_id]) {
TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()),
request->task_spec().num_returns(), rpc::ErrorType::ACTOR_DIED);
}
pending_requests_.erase(actor_id);
}
}
}
void CoreWorkerDirectActorTaskSubmitter::ConnectAndSendPendingTasks(
@ -211,14 +192,9 @@ void CoreWorkerDirectActorTaskSubmitter::TreatTaskAsFailed(
}
}
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) {
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
std::unique_lock<std::mutex> guard(mutex_);
if (subscribed_actors_.find(actor_id) == subscribed_actors_.end()) {
RAY_CHECK_OK(SubscribeActorUpdates(actor_id));
subscribed_actors_.insert(actor_id);
}
auto iter = actor_states_.find(actor_id);
return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE);
}
@ -252,10 +228,10 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
return;
}
auto it = scheduling_queue_.find(task_spec.ActorHandleId());
auto it = scheduling_queue_.find(task_spec.CallerId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.ActorHandleId(),
task_spec.CallerId(),
std::unique_ptr<SchedulingQueue>(new SchedulingQueue(io_service_)));
it = result.first;
}

View file

@ -35,7 +35,7 @@ struct ActorStateData {
class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
public:
CoreWorkerDirectActorTaskSubmitter(
boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client,
boost::asio::io_service &io_service,
std::unique_ptr<CoreWorkerStoreProvider> store_provider);
/// Submit a task to an actor for execution.
@ -44,10 +44,13 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
/// \return Status.
Status SubmitTask(const TaskSpecification &task_spec) override;
private:
/// Subscribe to updates of an actor.
Status SubscribeActorUpdates(const ActorID &actor_id);
/// Handle an update about an actor.
///
/// \param[in] actor_id The ID of the actor whose status has changed.
/// \param[in] actor_data The actor's new status information.
void HandleActorUpdate(const ActorID &actor_id, const gcs::ActorTableData &actor_data);
private:
/// Push a task to a remote actor via the given client.
/// Note, this function doesn't return any error status code. If an error occurs while
/// sending the request, this task will be treated as failed.
@ -86,14 +89,11 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
///
/// \param[in] actor_id The actor ID.
/// \return Whether this actor is alive.
bool IsActorAlive(const ActorID &actor_id);
bool IsActorAlive(const ActorID &actor_id) const;
/// The IO event loop.
boost::asio::io_service &io_service_;
/// Gcs client.
gcs::RedisGcsClient &gcs_client_;
/// The `ClientCallManager` object that is shared by all `DirectActorClient`s.
rpc::ClientCallManager client_call_manager_;
@ -117,9 +117,6 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter {
/// Map from actor id to the tasks that are waiting for reply.
std::unordered_map<ActorID, std::unordered_map<TaskID, int>> waiting_reply_tasks_;
/// The set of actors which are subscribed for further updates.
std::unordered_set<ActorID> subscribed_actors_;
/// The store provider.
std::unique_ptr<CoreWorkerStoreProvider> store_provider_;
@ -230,7 +227,7 @@ class CoreWorkerDirectActorTaskReceiver : public CoreWorkerTaskReceiver,
TaskHandler task_handler_;
/// Queue of pending requests per actor handle.
/// TODO(ekl) GC these queues once the handle is no longer active.
std::unordered_map<ActorHandleID, std::unique_ptr<SchedulingQueue>> scheduling_queue_;
std::unordered_map<TaskID, std::unique_ptr<SchedulingQueue>> scheduling_queue_;
};
} // namespace ray

View file

@ -149,6 +149,7 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncSubscribe(
template <typename ID, typename Data, typename Table>
Status SubscriptionExecutor<ID, Data, Table>::AsyncUnsubscribe(
const ClientID &client_id, const ID &id, const StatusCallback &done) {
SubscribeCallback<ID, Data> subscribe = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
const auto it = id_to_callback_map_.find(id);
@ -156,14 +157,25 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncUnsubscribe(
RAY_LOG(DEBUG) << "Invalid Unsubscribe! id " << id << " client_id " << client_id;
return Status::Invalid("Invalid Unsubscribe, no existing subscription found.");
}
subscribe = std::move(it->second);
id_to_callback_map_.erase(it);
}
auto on_done = [this, id, done](Status status) {
if (status.ok()) {
RAY_CHECK(subscribe != nullptr);
auto on_done = [this, id, subscribe, done](Status status) {
if (!status.ok()) {
std::unique_lock<std::mutex> lock(mutex_);
const auto it = id_to_callback_map_.find(id);
if (it != id_to_callback_map_.end()) {
id_to_callback_map_.erase(it);
// The initial AsyncUnsubscribe deleted the callback, but the client
// has subscribed again in the meantime. This new callback will be
// called if we receive more notifications.
RAY_LOG(WARNING)
<< "Client called AsyncSubscribe on " << id
<< " while AsyncUnsubscribe was pending, but the unsubscribe failed.";
} else {
// The Unsubscribe failed, so restore the initial callback.
id_to_callback_map_[id] = subscribe;
}
}
if (done != nullptr) {

View file

@ -48,21 +48,24 @@ message TaskSpec {
bytes parent_task_id = 6;
// A count of the number of tasks submitted by the parent task before this one.
uint64 parent_counter = 7;
// Task ID of the caller. This is the same as parent_task_id for non-actors.
// This is the actor ID (embedded in a nil task ID) for actors.
bytes caller_id = 8;
// Task arguments.
repeated TaskArg args = 8;
repeated TaskArg args = 9;
// Number of return objects.
uint64 num_returns = 9;
uint64 num_returns = 10;
// Quantities of the different resources required by this task.
map<string, double> required_resources = 10;
map<string, double> required_resources = 11;
// The resources required for placing this task on a node. If this is empty,
// then the placement resources are equal to the required_resources.
map<string, double> required_placement_resources = 11;
map<string, double> required_placement_resources = 12;
// Task specification for an actor creation task.
// This field is only valid when `type == ACTOR_CREATION_TASK`.
ActorCreationTaskSpec actor_creation_task_spec = 14;
ActorCreationTaskSpec actor_creation_task_spec = 13;
// Task specification for an actor task.
// This field is only valid when `type == ACTOR_TASK`.
ActorTaskSpec actor_task_spec = 15;
ActorTaskSpec actor_task_spec = 14;
}
// Argument in the task.
@ -98,16 +101,10 @@ message ActorCreationTaskSpec {
message ActorTaskSpec {
// Actor ID of the actor that this task is executed on.
bytes actor_id = 2;
// The ID of the handle that was used to submit the task. This should be
// unique across handles with the same actor_id.
bytes actor_handle_id = 3;
// The dummy object ID of the actor creation task.
bytes actor_creation_dummy_object_id = 4;
// Number of tasks that have been submitted to this actor so far.
uint64 actor_counter = 5;
// This will be populated with all of the new actor handles that were forked
// from this handle since the last task on this handle was submitted.
repeated bytes new_actor_handles = 6;
// The dummy object ID of the previous actor task.
bytes previous_actor_task_dummy_object_id = 7;
}

View file

@ -9,9 +9,6 @@ message ActorHandle {
// ID of the actor.
bytes actor_id = 1;
// ID of this actor handle.
bytes actor_handle_id = 2;
// ID of the job that created the actor (it is possible that the handle
// exists on a job with a different job ID).
bytes creation_job_id = 3;
@ -22,8 +19,9 @@ message ActorHandle {
// Function descriptor of actor creation task.
repeated string actor_creation_task_function_descriptor = 5;
// The unique id of the last return of the last task.
// It's used as a dependency for the next task.
// The unique id of the dummy object returned by the actor creation task.
// It's used as a dependency for the first task.
// TODO: Remove this once scheduling is done by task counter only.
bytes actor_cursor = 6;
// Whether direct actor call is used.

View file

@ -9,7 +9,11 @@ namespace ray {
namespace raylet {
ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data)
: actor_table_data_(actor_table_data) {}
: actor_table_data_(actor_table_data) {
// The first task submitted on each new actor handle will depend on the actor
// creation object, so we always pin it.
dummy_objects_[GetActorCreationDependency()]++;
}
ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data,
const ActorCheckpointData &checkpoint_data)
@ -18,8 +22,8 @@ ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data,
ObjectID::FromBinary(checkpoint_data.execution_dependency())) {
// Restore `frontier_`.
for (int64_t i = 0; i < checkpoint_data.handle_ids_size(); i++) {
auto handle_id = ActorHandleID::FromBinary(checkpoint_data.handle_ids(i));
auto &frontier_entry = frontier_[handle_id];
auto caller_id = TaskID::FromBinary(checkpoint_data.handle_ids(i));
auto &frontier_entry = frontier_[caller_id];
frontier_entry.task_counter = checkpoint_data.task_counters(i);
frontier_entry.execution_dependency =
ObjectID::FromBinary(checkpoint_data.frontier_dependencies(i));
@ -55,14 +59,14 @@ const int64_t ActorRegistration::GetRemainingReconstructions() const {
return actor_table_data_.remaining_reconstructions();
}
const std::unordered_map<ActorHandleID, ActorRegistration::FrontierLeaf>
const std::unordered_map<TaskID, ActorRegistration::FrontierLeaf>
&ActorRegistration::GetFrontier() const {
return frontier_;
}
ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
ObjectID ActorRegistration::ExtendFrontier(const TaskID &caller_id,
const ObjectID &execution_dependency) {
auto &frontier_entry = frontier_[handle_id];
auto &frontier_entry = frontier_[caller_id];
// Release the reference to the previous cursor for this
// actor handle, if there was one.
ObjectID object_to_release;
@ -85,16 +89,6 @@ ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
return object_to_release;
}
void ActorRegistration::AddHandle(const ActorHandleID &handle_id,
const ObjectID &execution_dependency) {
if (frontier_.find(handle_id) == frontier_.end()) {
auto &new_handle = frontier_[handle_id];
new_handle.task_counter = 0;
new_handle.execution_dependency = execution_dependency;
dummy_objects_[execution_dependency]++;
}
}
int ActorRegistration::NumHandles() const { return frontier_.size(); }
std::shared_ptr<ActorCheckpointData> ActorRegistration::GenerateCheckpointData(
@ -102,14 +96,14 @@ std::shared_ptr<ActorCheckpointData> ActorRegistration::GenerateCheckpointData(
// Make a copy of the actor registration
ActorRegistration copy = *this;
if (task) {
const auto actor_handle_id = task->GetTaskSpecification().ActorHandleId();
const auto actor_caller_id = task->GetTaskSpecification().CallerId();
const auto dummy_object = task->GetTaskSpecification().ActorDummyObject();
// Extend its frontier to include the most recent task.
// NOTE(hchen): For non-direct-call actors, this is needed because this method is
// called before `FinishAssignedTask`, which will be called when the worker tries to
// fetch the next task. For direct-call actors, checkpoint data doesn't contain
// frontier info, so we don't need to do `ExtendFrontier` here.
copy.ExtendFrontier(actor_handle_id, dummy_object);
copy.ExtendFrontier(actor_caller_id, dummy_object);
}
// Use actor's current state to generate checkpoint data.

View file

@ -97,7 +97,7 @@ class ActorRegistration {
///
/// \return The actor frontier, a map from handle ID to execution state for
/// that handle.
const std::unordered_map<ActorHandleID, FrontierLeaf> &GetFrontier() const;
const std::unordered_map<TaskID, FrontierLeaf> &GetFrontier() const;
/// Get all the dummy objects of this actor's tasks.
const std::unordered_map<ObjectID, int64_t> &GetDummyObjects() const {
@ -112,18 +112,7 @@ class ActorRegistration {
/// state. This is the execution dependency returned by the task.
/// \return The dummy object that can be released as a result of the executed
/// task. If no dummy object can be released, then this is nil.
ObjectID ExtendFrontier(const ActorHandleID &handle_id,
const ObjectID &execution_dependency);
/// Add a new handle to the actor frontier. This does nothing if the actor
/// handle already exists.
///
/// \param handle_id The ID of the handle to add.
/// \param execution_dependency This is the expected execution dependency for
/// the first task submitted on the new handle. If the new handle hasn't been
/// seen yet, then this dependency will be added to the actor frontier and is
/// not safe to release until the first task has been submitted.
void AddHandle(const ActorHandleID &handle_id, const ObjectID &execution_dependency);
ObjectID ExtendFrontier(const TaskID &caller_id, const ObjectID &execution_dependency);
/// Returns num handles to this actor entry.
///
@ -150,7 +139,7 @@ class ActorRegistration {
/// The execution frontier of the actor, which represents which tasks have
/// executed so far and which tasks may execute next, based on execution
/// dependencies. This is indexed by handle.
std::unordered_map<ActorHandleID, FrontierLeaf> frontier_;
std::unordered_map<TaskID, FrontierLeaf> frontier_;
/// This map is used to track all the unreleased dummy objects for this
/// actor. The map key is the dummy object ID, and the map value is the
/// number of actor handles that depend on that dummy object. When the map

View file

@ -139,7 +139,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
uint64_t num_returns) {
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, {"", "", ""}, JobID::Nil(),
RandomTaskId(), 0, num_returns, {}, {});
RandomTaskId(), 0, RandomTaskId(), num_returns, {}, {});
for (const auto &arg : arguments) {
builder.AddByRefArg(arg);
}

View file

@ -23,12 +23,12 @@ namespace {
int64_t GetExpectedTaskCounter(
const std::unordered_map<ray::ActorID, ray::raylet::ActorRegistration>
&actor_registry,
const ray::ActorID &actor_id, const ray::ActorHandleID &actor_handle_id) {
const ray::ActorID &actor_id, const ray::TaskID &actor_caller_id) {
auto actor_entry = actor_registry.find(actor_id);
RAY_CHECK(actor_entry != actor_registry.end());
const auto &frontier = actor_entry->second.GetFrontier();
int64_t expected_task_counter = 0;
auto frontier_entry = frontier.find(actor_handle_id);
auto frontier_entry = frontier.find(actor_caller_id);
if (frontier_entry != frontier.end()) {
expected_task_counter = frontier_entry->second.task_counter;
}
@ -1605,8 +1605,8 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
auto node_manager_id = actor_entry->second.GetNodeManagerId();
if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) {
// The actor is local.
int64_t expected_task_counter = GetExpectedTaskCounter(
actor_registry_, spec.ActorId(), spec.ActorHandleId());
int64_t expected_task_counter =
GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.CallerId());
if (static_cast<int64_t>(spec.ActorCounter()) < expected_task_counter) {
// A task that has already been executed before has been found. The
// task will be treated as failed if at least one of the task's
@ -1825,7 +1825,7 @@ bool NodeManager::AssignTask(const Task &task) {
// An actor task should only be ready to be assigned if it matches the
// expected task counter.
int64_t expected_task_counter =
GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.ActorHandleId());
GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.CallerId());
RAY_CHECK(static_cast<int64_t>(spec.ActorCounter()) == expected_task_counter)
<< "Expected actor counter: " << expected_task_counter << ", task "
<< spec.TaskId() << " has: " << spec.ActorCounter();
@ -1985,18 +1985,18 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
ActorID actor_id;
ActorHandleID actor_handle_id;
TaskID caller_id;
const TaskSpecification task_spec = task.GetTaskSpecification();
bool resumed_from_checkpoint = false;
if (task_spec.IsActorCreationTask()) {
actor_id = task_spec.ActorCreationId();
actor_handle_id = ActorHandleID::Nil();
caller_id = TaskID::Nil();
if (checkpoint_id_to_restore_.count(actor_id) > 0) {
resumed_from_checkpoint = true;
}
} else {
actor_id = task_spec.ActorId();
actor_handle_id = task_spec.ActorHandleId();
caller_id = task_spec.CallerId();
}
if (task_spec.IsActorCreationTask()) {
@ -2004,7 +2004,6 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
worker.AssignActorId(actor_id);
// Lookup the parent actor id.
auto parent_task_id = task_spec.ParentTaskId();
RAY_CHECK(actor_handle_id.IsNil());
int port = worker.Port();
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
JobID::Nil(), parent_task_id,
@ -2052,36 +2051,28 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
resumed_from_checkpoint, port);
}));
} else {
// The actor was not resumed from a checkpoint. We extend the actor's
// frontier as usual since there is no frontier to restore.
ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, actor_handle_id);
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
// Extend the actor's frontier to include the executed task.
const ObjectID object_to_release =
actor_entry->second.ExtendFrontier(caller_id, task_spec.ActorDummyObject());
if (!object_to_release.IsNil()) {
// If there were no new actor handles created, then no other actor task
// will depend on this execution dependency, so it safe to release.
HandleObjectMissing(object_to_release);
}
// Mark the dummy object as locally available to indicate that the actor's
// state has changed and the next method can run. This is not added to the
// object table, so the update will be invisible to both the local object
// manager and the other nodes.
// NOTE(swang): The dummy objects must be marked as local whenever
// ExtendFrontier is called, and vice versa, so that we can clean up the
// dummy objects properly in case the actor fails and needs to be
// reconstructed.
HandleObjectLocal(task_spec.ActorDummyObject());
}
}
void NodeManager::ExtendActorFrontier(const ObjectID &dummy_object,
const ActorID &actor_id,
const ActorHandleID &actor_handle_id) {
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
// Extend the actor's frontier to include the executed task.
const ObjectID object_to_release =
actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
if (!object_to_release.IsNil()) {
// If there were no new actor handles created, then no other actor task
// will depend on this execution dependency, so it safe to release.
HandleObjectMissing(object_to_release);
}
// Mark the dummy object as locally available to indicate that the actor's
// state has changed and the next method can run. This is not added to the
// object table, so the update will be invisible to both the local object
// manager and the other nodes.
// NOTE(swang): The dummy objects must be marked as local whenever
// ExtendFrontier is called, and vice versa, so that we can clean up the
// dummy objects properly in case the actor fails and needs to be
// reconstructed.
HandleObjectLocal(dummy_object);
}
void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint,
@ -2141,9 +2132,10 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
}
}
if (!resumed_from_checkpoint) {
// The actor was not resumed from a checkpoint. We extend the actor's
// frontier as usual since there is no frontier to restore.
ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, ActorHandleID::Nil());
// The actor was not resumed from a checkpoint. Store the
// initial dummy object. All future handles to the actor will
// depend on this object.
HandleObjectLocal(task_spec.ActorDummyObject());
}
}
@ -2450,28 +2442,10 @@ void NodeManager::FinishAssignTask(const TaskID &task_id, Worker &worker, bool s
// We successfully assigned the task to the worker.
worker.AssignTaskId(spec.TaskId());
worker.AssignJobId(spec.JobId());
// Actor tasks require extra accounting to track the actor's state.
if (spec.IsActorTask()) {
auto actor_entry = actor_registry_.find(spec.ActorId());
RAY_CHECK(actor_entry != actor_registry_.end());
// Process any new actor handles that were created since the
// previous task on this handle was executed. The first task
// submitted on a new actor handle will depend on the dummy object
// returned by the previous task, so the dependency will not be
// released until this first task is submitted.
for (auto &new_handle_id : spec.NewActorHandles()) {
const auto prev_actor_task_id = spec.PreviousActorTaskDummyObjectId();
RAY_CHECK(!prev_actor_task_id.IsNil());
// Add the new handle and give it a reference to the finished task's
// execution dependency.
actor_entry->second.AddHandle(new_handle_id, prev_actor_task_id);
}
// TODO(swang): For actors with multiple actor handles, to
// guarantee that tasks are replayed in the same order after a
// failure, we must update the task's execution dependency to be
// the actor's current execution dependency.
}
// TODO(swang): For actors with multiple actor handles, to
// guarantee that tasks are replayed in the same order after a
// failure, we must update the task's execution dependency to be
// the actor's current execution dependency.
// Mark the task as running.
// (See design_docs/task_states.rst for the state transition diagram.)

View file

@ -243,13 +243,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint, int port);
/// Extend actor frontier after an actor task or actor creation task executes.
///
/// \param dummy_object Dummy object corresponding to the task.
/// \param actor_id The relevant actor ID.
/// \param actor_handle_id The relevant actor handle ID.
void ExtendActorFrontier(const ObjectID &dummy_object, const ActorID &actor_id,
const ActorHandleID &actor_handle_id);
/// Make a placement decision for placeable tasks given the resource_map
/// provided. This will perform task state transitions and task forwarding.
///

View file

@ -76,7 +76,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
uint64_t num_returns) {
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, {"", "", ""}, JobID::Nil(),
RandomTaskId(), 0, num_returns, {}, {});
RandomTaskId(), 0, RandomTaskId(), num_returns, {}, {});
for (const auto &arg : arguments) {
builder.AddByRefArg(arg);
}

View file

@ -40,13 +40,16 @@ class DirectActorClient : public std::enable_shared_from_this<DirectActorClient>
/// \return if the rpc call succeeds
ray::Status PushTask(std::unique_ptr<PushTaskRequest> request,
const ClientCallback<PushTaskReply> &callback) {
request->set_sequence_number(next_seq_no_++);
send_queue_.push_back(std::make_pair(std::move(request), callback));
request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter());
{
std::lock_guard<std::mutex> lock(mutex_);
send_queue_.push_back(std::make_pair(std::move(request), callback));
}
SendRequests();
return ray::Status::OK();
}
/// Send as many pending tasks as possible. This method is thread-safe.
/// Send as many pending tasks as possible. This method is NOT thread-safe.
///
/// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being
/// sent at once. This prevents the server scheduling queue from being overwhelmed.
@ -115,9 +118,6 @@ class DirectActorClient : public std::enable_shared_from_this<DirectActorClient>
std::deque<std::pair<std::unique_ptr<PushTaskRequest>, ClientCallback<PushTaskReply>>>
send_queue_;
/// The next sequence number to assign to a task for this server.
int64_t next_seq_no_ = 0;
/// The number of bytes currently in flight.
int64_t rpc_bytes_in_flight_ = 0;