[core] Attach call site to ObjectRefs, print on error (#17971)

* Attach call site to ObjectRef

* flag

* Fix build

* build

* build

* build

* x

* x

* skip on windows

* lint
Stephanie Wang 2021-09-01 15:29:05 -07:00 committed by GitHub
parent d470e679df
commit d43d297d9a
33 changed files with 366 additions and 239 deletions
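
End to end, the change behaves roughly as sketched below. This is a minimal illustration based on the diffs that follow; record_ref_creation_sites and ObjectRef.call_site() come from this commit, while the exact stack format shown is an assumption.

import ray

# Call-site recording is off by default. The new error message tells users to
# set RAY_record_ref_creation_sites=1; it can also be enabled through the
# system config, as the new test below does.
ray.init(_system_config={"record_ref_creation_sites": True})

@ray.remote
def f():
    return 1

ref = f.remote()
# The returned ObjectRef now carries the Python stack captured at the
# .remote() call, one frame per segment, e.g.
# ".../script.py:12 in <module> | ..."
print(ref.call_site())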

View file

@ -129,7 +129,8 @@ std::vector<std::unique_ptr<::ray::TaskArg>> TransformArgs(
} else {
RAY_CHECK(arg.id);
ray_arg = absl::make_unique<ray::TaskArgByReference>(ObjectID::FromBinary(*arg.id),
ray::rpc::Address{});
ray::rpc::Address{},
/*call_site=*/"");
}
ray_args.push_back(std::move(ray_arg));
}

View file

@ -36,14 +36,18 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskOptions options{};
options.name = call_options.name;
options.resources = call_options.resources;
std::vector<ObjectID> return_ids;
std::vector<rpc::ObjectReference> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation),
invocation.args, options, &return_ids);
return_refs = core_worker.SubmitActorTask(
invocation.actor_id, BuildRayFunction(invocation), invocation.args, options);
} else {
core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, options,
&return_ids, 1, false,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
return_refs = core_worker.SubmitTask(
BuildRayFunction(invocation), invocation.args, options, 1, false,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
}
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
return return_ids[0];
}

View file

@ -123,7 +123,7 @@ Status TaskExecutor::ExecuteTask(
ray::TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args_buffer,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,

View file

@ -76,7 +76,7 @@ class TaskExecutor {
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,

View file

@ -81,6 +81,7 @@ cdef class ObjectRef(BaseID):
# of active IDs in the core worker so we know whether we should clean
# it up.
c_bool in_core_worker
c_string call_site_data
cdef CObjectID native(self)

View file

@ -46,6 +46,7 @@ from cython.operator import dereference, postincrement
from ray.includes.common cimport (
CBuffer,
CAddress,
CObjectReference,
CLanguage,
CRayObject,
@ -182,10 +183,11 @@ cdef RayObjectsToDataMetadataPairs(
return data_metadata_pairs
cdef VectorToObjectRefs(const c_vector[CObjectID] &object_refs):
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
result = []
for i in range(object_refs.size()):
result.append(ObjectRef(object_refs[i].Binary()))
result.append(ObjectRef(object_refs[i].object_id(),
object_refs[i].call_site()))
return result
@ -328,6 +330,7 @@ cdef prepare_args(
int64_t total_inlined
shared_ptr[CBuffer] arg_data
c_vector[CObjectID] inlined_ids
c_string put_arg_call_site
c_vector[CObjectReference] inlined_refs
worker = ray.worker.global_worker
@ -341,7 +344,8 @@ cdef prepare_args(
unique_ptr[CTaskArg](new CTaskArgByReference(
c_arg,
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg))))
c_arg),
arg.call_site())))
else:
serialized_arg = worker.get_serialization_context().serialize(arg)
@ -356,6 +360,8 @@ cdef prepare_args(
metadata_fields[0], language))
size = serialized_arg.total_bytes
if RayConfig.instance().record_ref_creation_sites():
get_py_stack(&put_arg_call_site)
# TODO(edoakes): any objects containing ObjectRefs are spilled to
# plasma here. This is inefficient for small objects, but inlined
# arguments aren't associated with ObjectRefs right now so this is a
@ -383,7 +389,8 @@ cdef prepare_args(
new CTaskArgByReference(CObjectID.FromBinary(
core_worker.put_serialized_object(
serialized_arg, inline_small_object=False)),
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress())))
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
put_arg_call_site)))
cdef raise_if_dependency_failed(arg):
@ -403,7 +410,7 @@ cdef execute_task(
const CRayFunction &ray_function,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
const c_vector[CObjectReference] &c_arg_refs,
const c_vector[CObjectID] &c_return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns,
@ -526,7 +533,7 @@ cdef execute_task(
args, kwargs = [], {}
else:
metadata_pairs = RayObjectsToDataMetadataPairs(c_args)
object_refs = VectorToObjectRefs(c_arg_reference_ids)
object_refs = VectorToObjectRefs(c_arg_refs)
if core_worker.current_actor_is_asyncio():
# We deserialize objects in event loop thread to
@ -658,7 +665,7 @@ cdef CRayStatus task_execution_handler(
const CRayFunction &ray_function,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
const c_vector[CObjectReference] &c_arg_refs,
const c_vector[CObjectID] &c_return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns,
@ -670,7 +677,7 @@ cdef CRayStatus task_execution_handler(
# The call to execute_task should never raise an exception. If
# it does, that indicates that there was an internal error.
execute_task(task_type, task_name, ray_function, c_resources,
c_args, c_arg_reference_ids, c_return_ids,
c_args, c_arg_refs, c_return_ids,
debugger_breakpoint, returns,
is_application_level_error)
except Exception as e:
@ -747,11 +754,17 @@ cdef void run_on_util_worker_handler(
cdef c_vector[c_string] spill_objects_handler(
const c_vector[CObjectID]& object_ids_to_spill,
const c_vector[c_string]& owner_addresses) nogil:
cdef c_vector[c_string] return_urls
const c_vector[CObjectReference]& object_refs_to_spill) nogil:
cdef:
c_vector[c_string] return_urls
c_vector[c_string] owner_addresses
with gil:
object_refs = VectorToObjectRefs(object_ids_to_spill)
object_refs = VectorToObjectRefs(object_refs_to_spill)
for i in range(object_refs_to_spill.size()):
owner_addresses.push_back(
object_refs_to_spill[i].owner_address()
.SerializeAsString())
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
@ -774,7 +787,7 @@ cdef c_vector[c_string] spill_objects_handler(
cdef int64_t restore_spilled_objects_handler(
const c_vector[CObjectID]& object_ids_to_restore,
const c_vector[CObjectReference]& object_refs_to_restore,
const c_vector[c_string]& object_urls) nogil:
cdef:
int64_t bytes_restored = 0
@ -783,7 +796,7 @@ cdef int64_t restore_spilled_objects_handler(
size = object_urls.size()
for i in range(size):
urls.append(object_urls[i])
object_refs = VectorToObjectRefs(object_ids_to_restore)
object_refs = VectorToObjectRefs(object_refs_to_restore)
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER,
@ -897,7 +910,8 @@ cdef void get_py_stack(c_string* stack_out) nogil:
frame.f_code.co_filename, frame.f_code.co_name,
frame.f_lineno))
frame = frame.f_back
stack_out[0] = " | ".join(msg_frames).encode("ascii")
stack_out[0] = (ray_constants.CALL_STACK_LINE_DELIMITER
.join(msg_frames).encode("ascii"))
cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
cdef shared_ptr[CBuffer] empty_metadata
@ -1338,13 +1352,13 @@ cdef class CoreWorker:
unordered_map[c_string, double] c_resources
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectID] return_ids
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
c_string c_serialized_runtime_env
unordered_map[c_string, c_string] \
c_override_environment_variables = \
override_environment_variables
c_vector[CObjectReference] return_refs
with self.profile_event(b"submit_task"):
c_serialized_runtime_env = \
@ -1357,19 +1371,19 @@ cdef class CoreWorker:
# NOTE(edoakes): releasing the GIL while calling this method causes
# segfaults. See relevant issue for details:
# https://github.com/ray-project/ray/pull/12803
CCoreWorkerProcess.GetCoreWorker().SubmitTask(
return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitTask(
ray_function, args_vector, CTaskOptions(
name, num_returns, c_resources,
b"",
c_serialized_runtime_env,
c_override_environment_variables),
&return_ids, max_retries, retry_exceptions,
max_retries, retry_exceptions,
c_pair[CPlacementGroupID, int64_t](
c_placement_group_id, placement_group_bundle_index),
placement_group_capture_child_tasks,
debugger_breakpoint)
return VectorToObjectRefs(return_ids)
return VectorToObjectRefs(return_refs)
def create_actor(self,
Language language,
@ -1509,7 +1523,7 @@ cdef class CoreWorker:
unordered_map[c_string, double] c_resources
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectID] return_ids
c_vector[CObjectReference] return_refs
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
@ -1521,13 +1535,12 @@ cdef class CoreWorker:
# NOTE(edoakes): releasing the GIL while calling this method causes
# segfaults. See relevant issue for details:
# https://github.com/ray-project/ray/pull/12803
CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
c_actor_id,
ray_function,
args_vector, CTaskOptions(name, num_returns, c_resources),
&return_ids)
args_vector, CTaskOptions(name, num_returns, c_resources))
return VectorToObjectRefs(return_ids)
return VectorToObjectRefs(return_refs)
def kill_actor(self, ActorID actor_id, c_bool no_restart):
cdef:
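
For orientation, get_py_stack (updated above to use the shared delimiter) walks Python frames from Cython. A rough pure-Python equivalent, assuming the same filename/function/line framing, is:

import traceback

import ray.ray_constants as ray_constants

def py_stack():
    # Approximates get_py_stack(): collect caller frames and join them with
    # the delimiter so the call site stays a single line on the wire.
    frames = []
    for frame, lineno in traceback.walk_stack(None):
        code = frame.f_code
        frames.append("{} in {}:{}".format(
            code.co_filename, code.co_name, lineno))
    return ray_constants.CALL_STACK_LINE_DELIMITER.join(frames)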

View file

@ -3,6 +3,7 @@ from traceback import format_exception
import ray.cloudpickle as pickle
from ray.core.generated.common_pb2 import RayException, Language, PYTHON
import ray.ray_constants as ray_constants
import colorama
import setproctitle
@ -285,11 +286,23 @@ class ObjectLostError(RayError):
object_ref_hex: Hex ID of the object.
call_site: The call site where the ObjectRef was created, or "" if
    call-site recording was not enabled.
"""
def __init__(self, object_ref_hex):
def __init__(self, object_ref_hex, call_site):
self.object_ref_hex = object_ref_hex
self.call_site = call_site.replace(
ray_constants.CALL_STACK_LINE_DELIMITER, "\n ")
def __str__(self):
return (f"Object {self.object_ref_hex} is lost due to node failure.")
msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node "
"failure or system error.")
if self.call_site:
msg += (f" The ObjectRef was created at: {self.call_site}")
else:
msg += (
" To see information about where this ObjectRef was created "
"in Python, set the environment variable "
"RAY_record_ref_creation_sites=1 during `ray start` and "
"`ray.init()`.")
return msg
class GetTimeoutError(RayError):
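
The two message shapes produced by the new __str__ look roughly like this (hex ID shortened, output approximated):

from ray.exceptions import ObjectLostError

with_site = ObjectLostError(
    "ffffffff", "script.py:12 in <module> | script.py:7 in make_data")
print(with_site)
# Object ffffffff cannot be retrieved due to node failure or system error.
# The ObjectRef was created at: script.py:12 in <module>
#   script.py:7 in make_data

without_site = ObjectLostError("ffffffff", "")
print(without_site)  # ends with the RAY_record_ref_creation_sites=1 hint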

View file

@ -152,12 +152,15 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
pass
cdef cppclass CAddress "ray::rpc::Address":
CAddress()
const c_string &SerializeAsString()
const c_string &SerializeAsString() const
void ParseFromString(const c_string &serialized)
void CopyFrom(const CAddress& address)
const c_string &worker_id()
cdef cppclass CObjectReference "ray::rpc::ObjectReference":
CObjectReference()
CAddress owner_address() const
const c_string &object_id() const
const c_string &call_site() const
# This is a workaround for C++ enum class since Cython has no corresponding
# representation.
@ -243,7 +246,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CTaskArgByReference "ray::TaskArgByReference":
CTaskArgByReference(const CObjectID &object_id,
const CAddress &owner_address)
const CAddress &owner_address,
const c_string &call_site)
cdef cppclass CTaskArgByValue "ray::TaskArgByValue":
CTaskArgByValue(const shared_ptr[CRayObject] &data)

View file

@ -26,6 +26,7 @@ from ray.includes.gcs_client cimport CGcsClient
from ray.includes.common cimport (
CAddress,
CObjectReference,
CActorCreationOptions,
CBuffer,
CPlacementGroupCreationOptions,
@ -99,10 +100,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CWorkerType GetWorkerType()
CLanguage GetLanguage()
void SubmitTask(
c_vector[CObjectReference] SubmitTask(
const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids,
const CTaskOptions &options,
int max_retries,
c_bool retry_exceptions,
c_pair[CPlacementGroupID, int64_t] placement_options,
@ -120,11 +121,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CPlacementGroupID &placement_group_id)
CRayStatus WaitPlacementGroupReady(
const CPlacementGroupID &placement_group_id, int timeout_ms)
void SubmitActorTask(
c_vector[CObjectReference] SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
const CTaskOptions &options)
CRayStatus KillActor(
const CActorID &actor_id, c_bool force_kill,
c_bool no_restart)
@ -276,7 +276,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CRayFunction &ray_function,
const unordered_map[c_string, double] &resources,
const c_vector[shared_ptr[CRayObject]] &args,
const c_vector[CObjectID] &arg_reference_ids,
const c_vector[CObjectReference] &arg_refs,
const c_vector[CObjectID] &return_ids,
const c_string debugger_breakpoint,
c_vector[shared_ptr[CRayObject]] *returns,
@ -288,10 +288,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) spill_objects
const c_vector[CObjectReference] &) nogil) spill_objects
(int64_t(
const c_vector[CObjectID] &,
const c_vector[CObjectReference] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
(void(
const c_vector[c_string]&,

View file

@ -35,10 +35,11 @@ def _set_future_helper(
cdef class ObjectRef(BaseID):
def __init__(self, id):
def __init__(self, id, call_site_data=""):
check_id(id)
self.data = CObjectID.FromBinary(<c_string>id)
self.in_core_worker = False
self.call_site_data = call_site_data
worker = ray.worker.global_worker
# TODO(edoakes): We should be able to remove the in_core_worker flag.
@ -84,6 +85,9 @@ cdef class ObjectRef(BaseID):
def job_id(self):
return self.task_id().job_id()
def call_site(self):
return decode(self.call_site_data)
cdef size_t hash(self):
return self.data.Hash()
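
A direct construction showing the new plumbing (hypothetical values; normally the call site is filled in by task submission or deserialization, and the 28-byte ID length is an assumption about this Ray version):

import ray

ray.init()  # ObjectRef construction assumes a connected worker
binary_id = b"\x00" * 28  # placeholder ObjectID bytes
ref = ray.ObjectRef(binary_id, "script.py:12 in <module>")
assert ref.call_site() == "script.py:12 in <module>"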

View file

@ -64,3 +64,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool enable_timeline() const
uint32_t max_grpc_message_size() const
c_bool record_ref_creation_sites() const

View file

@ -107,3 +107,7 @@ cdef class Config:
@staticmethod
def max_grpc_message_size():
return RayConfig.instance().max_grpc_message_size()
@staticmethod
def record_ref_creation_sites():
return RayConfig.instance().record_ref_creation_sites()
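
A sketch of how the flag can be consulted from Python, mirroring the RayConfig.instance().record_ref_creation_sites() guard added in prepare_args above:

from ray._raylet import Config

if Config.record_ref_creation_sites():
    # Stack capture (get_py_stack) runs only when the flag is enabled, so
    # the default configuration pays no tracing cost.
    pass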

View file

@ -272,3 +272,7 @@ HEALTHCHECK_EXPIRATION_S = os.environ.get("RAY_HEALTHCHECK_EXPIRATION_S", 10)
# Should be kept in sync with kSetupWorkerFilename in
# src/ray/common/constants.h.
SETUP_WORKER_FILENAME = "setup_worker.py"
# Used to separate lines when formatting the call stack where an ObjectRef was
# created.
CALL_STACK_LINE_DELIMITER = " | "
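
The delimiter keeps a multi-frame stack on a single line; display code re-expands it, e.g.:

frames = ["a.py:1 in f", "b.py:2 in g"]
one_line = CALL_STACK_LINE_DELIMITER.join(frames)  # "a.py:1 in f | b.py:2 in g"
pretty = one_line.replace(CALL_STACK_LINE_DELIMITER, "\n  ")  # as in ObjectLostError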

View file

@ -28,7 +28,7 @@ class DeserializationError(Exception):
pass
def _object_ref_deserializer(binary, owner_address, object_status):
def _object_ref_deserializer(binary, call_site, owner_address, object_status):
# NOTE(suquark): This function should be a global function so
# cloudpickle can access it directly. Otherwise cloudpickle
# has to dump the whole function definition, which is inefficient.
@ -37,7 +37,7 @@ def _object_ref_deserializer(binary, owner_address, object_status):
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectRef is greater than 0 by the
# time the core worker resolves the value of the object.
obj_ref = ray.ObjectRef(binary)
obj_ref = ray.ObjectRef(binary, call_site)
# TODO(edoakes): we should be able to just capture a reference
# to 'self' here instead, but this function is itself pickled
@ -92,7 +92,7 @@ class SerializationContext:
obj, owner_address, object_status = (
worker.core_worker.serialize_and_promote_object_ref(obj))
return _object_ref_deserializer, \
(obj.binary(), owner_address, object_status)
(obj.binary(), obj.call_site(), owner_address, object_status)
self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer)
serialization_addons.apply(self)
@ -223,7 +223,8 @@ class SerializationContext:
elif error_type == ErrorType.Value("TASK_CANCELLED"):
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectLostError(object_ref.hex())
return ObjectLostError(object_ref.hex(),
object_ref.call_site())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError()
else:
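
With the reducer change above, a pickled ObjectRef keeps its creation site across workers. A sketch (requires a connected worker so the reducer is registered; f is a hypothetical @ray.remote function):

import ray.cloudpickle as pickle

ref = f.remote()
ref2 = pickle.loads(pickle.dumps(ref))
assert ref2.call_site() == ref.call_site()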

View file

@ -156,5 +156,72 @@ if __name__ == "__main__":
assert x == 42
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
@pytest.mark.parametrize("debug_enabled", [False, True])
def test_object_lost_error(ray_start_cluster, debug_enabled):
cluster = ray_start_cluster
system_config = {
"num_heartbeats_timeout": 3,
}
if debug_enabled:
system_config["record_ref_creation_sites"] = True
cluster.add_node(num_cpus=0, _system_config=system_config)
ray.init(address=cluster.address)
worker_node = cluster.add_node(num_cpus=1)
@ray.remote(num_cpus=1)
class Actor:
def __init__(self):
return
def foo(self):
return "x" * 1000_000
def done(self):
return
@ray.remote
def borrower(ref):
ray.get(ref[0])
@ray.remote
def task_arg(ref):
return
a = Actor.remote()
x = a.foo.remote()
ray.get(a.done.remote())
cluster.remove_node(worker_node, allow_graceful=False)
cluster.add_node(num_cpus=1)
y = borrower.remote([x])
try:
ray.get(x)
assert False
except ray.exceptions.ObjectLostError as e:
error = str(e)
print(error)
assert ("actor call" in error) == debug_enabled
assert ("test_object_lost_error" in error) == debug_enabled
try:
ray.get(y)
assert False
except ray.exceptions.RayTaskError as e:
error = str(e)
print(error)
assert ("actor call" in error) == debug_enabled
assert ("test_object_lost_error" in error) == debug_enabled
try:
ray.get(task_arg.remote(x))
except ray.exceptions.RayTaskError as e:
error = str(e)
print(error)
assert ("actor call" in error) == debug_enabled
assert ("test_object_lost_error" in error) == debug_enabled
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -34,19 +34,22 @@ class TaskArgByReference : public TaskArg {
///
/// \param[in] object_id Id of the argument.
/// \param[in] owner_address Address of the argument's owner.
/// \param[in] call_site Call site at which the reference was created, or ""
/// if call-site recording is disabled.
/// \return The task argument.
TaskArgByReference(const ObjectID &object_id, const rpc::Address &owner_address)
: id_(object_id), owner_address_(owner_address) {}
TaskArgByReference(const ObjectID &object_id, const rpc::Address &owner_address,
const std::string &call_site)
: id_(object_id), owner_address_(owner_address), call_site_(call_site) {}
void ToProto(rpc::TaskArg *arg_proto) const {
auto ref = arg_proto->mutable_object_ref();
ref->set_object_id(id_.Binary());
ref->mutable_owner_address()->CopyFrom(owner_address_);
ref->set_call_site(call_site_);
}
private:
/// Id of the argument if passed by reference, otherwise nullptr.
const ObjectID id_;
const rpc::Address owner_address_;
const std::string call_site_;
};
class TaskArgByValue : public TaskArg {

View file

@ -39,9 +39,8 @@ void BuildCommonTaskSpec(
const std::vector<std::unique_ptr<TaskArg>> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
std::vector<ObjectID> *return_ids, const BundleID &bundle_id,
bool placement_group_capture_child_tasks, const std::string debugger_breakpoint,
const std::string &serialized_runtime_env,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string debugger_breakpoint, const std::string &serialized_runtime_env,
const std::unordered_map<std::string, std::string> &override_environment_variables,
const std::string &concurrency_group_name = "") {
// Build common task spec.
@ -55,12 +54,6 @@ void BuildCommonTaskSpec(
for (const auto &arg : args) {
builder.AddArg(*arg);
}
// Compute return IDs.
return_ids->resize(num_returns);
for (size_t i = 0; i < num_returns; i++) {
(*return_ids)[i] = ObjectID::FromIndex(task_id, i + 1);
}
}
JobID GetProcessJobID(const CoreWorkerOptions &options) {
@ -1643,13 +1636,11 @@ std::unordered_map<std::string, double> AddPlacementGroupConstraint(
return resources;
}
void CoreWorker::SubmitTask(const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids, int max_retries,
bool retry_exceptions, BundleID placement_options,
bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint) {
std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
const RayFunction &function, const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, int max_retries, bool retry_exceptions,
BundleID placement_options, bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint) {
TaskSpecBuilder builder;
const auto next_task_index = worker_context_.GetNextTaskIndex();
const auto task_id =
@ -1671,27 +1662,28 @@ void CoreWorker::SubmitTask(const RayFunction &function,
override_environment_variables.insert(current_override_environment_variables.begin(),
current_override_environment_variables.end());
// TODO(ekl) offload task building onto a thread pool for performance
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, return_ids,
placement_options, placement_group_capture_child_tasks,
debugger_breakpoint, task_options.serialized_runtime_env,
override_environment_variables);
BuildCommonTaskSpec(
builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, task_options.num_returns, constrained_resources, required_resources,
placement_options, placement_group_capture_child_tasks, debugger_breakpoint,
task_options.serialized_runtime_env, override_environment_variables);
builder.SetNormalTaskSpec(max_retries, retry_exceptions);
TaskSpecification task_spec = builder.Build();
RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString();
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec);
returned_refs = ExecuteTaskLocalMode(task_spec);
} else {
task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, CurrentCallSite(),
max_retries);
returned_refs = task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec,
CurrentCallSite(), max_retries);
io_service_.post(
[this, task_spec]() {
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitTask");
}
return returned_refs;
}
Status CoreWorker::CreateActor(const RayFunction &function,
@ -1717,7 +1709,6 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.override_environment_variables;
override_environment_variables.insert(current_override_environment_variables.begin(),
current_override_environment_variables.end());
std::vector<ObjectID> return_ids;
TaskSpecBuilder builder;
auto new_placement_resources =
AddPlacementGroupConstraint(actor_creation_options.placement_resources,
@ -1731,17 +1722,18 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_name.empty()
? function.GetFunctionDescriptor()->DefaultTaskName()
: actor_name + ":" + function.GetFunctionDescriptor()->CallString();
BuildCommonTaskSpec(
builder, job_id, actor_creation_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, 1, new_resource, new_placement_resources, &return_ids,
actor_creation_options.placement_options,
actor_creation_options.placement_group_capture_child_tasks,
"", /* debugger_breakpoint */
actor_creation_options.serialized_runtime_env, override_environment_variables);
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, new_resource,
new_placement_resources, actor_creation_options.placement_options,
actor_creation_options.placement_group_capture_child_tasks,
"", /* debugger_breakpoint */
actor_creation_options.serialized_runtime_env,
override_environment_variables);
auto actor_handle = std::make_unique<ActorHandle>(
actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0],
actor_id, GetCallerId(), rpc_address_, job_id,
/*actor_cursor=*/ObjectID::FromIndex(actor_creation_task_id, 1),
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data,
actor_creation_options.max_task_retries);
std::string serialized_actor_handle;
@ -1871,10 +1863,9 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro
return status_future.get();
}
void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options) {
auto actor_handle = actor_manager_->GetActorHandle(actor_id);
// Add one for actor cursor object id for tasks.
@ -1891,36 +1882,37 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun
? function.GetFunctionDescriptor()->DefaultTaskName()
: task_options.name;
const std::unordered_map<std::string, std::string> override_environment_variables = {};
BuildCommonTaskSpec(
builder, actor_handle->CreationJobID(), actor_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, num_returns, task_options.resources, required_resources, return_ids,
std::make_pair(PlacementGroupID::Nil(), -1),
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
"{}", /* serialized_runtime_env */
override_environment_variables, task_options.concurrency_group_name);
BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, num_returns, task_options.resources,
required_resources, std::make_pair(PlacementGroupID::Nil(), -1),
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
"{}", /* serialized_runtime_env */
override_environment_variables,
task_options.concurrency_group_name);
// NOTE: placement_group_capture_child_tasks and runtime_env will
// be ignored in the actor because we should always follow the actor's option.
const ObjectID new_cursor = return_ids->back();
// TODO(swang): Do we actually need to set this ObjectID?
const ObjectID new_cursor = ObjectID::FromIndex(actor_task_id, num_returns);
actor_handle->SetActorTaskSpec(builder, new_cursor);
// Remove cursor from return ids.
return_ids->pop_back();
// Submit task.
TaskSpecification task_spec = builder.Build();
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec, actor_id);
returned_refs = ExecuteTaskLocalMode(task_spec, actor_id);
} else {
task_manager_->AddPendingTask(rpc_address_, task_spec, CurrentCallSite(),
actor_handle->MaxTaskRetries());
returned_refs = task_manager_->AddPendingTask(
rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries());
io_service_.post(
[this, task_spec]() {
RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitActorTask");
}
return returned_refs;
}
Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill,
@ -2172,14 +2164,13 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
std::vector<ObjectID> arg_reference_ids;
std::vector<rpc::ObjectReference> arg_refs;
// This includes all IDs that were passed by reference and any IDs that were
// inlined in the task spec. These references will be pinned during the task
// execution and unpinned once the task completes. We will notify the caller
// about any IDs that we are still borrowing by the time the task completes.
std::vector<ObjectID> borrowed_ids;
RAY_CHECK_OK(
GetAndPinArgsForExecutor(task_spec, &args, &arg_reference_ids, &borrowed_ids));
RAY_CHECK_OK(GetAndPinArgsForExecutor(task_spec, &args, &arg_refs, &borrowed_ids));
std::vector<ObjectID> return_ids;
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
@ -2216,9 +2207,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
status = options_.task_execution_callback(
task_type, task_spec.GetName(), func,
task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids,
return_ids, task_spec.GetDebuggerBreakpoint(), return_objects,
creation_task_exception_pb_bytes, is_application_level_error);
task_spec.GetRequiredResources().GetResourceMap(), args, arg_refs, return_ids,
task_spec.GetDebuggerBreakpoint(), return_objects, creation_task_exception_pb_bytes,
is_application_level_error);
// Get the reference counts for any IDs that we borrowed during this task and
// return them to the caller. This will notify the caller of any IDs that we
@ -2292,18 +2283,28 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
return status;
}
void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id) {
std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();
auto return_objects = std::vector<std::shared_ptr<RayObject>>();
auto borrowed_refs = ReferenceCounter::ReferenceTableProto();
if (!task_spec.IsActorCreationTask()) {
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
std::vector<rpc::ObjectReference> returned_refs;
size_t num_returns = task_spec.NumReturns();
if (task_spec.IsActorTask()) {
num_returns--;
}
for (size_t i = 0; i < num_returns; i++) {
if (!task_spec.IsActorCreationTask()) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i),
/*inner_ids=*/{}, rpc_address_,
CurrentCallSite(), -1,
/*is_reconstructable=*/false);
}
rpc::ObjectReference ref;
ref.set_object_id(task_spec.ReturnId(i).Binary());
ref.mutable_owner_address()->CopyFrom(task_spec.CallerAddress());
returned_refs.push_back(std::move(ref));
}
auto old_id = GetActorId();
SetActorId(actor_id);
@ -2311,15 +2312,16 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
RAY_UNUSED(ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs,
&is_application_level_error));
SetActorId(old_id);
return returned_refs;
}
Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<rpc::ObjectReference> *arg_refs,
std::vector<ObjectID> *borrowed_ids) {
auto num_args = task.NumArgs();
args->resize(num_args);
arg_reference_ids->resize(num_args);
arg_refs->resize(num_args);
absl::flat_hash_set<ObjectID> by_ref_ids;
absl::flat_hash_map<ObjectID, std::vector<size_t>> by_ref_indices;
@ -2332,7 +2334,8 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
task.ArgId(i)));
}
const auto &arg_id = task.ArgId(i);
const auto &arg_ref = task.ArgRef(i);
const auto arg_id = ObjectID::FromBinary(arg_ref.object_id());
by_ref_ids.insert(arg_id);
auto it = by_ref_indices.find(arg_id);
if (it == by_ref_indices.end()) {
@ -2340,7 +2343,7 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
} else {
it->second.push_back(i);
}
arg_reference_ids->at(i) = arg_id;
arg_refs->at(i) = arg_ref;
// Pin all args passed by reference for the duration of the task. This
// ensures that when the task completes, we can retrieve metadata about
// any borrowed ObjectIDs that were serialized in the argument's value.
@ -2369,7 +2372,7 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
bool copy_data = options_.language == Language::PYTHON;
args->at(i) =
std::make_shared<RayObject>(data, metadata, task.ArgInlinedRefs(i), copy_data);
arg_reference_ids->at(i) = ObjectID::Nil();
arg_refs->at(i).set_object_id(ObjectID::Nil().Binary());
// The task borrows all ObjectIDs that were serialized in the inlined
// arguments. The task will receive references to these IDs, so it is
// possible for the task to continue borrowing these arguments by the
@ -2940,18 +2943,9 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (options_.spill_objects != nullptr) {
std::vector<ObjectID> object_ids_to_spill;
object_ids_to_spill.reserve(request.object_ids_to_spill_size());
for (const auto &id_binary : request.object_ids_to_spill()) {
object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary));
}
std::vector<std::string> owner_addresses;
owner_addresses.reserve(request.owner_addresses_size());
for (const auto &owner_address : request.owner_addresses()) {
owner_addresses.push_back(owner_address.SerializeAsString());
}
std::vector<std::string> object_urls =
options_.spill_objects(object_ids_to_spill, owner_addresses);
auto object_refs =
VectorFromProtobuf<rpc::ObjectReference>(request.object_refs_to_spill());
std::vector<std::string> object_urls = options_.spill_objects(object_refs);
for (size_t i = 0; i < object_urls.size(); i++) {
reply->add_spilled_objects_url(std::move(object_urls[i]));
}
@ -2985,10 +2979,12 @@ void CoreWorker::HandleRestoreSpilledObjects(
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
if (options_.restore_spilled_objects != nullptr) {
// Build the list of object refs to restore.
std::vector<ObjectID> object_ids_to_restore;
object_ids_to_restore.reserve(request.object_ids_to_restore_size());
std::vector<rpc::ObjectReference> object_refs_to_restore;
object_refs_to_restore.reserve(request.object_ids_to_restore_size());
for (const auto &id_binary : request.object_ids_to_restore()) {
object_ids_to_restore.push_back(ObjectID::FromBinary(id_binary));
rpc::ObjectReference ref;
ref.set_object_id(id_binary);
object_refs_to_restore.push_back(std::move(ref));
}
// Get a list of spilled_object_urls.
std::vector<std::string> spilled_objects_url;
@ -2997,7 +2993,7 @@ void CoreWorker::HandleRestoreSpilledObjects(
spilled_objects_url.push_back(url);
}
auto total =
options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url);
options_.restore_spilled_objects(object_refs_to_restore, spilled_objects_url);
reply->set_bytes_restored_total(total);
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {

View file

@ -67,7 +67,7 @@ struct CoreWorkerOptions {
TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes,
@ -152,11 +152,11 @@ struct CoreWorkerOptions {
/// be held up in garbage objects.
std::function<void()> gc_collect;
/// Application-language callback to spill objects to external storage.
std::function<std::vector<std::string>(const std::vector<ObjectID> &,
const std::vector<std::string> &)>
std::function<std::vector<std::string>(const std::vector<rpc::ObjectReference> &)>
spill_objects;
/// Application-language callback to restore objects from external storage.
std::function<int64_t(const std::vector<ObjectID> &, const std::vector<std::string> &)>
std::function<int64_t(const std::vector<rpc::ObjectReference> &,
const std::vector<std::string> &)>
restore_spilled_objects;
/// Application-language callback to delete objects from external storage.
std::function<void(const std::vector<std::string> &, rpc::WorkerType)>
@ -703,19 +703,18 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
/// \param[in] max_retries Max number of retries if the task fails.
/// \param[in] placement_options placement group options.
/// \param[in] placement_group_capture_child_tasks whether or not the submitted task
/// should capture its parent's placement group implicitly.
/// \param[in] debugger_breakpoint breakpoint to drop into for the debugger after this
/// task starts executing, or "" if we do not want to drop into the debugger.
void SubmitTask(const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids,
int max_retries, bool retry_exceptions, BundleID placement_options,
bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint);
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> SubmitTask(
const RayFunction &function, const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, int max_retries, bool retry_exceptions,
BundleID placement_options, bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint);
/// Create an actor.
///
@ -770,14 +769,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] function The remote function to execute.
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
/// \return Status error if the task is invalid or if the task submission
/// failed. Tasks can be invalid for direct actor calls because not all tasks
/// are currently supported.
void SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> SubmitActorTask(
const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, const TaskOptions &task_options);
/// Tell an actor to exit immediately, without completing outstanding work.
///
@ -1129,8 +1124,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Execute a local mode task (runs normal ExecuteTask)
///
/// \param[in] task_spec Task specification.
void ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id = ActorID::Nil());
std::vector<rpc::ObjectReference> ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id = ActorID::Nil());
/// KillActor API for a local mode.
Status KillActorLocalMode(const ActorID &actor_id);
@ -1172,7 +1167,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \return Error if the values could not be retrieved.
Status GetAndPinArgsForExecutor(const TaskSpecification &task,
std::vector<std::shared_ptr<RayObject>> *args,
std::vector<ObjectID> *arg_reference_ids,
std::vector<rpc::ObjectReference> *arg_refs,
std::vector<ObjectID> *pinned_ids);
/// Process a subscribe message for wait for object eviction.

View file

@ -97,7 +97,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
[](TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb,

View file

@ -74,7 +74,8 @@ inline std::vector<std::unique_ptr<TaskArg>> ToTaskArgs(JNIEnv *env, jobject arg
RAY_CHECK(java_owner_address);
auto owner_address = JavaProtobufObjectToNativeProtobufObject<rpc::Address>(
env, java_owner_address);
return std::unique_ptr<TaskArg>(new TaskArgByReference(id, owner_address));
return std::unique_ptr<TaskArg>(
new TaskArgByReference(id, owner_address, /*call_site=*/""));
}
auto java_value =
static_cast<jbyteArray>(env->GetObjectField(arg, java_function_arg_value));
@ -303,15 +304,18 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub
auto task_options = ToTaskOptions(env, numReturns, callOptions);
auto placement_group_options = ToPlacementGroupOptions(env, callOptions);
std::vector<ObjectID> return_ids;
// TODO (kfstorm): Allow setting `max_retries` via `CallOptions`.
CoreWorkerProcess::GetCoreWorker().SubmitTask(
ray_function, task_args, task_options, &return_ids,
auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitTask(
ray_function, task_args, task_options,
/*max_retries=*/0,
/*retry_exceptions=*/false,
/*placement_options=*/placement_group_options,
/*placement_group_capture_child_tasks=*/true,
/*debugger_breakpoint*/ "");
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
// This is to avoid creating an empty Java list and to boost performance.
if (return_ids.empty()) {
@ -350,9 +354,12 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
RAY_CHECK(callOptions != nullptr);
auto task_options = ToTaskOptions(env, numReturns, callOptions);
auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options);
std::vector<ObjectID> return_ids;
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, ray_function, task_args,
task_options, &return_ids);
for (const auto &ref : return_refs) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
// This is to avoid creating an empty Java list and to boost performance.
if (return_ids.empty()) {

View file

@ -30,9 +30,9 @@ const int64_t kTaskFailureThrottlingThreshold = 50;
// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;
void TaskManager::AddPendingTask(const rpc::Address &caller_address,
const TaskSpecification &spec,
const std::string &call_site, int max_retries) {
std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
const rpc::Address &caller_address, const TaskSpecification &spec,
const std::string &call_site, int max_retries) {
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries
<< " retries";
@ -62,8 +62,9 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address,
if (spec.IsActorTask()) {
num_returns--;
}
if (!spec.IsActorCreationTask()) {
for (size_t i = 0; i < num_returns; i++) {
std::vector<rpc::ObjectReference> returned_refs;
for (size_t i = 0; i < num_returns; i++) {
if (!spec.IsActorCreationTask()) {
// We pass an empty vector for inner IDs because we do not know the return
// value of the task yet. If the task returns an ID(s), the worker will
// publish the WaitForRefRemoved message that we are now a borrower for
@ -73,6 +74,12 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address,
/*inner_ids=*/{}, caller_address, call_site, -1,
/*is_reconstructable=*/true);
}
rpc::ObjectReference ref;
ref.set_object_id(spec.ReturnId(i).Binary());
ref.mutable_owner_address()->CopyFrom(caller_address);
ref.set_call_site(call_site);
returned_refs.push_back(std::move(ref));
}
{
@ -82,6 +89,8 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address,
.second);
num_pending_tasks_++;
}
return returned_refs;
}
Status TaskManager::ResubmitTask(const TaskID &task_id,

View file

@ -94,9 +94,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param[in] spec The spec of the pending task.
/// \param[in] max_retries Number of times this task may be retried
/// on failure.
/// \return Void.
void AddPendingTask(const rpc::Address &caller_address, const TaskSpecification &spec,
const std::string &call_site, int max_retries = 0);
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> AddPendingTask(const rpc::Address &caller_address,
const TaskSpecification &spec,
const std::string &call_site,
int max_retries = 0);
/// Resubmit a task that has completed execution before. This is used to
/// reconstruct objects stored in Plasma that were lost.

View file

@ -26,6 +26,7 @@
#include "hiredis/async.h"
#include "hiredis/hiredis.h"
#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/ray_object.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/test_util.h"
@ -204,12 +205,11 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
std::unordered_map<std::string, double> &resources) {
std::vector<std::unique_ptr<TaskArg>> args;
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")};
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options,
&return_ids);
auto return_ids = ObjectRefsToIds(
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options));
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results));
@ -242,16 +242,17 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
std::vector<std::unique_ptr<TaskArg>> args;
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
buffer1, nullptr, std::vector<rpc::ObjectReference>())));
args.emplace_back(new TaskArgByReference(object_id, driver.GetRpcAddress()));
args.emplace_back(
new TaskArgByReference(object_id, driver.GetRpcAddress(), /*call_site=*/""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
TaskOptions options;
std::vector<ObjectID> return_ids;
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0,
/*retry_exceptions=*/false,
std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
auto return_refs = driver.SubmitTask(
func, args, options, /*max_retries=*/0,
/*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
auto return_ids = ObjectRefsToIds(return_refs);
ASSERT_EQ(return_ids.size(), 1);
@ -289,11 +290,11 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
buffer2, nullptr, std::vector<rpc::ObjectReference>())));
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<RayObject>> results;
@ -327,15 +328,16 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
// Create arguments with PassByRef and PassByValue.
std::vector<std::unique_ptr<TaskArg>> args;
args.emplace_back(new TaskArgByReference(object_id, driver.GetRpcAddress()));
args.emplace_back(
new TaskArgByReference(object_id, driver.GetRpcAddress(), /*call_site=*/""));
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
buffer2, nullptr, std::vector<rpc::ObjectReference>())));
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ASSERT_EQ(return_ids.size(), 1);
@ -394,11 +396,11 @@ void CoreWorkerTest::TestActorRestart(
buffer1, nullptr, std::vector<rpc::ObjectReference>())));
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ASSERT_EQ(return_ids.size(), 1);
// Verify if it's expected data.
std::vector<std::shared_ptr<RayObject>> results;
@ -437,11 +439,11 @@ void CoreWorkerTest::TestActorFailure(
buffer1, nullptr, std::vector<rpc::ObjectReference>())));
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ASSERT_EQ(return_ids.size(), 1);
all_results.emplace_back(std::make_pair(return_ids[0], buffer1));
@ -523,7 +525,6 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
rpc::Address address;
for (int i = 0; i < num_tasks; i++) {
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
auto num_returns = options.num_returns;
TaskSpecBuilder builder;
@ -571,11 +572,11 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
buffer, nullptr, std::vector<rpc::ObjectReference>())));
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_ids =
ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options));
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}
@ -845,9 +846,6 @@ TEST_F(SingleNodeTest, TestCancelTasks) {
FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", ""));
RayFunction func2(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", ""));
// Return IDs for the two functions that implement while(true) loops.
std::vector<ObjectID> return_ids1;
std::vector<ObjectID> return_ids2;
// Create default args and options needed to submit the tasks that encapsulate func1 and
// func2.
@ -855,17 +853,17 @@ TEST_F(SingleNodeTest, TestCancelTasks) {
TaskOptions options;
// Submit func1. The function should start looping forever.
driver.SubmitTask(func1, args, options, &return_ids1, /*max_retries=*/0,
/*retry_exceptions=*/false,
std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
auto return_ids1 = ObjectRefsToIds(driver.SubmitTask(
func1, args, options, /*max_retries=*/0,
/*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/""));
ASSERT_EQ(return_ids1.size(), 1);
// Submit func2. The function should be queued at the worker indefinitely.
driver.SubmitTask(func2, args, options, &return_ids2, /*max_retries=*/0,
/*retry_exceptions=*/false,
std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
auto return_ids2 = ObjectRefsToIds(driver.SubmitTask(
func2, args, options, /*max_retries=*/0,
/*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/""));
ASSERT_EQ(return_ids2.size(), 1);
// Cancel func2 by removing it from the worker's queue

View file

@ -60,7 +60,7 @@ class MockWorker {
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results) {

View file

@ -254,6 +254,10 @@ message ObjectReference {
bytes object_id = 1;
// The address of the object's owner.
Address owner_address = 2;
// Language call site of the object reference (i.e., file and line number).
// Used to print debugging information if there is an error retrieving the
// object.
string call_site = 3;
}
message ObjectReferenceCount {
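
On the wire this is an ordinary proto3 field; from the generated Python bindings (a sketch with placeholder values):

from ray.core.generated.common_pb2 import ObjectReference

ref_pb = ObjectReference()
ref_pb.object_id = b"\x00" * 28  # placeholder ID bytes
ref_pb.call_site = "script.py:12 in <module>"
data = ref_pb.SerializeToString()
assert ObjectReference.FromString(data).call_site == ref_pb.call_site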

View file

@ -277,11 +277,7 @@ message PlasmaObjectReadyReply {
}
message SpillObjectsRequest {
// The references of the objects to be spilled.
repeated bytes object_ids_to_spill = 1;
// The owner addresses of the objects to be spilled. Must be in the same order as
// object_ids_to_spill.
repeated Address owner_addresses = 2;
repeated ObjectReference object_refs_to_spill = 1;
}
message SpillObjectsReply {

View file

@ -238,10 +238,11 @@ void LocalObjectManager::SpillObjectsInternal(
rpc::SpillObjectsRequest request;
for (const auto &object_id : objects_to_spill) {
RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
request.add_object_ids_to_spill(object_id.Binary());
auto ref = request.add_object_refs_to_spill();
ref->set_object_id(object_id.Binary());
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
request.add_owner_addresses()->MergeFrom(it->second.second);
ref->mutable_owner_address()->CopyFrom(it->second.second);
}
io_worker->rpc_client()->SpillObjects(
request, [this, objects_to_spill, callback, io_worker](

View file

@ -129,12 +129,12 @@ RayTask CreateTask(const std::unordered_map<std::string, double> &required_resou
if (!args.empty()) {
for (auto &arg : args) {
spec_builder.AddArg(TaskArgByReference(arg, rpc::Address()));
spec_builder.AddArg(TaskArgByReference(arg, rpc::Address(), ""));
}
} else {
for (int i = 0; i < num_args; i++) {
ObjectID put_id = ObjectID::FromIndex(RandomTaskId(), /*index=*/i + 1);
spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address()));
spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address(), ""));
}
}

View file

@ -21,9 +21,9 @@ namespace streaming {
using ray::core::CoreWorkerProcess;
using ray::core::TaskOptions;
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {
std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num) {
std::unordered_map<std::string, double> resources;
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
TaskOptions options{name, return_num, resources};
@ -44,8 +44,8 @@ void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffe
std::move(buffer), meta, std::vector<rpc::ObjectReference>(), true)));
std::vector<std::shared_ptr<RayObject>> results;
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
options, &return_ids);
return CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
options);
}
} // namespace streaming
} // namespace ray

View file

@ -29,9 +29,9 @@ using ray::core::RayFunction;
/// \param[in] function the function descriptor of peer's function.
/// \param[in] return_num return value number of the call.
/// \return ObjectRefs returned by SubmitActorTask.
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids);
std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num);
} // namespace streaming
} // namespace ray

View file

@ -1,6 +1,7 @@
#include "queue/transport.h"
#include "queue/utils.h"
#include "ray/common/common_protocol.h"
#include "ray/streaming/streaming.h"
namespace ray {
@ -11,16 +12,15 @@ static constexpr int TASK_OPTION_RETURN_NUM_1 = 1;
void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {
STREAMING_LOG(DEBUG) << "Transport::Send buffer size: " << buffer->Size();
std::vector<ObjectID> return_ids;
ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
TASK_OPTION_RETURN_NUM_0, return_ids);
RAY_UNUSED(ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
TASK_OPTION_RETURN_NUM_0));
}
std::shared_ptr<LocalMemoryBuffer> Transport::SendForResult(
std::shared_ptr<LocalMemoryBuffer> buffer, int64_t timeout_ms) {
std::vector<ObjectID> return_ids;
ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
TASK_OPTION_RETURN_NUM_1, return_ids);
auto return_refs = ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
TASK_OPTION_RETURN_NUM_1);
auto return_ids = ObjectRefsToIds(return_refs);
std::vector<std::shared_ptr<RayObject>> results;
Status get_st =

View file

@ -517,7 +517,7 @@ class StreamingWorker {
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results) {

View file

@ -1,6 +1,7 @@
#pragma once
#include "hiredis/hiredis.h"
#include "ray/common/common_protocol.h"
#include "ray/common/test_util.h"
#include "ray/util/filesystem.h"
@ -82,11 +83,10 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
msg.ToBytes(), nullptr, std::vector<rpc::ObjectReference>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{"", 0, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")};
driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids);
RAY_UNUSED(driver.SubmitActorTask(self_actor_id, func, args, options));
}
void SubmitTestToActor(ActorID &actor_id, const std::string test) {
@ -98,11 +98,10 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
buffer, nullptr, std::vector<rpc::ObjectReference>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options("", 0, resources);
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", test, "execute_test", "")};
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
RAY_UNUSED(driver.SubmitActorTask(actor_id, func, args, options));
}
bool CheckCurTest(ActorID &actor_id, const std::string test_name) {
@ -114,11 +113,11 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
buffer, nullptr, std::vector<rpc::ObjectReference>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", "", "check_current_test_status", "")};
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
auto return_refs = driver.SubmitActorTask(actor_id, func, args, options);
auto return_ids = ObjectRefsToIds(return_refs);
std::vector<bool> wait_results;
std::vector<std::shared_ptr<RayObject>> results;