From d43d297d9ab5c69f75b854419fa8e4fd70caec44 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 1 Sep 2021 15:29:05 -0700 Subject: [PATCH] [core] Attach call site to ObjectRefs, print on error (#17971) * Attach call site to ObjectRef * flag * Fix build * build * build * build * x * x * skip on windows * lint --- cpp/src/ray/runtime/abstract_ray_runtime.cc | 3 +- .../ray/runtime/task/native_task_submitter.cc | 16 +- cpp/src/ray/runtime/task/task_executor.cc | 2 +- cpp/src/ray/runtime/task/task_executor.h | 2 +- python/ray/_raylet.pxd | 1 + python/ray/_raylet.pyx | 61 ++++--- python/ray/exceptions.py | 17 +- python/ray/includes/common.pxd | 8 +- python/ray/includes/libcoreworker.pxd | 17 +- python/ray/includes/object_ref.pxi | 6 +- python/ray/includes/ray_config.pxd | 2 + python/ray/includes/ray_config.pxi | 4 + python/ray/ray_constants.py | 4 + python/ray/serialization.py | 9 +- python/ray/tests/test_failure_4.py | 67 +++++++ src/ray/common/task/task_util.h | 7 +- src/ray/core_worker/core_worker.cc | 168 +++++++++--------- src/ray/core_worker/core_worker.h | 39 ++-- .../java/io_ray_runtime_RayNativeRuntime.cc | 2 +- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 19 +- src/ray/core_worker/task_manager.cc | 19 +- src/ray/core_worker/task_manager.h | 8 +- src/ray/core_worker/test/core_worker_test.cc | 62 ++++--- src/ray/core_worker/test/mock_worker.cc | 2 +- src/ray/protobuf/common.proto | 4 + src/ray/protobuf/core_worker.proto | 6 +- src/ray/raylet/local_object_manager.cc | 5 +- .../scheduling/cluster_task_manager_test.cc | 4 +- src/ray/streaming/streaming.cc | 10 +- src/ray/streaming/streaming.h | 6 +- streaming/src/queue/transport.cc | 12 +- streaming/src/test/mock_actor.cc | 2 +- streaming/src/test/queue_tests_base.h | 11 +- 33 files changed, 366 insertions(+), 239 deletions(-) diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 76698cb2f..6e91665a0 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -129,7 +129,8 @@ std::vector> TransformArgs( } else { RAY_CHECK(arg.id); ray_arg = absl::make_unique(ObjectID::FromBinary(*arg.id), - ray::rpc::Address{}); + ray::rpc::Address{}, + /*call_site=*/""); } ray_args.push_back(std::move(ray_arg)); } diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index 35d4a4475..bbf347db0 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -36,14 +36,18 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, TaskOptions options{}; options.name = call_options.name; options.resources = call_options.resources; - std::vector return_ids; + std::vector return_refs; if (invocation.task_type == TaskType::ACTOR_TASK) { - core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), - invocation.args, options, &return_ids); + return_refs = core_worker.SubmitActorTask( + invocation.actor_id, BuildRayFunction(invocation), invocation.args, options); } else { - core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, options, - &return_ids, 1, false, - std::make_pair(PlacementGroupID::Nil(), -1), true, ""); + return_refs = core_worker.SubmitTask( + BuildRayFunction(invocation), invocation.args, options, 1, false, + std::make_pair(PlacementGroupID::Nil(), -1), true, ""); + } + std::vector return_ids; + for (const auto &ref : return_refs) { + 
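+    // Each rpc::ObjectReference returned by SubmitTask/SubmitActorTask carries
+    // the object ID plus its owner address and (when recording is enabled) the
+    // creation call site; only the raw IDs are needed by the C++ API here.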
return_ids.push_back(ObjectID::FromBinary(ref.object_id())); } return return_ids[0]; } diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index ba9d62f06..71cb205f0 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -123,7 +123,7 @@ Status TaskExecutor::ExecuteTask( ray::TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args_buffer, - const std::vector &arg_reference_ids, + const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index 5714ef9cf..a528f17e0 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -76,7 +76,7 @@ class TaskExecutor { const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, - const std::vector &arg_reference_ids, + const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index a48bcbbe5..7a59b5f79 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -81,6 +81,7 @@ cdef class ObjectRef(BaseID): # of active IDs in the core worker so we know whether we should clean # it up. c_bool in_core_worker + c_string call_site_data cdef CObjectID native(self) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index dad0d0c78..07f841341 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -46,6 +46,7 @@ from cython.operator import dereference, postincrement from ray.includes.common cimport ( CBuffer, CAddress, + CObjectReference, CLanguage, CObjectReference, CRayObject, @@ -182,10 +183,11 @@ cdef RayObjectsToDataMetadataPairs( return data_metadata_pairs -cdef VectorToObjectRefs(const c_vector[CObjectID] &object_refs): +cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs): result = [] for i in range(object_refs.size()): - result.append(ObjectRef(object_refs[i].Binary())) + result.append(ObjectRef(object_refs[i].object_id(), + object_refs[i].call_site())) return result @@ -328,6 +330,7 @@ cdef prepare_args( int64_t total_inlined shared_ptr[CBuffer] arg_data c_vector[CObjectID] inlined_ids + c_string put_arg_call_site c_vector[CObjectReference] inlined_refs worker = ray.worker.global_worker @@ -341,7 +344,8 @@ cdef prepare_args( unique_ptr[CTaskArg](new CTaskArgByReference( c_arg, CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( - c_arg)))) + c_arg), + arg.call_site()))) else: serialized_arg = worker.get_serialization_context().serialize(arg) @@ -356,6 +360,8 @@ cdef prepare_args( metadata_fields[0], language)) size = serialized_arg.total_bytes + if RayConfig.instance().record_ref_creation_sites(): + get_py_stack(&put_arg_call_site) # TODO(edoakes): any objects containing ObjectRefs are spilled to # plasma here. 
This is inefficient for small objects, but inlined # arguments aren't associated ObjectRefs right now so this is a @@ -383,7 +389,8 @@ cdef prepare_args( new CTaskArgByReference(CObjectID.FromBinary( core_worker.put_serialized_object( serialized_arg, inline_small_object=False)), - CCoreWorkerProcess.GetCoreWorker().GetRpcAddress()))) + CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), + put_arg_call_site))) cdef raise_if_dependency_failed(arg): @@ -403,7 +410,7 @@ cdef execute_task( const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, - const c_vector[CObjectID] &c_arg_reference_ids, + const c_vector[CObjectReference] &c_arg_refs, const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, c_vector[shared_ptr[CRayObject]] *returns, @@ -526,7 +533,7 @@ cdef execute_task( args, kwargs = [], {} else: metadata_pairs = RayObjectsToDataMetadataPairs(c_args) - object_refs = VectorToObjectRefs(c_arg_reference_ids) + object_refs = VectorToObjectRefs(c_arg_refs) if core_worker.current_actor_is_asyncio(): # We deserialize objects in event loop thread to @@ -658,7 +665,7 @@ cdef CRayStatus task_execution_handler( const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, - const c_vector[CObjectID] &c_arg_reference_ids, + const c_vector[CObjectReference] &c_arg_refs, const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, c_vector[shared_ptr[CRayObject]] *returns, @@ -670,7 +677,7 @@ cdef CRayStatus task_execution_handler( # The call to execute_task should never raise an exception. If # it does, that indicates that there was an internal error. execute_task(task_type, task_name, ray_function, c_resources, - c_args, c_arg_reference_ids, c_return_ids, + c_args, c_arg_refs, c_return_ids, debugger_breakpoint, returns, is_application_level_error) except Exception as e: @@ -747,11 +754,17 @@ cdef void run_on_util_worker_handler( cdef c_vector[c_string] spill_objects_handler( - const c_vector[CObjectID]& object_ids_to_spill, - const c_vector[c_string]& owner_addresses) nogil: - cdef c_vector[c_string] return_urls + const c_vector[CObjectReference]& object_refs_to_spill) nogil: + cdef: + c_vector[c_string] return_urls + c_vector[c_string] owner_addresses + with gil: - object_refs = VectorToObjectRefs(object_ids_to_spill) + object_refs = VectorToObjectRefs(object_refs_to_spill) + for i in range(object_refs_to_spill.size()): + owner_addresses.push_back( + object_refs_to_spill[i].owner_address() + .SerializeAsString()) try: with ray.worker._changeproctitle( ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER, @@ -774,7 +787,7 @@ cdef c_vector[c_string] spill_objects_handler( cdef int64_t restore_spilled_objects_handler( - const c_vector[CObjectID]& object_ids_to_restore, + const c_vector[CObjectReference]& object_refs_to_restore, const c_vector[c_string]& object_urls) nogil: cdef: int64_t bytes_restored = 0 @@ -783,7 +796,7 @@ cdef int64_t restore_spilled_objects_handler( size = object_urls.size() for i in range(size): urls.append(object_urls[i]) - object_refs = VectorToObjectRefs(object_ids_to_restore) + object_refs = VectorToObjectRefs(object_refs_to_restore) try: with ray.worker._changeproctitle( ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER, @@ -897,7 +910,8 @@ cdef void get_py_stack(c_string* stack_out) nogil: frame.f_code.co_filename, frame.f_code.co_name, frame.f_lineno)) frame = frame.f_back - stack_out[0] = " | 
".join(msg_frames).encode("ascii") + stack_out[0] = (ray_constants.CALL_STACK_LINE_DELIMITER + .join(msg_frames).encode("ascii")) cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): cdef shared_ptr[CBuffer] empty_metadata @@ -1338,13 +1352,13 @@ cdef class CoreWorker: unordered_map[c_string, double] c_resources CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector - c_vector[CObjectID] return_ids CPlacementGroupID c_placement_group_id = \ placement_group_id.native() c_string c_serialized_runtime_env unordered_map[c_string, c_string] \ c_override_environment_variables = \ override_environment_variables + c_vector[CObjectReference] return_refs with self.profile_event(b"submit_task"): c_serialized_runtime_env = \ @@ -1357,19 +1371,19 @@ cdef class CoreWorker: # NOTE(edoakes): releasing the GIL while calling this method causes # segfaults. See relevant issue for details: # https://github.com/ray-project/ray/pull/12803 - CCoreWorkerProcess.GetCoreWorker().SubmitTask( + return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitTask( ray_function, args_vector, CTaskOptions( name, num_returns, c_resources, b"", c_serialized_runtime_env, c_override_environment_variables), - &return_ids, max_retries, retry_exceptions, + max_retries, retry_exceptions, c_pair[CPlacementGroupID, int64_t]( c_placement_group_id, placement_group_bundle_index), placement_group_capture_child_tasks, debugger_breakpoint) - return VectorToObjectRefs(return_ids) + return VectorToObjectRefs(return_refs) def create_actor(self, Language language, @@ -1509,7 +1523,7 @@ cdef class CoreWorker: unordered_map[c_string, double] c_resources CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector - c_vector[CObjectID] return_ids + c_vector[CObjectReference] return_refs with self.profile_event(b"submit_task"): if num_method_cpus > 0: @@ -1521,13 +1535,12 @@ cdef class CoreWorker: # NOTE(edoakes): releasing the GIL while calling this method causes # segfaults. See relevant issue for details: # https://github.com/ray-project/ray/pull/12803 - CCoreWorkerProcess.GetCoreWorker().SubmitActorTask( + return_refs = CCoreWorkerProcess.GetCoreWorker().SubmitActorTask( c_actor_id, ray_function, - args_vector, CTaskOptions(name, num_returns, c_resources), - &return_ids) + args_vector, CTaskOptions(name, num_returns, c_resources)) - return VectorToObjectRefs(return_ids) + return VectorToObjectRefs(return_refs) def kill_actor(self, ActorID actor_id, c_bool no_restart): cdef: diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 1e0a5a319..90e49e622 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -3,6 +3,7 @@ from traceback import format_exception import ray.cloudpickle as pickle from ray.core.generated.common_pb2 import RayException, Language, PYTHON +import ray.ray_constants as ray_constants import colorama import setproctitle @@ -285,11 +286,23 @@ class ObjectLostError(RayError): object_ref_hex: Hex ID of the object. 
""" - def __init__(self, object_ref_hex): + def __init__(self, object_ref_hex, call_site): self.object_ref_hex = object_ref_hex + self.call_site = call_site.replace( + ray_constants.CALL_STACK_LINE_DELIMITER, "\n ") def __str__(self): - return (f"Object {self.object_ref_hex} is lost due to node failure.") + msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node " + "failure or system error.") + if self.call_site: + msg += (f" The ObjectRef was created at: {self.call_site}") + else: + msg += ( + " To see information about where this ObjectRef was created " + "in Python, set the environment variable " + "RAY_record_ref_creation_sites=1 during `ray start` and " + "`ray.init()`.") + return msg class GetTimeoutError(RayError): diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index c266b2923..33d9ce1a9 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -152,12 +152,15 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: pass cdef cppclass CAddress "ray::rpc::Address": CAddress() - const c_string &SerializeAsString() + const c_string &SerializeAsString() const void ParseFromString(const c_string &serialized) void CopyFrom(const CAddress& address) const c_string &worker_id() cdef cppclass CObjectReference "ray::rpc::ObjectReference": CObjectReference() + CAddress owner_address() const + const c_string &object_id() const + const c_string &call_site() const # This is a workaround for C++ enum class since Cython has no corresponding # representation. @@ -243,7 +246,8 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef cppclass CTaskArgByReference "ray::TaskArgByReference": CTaskArgByReference(const CObjectID &object_id, - const CAddress &owner_address) + const CAddress &owner_address, + const c_string &call_site) cdef cppclass CTaskArgByValue "ray::TaskArgByValue": CTaskArgByValue(const shared_ptr[CRayObject] &data) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 90944251c..2f9b16d05 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -26,6 +26,7 @@ from ray.includes.gcs_client cimport CGcsClient from ray.includes.common cimport ( CAddress, + CObjectReference, CActorCreationOptions, CBuffer, CPlacementGroupCreationOptions, @@ -99,10 +100,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CWorkerType GetWorkerType() CLanguage GetLanguage() - void SubmitTask( + c_vector[CObjectReference] SubmitTask( const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, - const CTaskOptions &options, c_vector[CObjectID] *return_ids, + const CTaskOptions &options, int max_retries, c_bool retry_exceptions, c_pair[CPlacementGroupID, int64_t] placement_options, @@ -120,11 +121,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CPlacementGroupID &placement_group_id) CRayStatus WaitPlacementGroupReady( const CPlacementGroupID &placement_group_id, int timeout_ms) - void SubmitActorTask( + c_vector[CObjectReference] SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, - const CTaskOptions &options, - c_vector[CObjectID] *return_ids) + const CTaskOptions &options) CRayStatus KillActor( const CActorID &actor_id, c_bool force_kill, c_bool no_restart) @@ -276,7 +276,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CRayFunction &ray_function, const unordered_map[c_string, double] &resources, const 
c_vector[shared_ptr[CRayObject]] &args, - const c_vector[CObjectID] &arg_reference_ids, + const c_vector[CObjectReference] &arg_refs, const c_vector[CObjectID] &return_ids, const c_string debugger_breakpoint, c_vector[shared_ptr[CRayObject]] *returns, @@ -288,10 +288,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (CRayStatus() nogil) check_signals (void() nogil) gc_collect (c_vector[c_string]( - const c_vector[CObjectID] &, - const c_vector[c_string] &) nogil) spill_objects + const c_vector[CObjectReference] &) nogil) spill_objects (int64_t( - const c_vector[CObjectID] &, + const c_vector[CObjectReference] &, const c_vector[c_string] &) nogil) restore_spilled_objects (void( const c_vector[c_string]&, diff --git a/python/ray/includes/object_ref.pxi b/python/ray/includes/object_ref.pxi index 593d5130e..8e86c979c 100644 --- a/python/ray/includes/object_ref.pxi +++ b/python/ray/includes/object_ref.pxi @@ -35,10 +35,11 @@ def _set_future_helper( cdef class ObjectRef(BaseID): - def __init__(self, id): + def __init__(self, id, call_site_data=""): check_id(id) self.data = CObjectID.FromBinary(id) self.in_core_worker = False + self.call_site_data = call_site_data worker = ray.worker.global_worker # TODO(edoakes): We should be able to remove the in_core_worker flag. @@ -84,6 +85,9 @@ cdef class ObjectRef(BaseID): def job_id(self): return self.task_id().job_id() + def call_site(self): + return decode(self.call_site_data) + cdef size_t hash(self): return self.data.Hash() diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 8b3140eb8..642c75085 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -64,3 +64,5 @@ cdef extern from "ray/common/ray_config.h" nogil: c_bool enable_timeline() const uint32_t max_grpc_message_size() const + + c_bool record_ref_creation_sites() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 1c1e9a1d6..a9e0457c9 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -107,3 +107,7 @@ cdef class Config: @staticmethod def max_grpc_message_size(): return RayConfig.instance().max_grpc_message_size() + + @staticmethod + def record_ref_creation_sites(): + return RayConfig.instance().record_ref_creation_sites() diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index abc72a1be..7a1229a83 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -272,3 +272,7 @@ HEALTHCHECK_EXPIRATION_S = os.environ.get("RAY_HEALTHCHECK_EXPIRATION_S", 10) # Should be kept in sync with kSetupWorkerFilename in # src/ray/common/constants.h. SETUP_WORKER_FILENAME = "setup_worker.py" + +# Used to separate lines when formatting the call stack where an ObjectRef was +# created. +CALL_STACK_LINE_DELIMITER = " | " diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 261516b2c..5b53c9ec3 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -28,7 +28,7 @@ class DeserializationError(Exception): pass -def _object_ref_deserializer(binary, owner_address, object_status): +def _object_ref_deserializer(binary, call_site, owner_address, object_status): # NOTE(suquark): This function should be a global function so # cloudpickle can access it directly. Otherwise cloudpickle # has to dump the whole function definition, which is inefficient. @@ -37,7 +37,7 @@ def _object_ref_deserializer(binary, owner_address, object_status): # the core worker to resolve the value. 
This is to make sure # that the ref count for the ObjectRef is greater than 0 by the # time the core worker resolves the value of the object. - obj_ref = ray.ObjectRef(binary) + obj_ref = ray.ObjectRef(binary, call_site) # TODO(edoakes): we should be able to just capture a reference # to 'self' here instead, but this function is itself pickled @@ -92,7 +92,7 @@ class SerializationContext: obj, owner_address, object_status = ( worker.core_worker.serialize_and_promote_object_ref(obj)) return _object_ref_deserializer, \ - (obj.binary(), owner_address, object_status) + (obj.binary(), obj.call_site(), owner_address, object_status) self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer) serialization_addons.apply(self) @@ -223,7 +223,8 @@ class SerializationContext: elif error_type == ErrorType.Value("TASK_CANCELLED"): return TaskCancelledError() elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): - return ObjectLostError(object_ref.hex()) + return ObjectLostError(object_ref.hex(), + object_ref.call_site()) elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"): return RuntimeEnvSetupError() else: diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index e6f137ea1..299604e09 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -156,5 +156,72 @@ if __name__ == "__main__": assert x == 42 +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +@pytest.mark.parametrize("debug_enabled", [False, True]) +def test_object_lost_error(ray_start_cluster, debug_enabled): + cluster = ray_start_cluster + system_config = { + "num_heartbeats_timeout": 3, + } + if debug_enabled: + system_config["record_ref_creation_sites"] = True + cluster.add_node(num_cpus=0, _system_config=system_config) + ray.init(address=cluster.address) + worker_node = cluster.add_node(num_cpus=1) + + @ray.remote(num_cpus=1) + class Actor: + def __init__(self): + return + + def foo(self): + return "x" * 1000_000 + + def done(self): + return + + @ray.remote + def borrower(ref): + ray.get(ref[0]) + + @ray.remote + def task_arg(ref): + return + + a = Actor.remote() + x = a.foo.remote() + ray.get(a.done.remote()) + cluster.remove_node(worker_node, allow_graceful=False) + cluster.add_node(num_cpus=1) + + y = borrower.remote([x]) + + try: + ray.get(x) + assert False + except ray.exceptions.ObjectLostError as e: + error = str(e) + print(error) + assert ("actor call" in error) == debug_enabled + assert ("test_object_lost_error" in error) == debug_enabled + + try: + ray.get(y) + assert False + except ray.exceptions.RayTaskError as e: + error = str(e) + print(error) + assert ("actor call" in error) == debug_enabled + assert ("test_object_lost_error" in error) == debug_enabled + + try: + ray.get(task_arg.remote(x)) + except ray.exceptions.RayTaskError as e: + error = str(e) + print(error) + assert ("actor call" in error) == debug_enabled + assert ("test_object_lost_error" in error) == debug_enabled + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 04c33e0f2..c011829c2 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -34,19 +34,22 @@ class TaskArgByReference : public TaskArg { /// /// \param[in] object_id Id of the argument. /// \return The task argument. 
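+  /// \param[in] owner_address Address of the worker that owns the object.
+  /// \param[in] call_site Call site at which the argument's ObjectRef was
+  /// created, or "" if call-site recording is disabled.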
- TaskArgByReference(const ObjectID &object_id, const rpc::Address &owner_address) - : id_(object_id), owner_address_(owner_address) {} + TaskArgByReference(const ObjectID &object_id, const rpc::Address &owner_address, + const std::string &call_site) + : id_(object_id), owner_address_(owner_address), call_site_(call_site) {} void ToProto(rpc::TaskArg *arg_proto) const { auto ref = arg_proto->mutable_object_ref(); ref->set_object_id(id_.Binary()); ref->mutable_owner_address()->CopyFrom(owner_address_); + ref->set_call_site(call_site_); } private: /// Id of the argument if passed by reference, otherwise nullptr. const ObjectID id_; const rpc::Address owner_address_; + const std::string call_site_; }; class TaskArgByValue : public TaskArg { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 81d4edba5..d44b546b9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -39,9 +39,8 @@ void BuildCommonTaskSpec( const std::vector> &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, - std::vector *return_ids, const BundleID &bundle_id, - bool placement_group_capture_child_tasks, const std::string debugger_breakpoint, - const std::string &serialized_runtime_env, + const BundleID &bundle_id, bool placement_group_capture_child_tasks, + const std::string debugger_breakpoint, const std::string &serialized_runtime_env, const std::unordered_map &override_environment_variables, const std::string &concurrency_group_name = "") { // Build common task spec. @@ -55,12 +54,6 @@ void BuildCommonTaskSpec( for (const auto &arg : args) { builder.AddArg(*arg); } - - // Compute return IDs. - return_ids->resize(num_returns); - for (size_t i = 0; i < num_returns; i++) { - (*return_ids)[i] = ObjectID::FromIndex(task_id, i + 1); - } } JobID GetProcessJobID(const CoreWorkerOptions &options) { @@ -1643,13 +1636,11 @@ std::unordered_map AddPlacementGroupConstraint( return resources; } -void CoreWorker::SubmitTask(const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::vector *return_ids, int max_retries, - bool retry_exceptions, BundleID placement_options, - bool placement_group_capture_child_tasks, - const std::string &debugger_breakpoint) { +std::vector CoreWorker::SubmitTask( + const RayFunction &function, const std::vector> &args, + const TaskOptions &task_options, int max_retries, bool retry_exceptions, + BundleID placement_options, bool placement_group_capture_child_tasks, + const std::string &debugger_breakpoint) { TaskSpecBuilder builder; const auto next_task_index = worker_context_.GetNextTaskIndex(); const auto task_id = @@ -1671,27 +1662,28 @@ void CoreWorker::SubmitTask(const RayFunction &function, override_environment_variables.insert(current_override_environment_variables.begin(), current_override_environment_variables.end()); // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, function, args, task_options.num_returns, - constrained_resources, required_resources, return_ids, - placement_options, placement_group_capture_child_tasks, - debugger_breakpoint, task_options.serialized_runtime_env, - override_environment_variables); + BuildCommonTaskSpec( + builder, worker_context_.GetCurrentJobID(), task_id, task_name, + 
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, + function, args, task_options.num_returns, constrained_resources, required_resources, + placement_options, placement_group_capture_child_tasks, debugger_breakpoint, + task_options.serialized_runtime_env, override_environment_variables); builder.SetNormalTaskSpec(max_retries, retry_exceptions); TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString(); + std::vector returned_refs; if (options_.is_local_mode) { - ExecuteTaskLocalMode(task_spec); + returned_refs = ExecuteTaskLocalMode(task_spec); } else { - task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, CurrentCallSite(), - max_retries); + returned_refs = task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, + CurrentCallSite(), max_retries); io_service_.post( [this, task_spec]() { RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); }, "CoreWorker.SubmitTask"); } + return returned_refs; } Status CoreWorker::CreateActor(const RayFunction &function, @@ -1717,7 +1709,6 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_creation_options.override_environment_variables; override_environment_variables.insert(current_override_environment_variables.begin(), current_override_environment_variables.end()); - std::vector return_ids; TaskSpecBuilder builder; auto new_placement_resources = AddPlacementGroupConstraint(actor_creation_options.placement_resources, @@ -1731,17 +1722,18 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_name.empty() ? function.GetFunctionDescriptor()->DefaultTaskName() : actor_name + ":" + function.GetFunctionDescriptor()->CallString(); - BuildCommonTaskSpec( - builder, job_id, actor_creation_task_id, task_name, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, - function, args, 1, new_resource, new_placement_resources, &return_ids, - actor_creation_options.placement_options, - actor_creation_options.placement_group_capture_child_tasks, - "", /* debugger_breakpoint */ - actor_creation_options.serialized_runtime_env, override_environment_variables); + BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, task_name, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + rpc_address_, function, args, 1, new_resource, + new_placement_resources, actor_creation_options.placement_options, + actor_creation_options.placement_group_capture_child_tasks, + "", /* debugger_breakpoint */ + actor_creation_options.serialized_runtime_env, + override_environment_variables); auto actor_handle = std::make_unique( - actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], + actor_id, GetCallerId(), rpc_address_, job_id, + /*actor_cursor=*/ObjectID::FromIndex(actor_creation_task_id, 1), function.GetLanguage(), function.GetFunctionDescriptor(), extension_data, actor_creation_options.max_task_retries); std::string serialized_actor_handle; @@ -1871,10 +1863,9 @@ Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_gro return status_future.get(); } -void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::vector *return_ids) { +std::vector CoreWorker::SubmitActorTask( + const ActorID &actor_id, const RayFunction &function, + const std::vector> &args, const TaskOptions &task_options) { auto actor_handle = actor_manager_->GetActorHandle(actor_id); // 
Add one for actor cursor object id for tasks. @@ -1891,36 +1882,37 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun ? function.GetFunctionDescriptor()->DefaultTaskName() : task_options.name; const std::unordered_map override_environment_variables = {}; - BuildCommonTaskSpec( - builder, actor_handle->CreationJobID(), actor_task_id, task_name, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, - function, args, num_returns, task_options.resources, required_resources, return_ids, - std::make_pair(PlacementGroupID::Nil(), -1), - true, /* placement_group_capture_child_tasks */ - "", /* debugger_breakpoint */ - "{}", /* serialized_runtime_env */ - override_environment_variables, task_options.concurrency_group_name); + BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + rpc_address_, function, args, num_returns, task_options.resources, + required_resources, std::make_pair(PlacementGroupID::Nil(), -1), + true, /* placement_group_capture_child_tasks */ + "", /* debugger_breakpoint */ + "{}", /* serialized_runtime_env */ + override_environment_variables, + task_options.concurrency_group_name); // NOTE: placement_group_capture_child_tasks and runtime_env will // be ignored in the actor because we should always follow the actor's option. - const ObjectID new_cursor = return_ids->back(); + // TODO(swang): Do we actually need to set this ObjectID? + const ObjectID new_cursor = ObjectID::FromIndex(actor_task_id, num_returns); actor_handle->SetActorTaskSpec(builder, new_cursor); - // Remove cursor from return ids. - return_ids->pop_back(); // Submit task. TaskSpecification task_spec = builder.Build(); + std::vector returned_refs; if (options_.is_local_mode) { - ExecuteTaskLocalMode(task_spec, actor_id); + returned_refs = ExecuteTaskLocalMode(task_spec, actor_id); } else { - task_manager_->AddPendingTask(rpc_address_, task_spec, CurrentCallSite(), - actor_handle->MaxTaskRetries()); + returned_refs = task_manager_->AddPendingTask( + rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); io_service_.post( [this, task_spec]() { RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec)); }, "CoreWorker.SubmitActorTask"); } + return returned_refs; } Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill, @@ -2172,14 +2164,13 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; std::vector> args; - std::vector arg_reference_ids; + std::vector arg_refs; // This includes all IDs that were passed by reference and any IDs that were // inlined in the task spec. These references will be pinned during the task // execution and unpinned once the task completes. We will notify the caller // about any IDs that we are still borrowing by the time the task completes. 
std::vector borrowed_ids; - RAY_CHECK_OK( - GetAndPinArgsForExecutor(task_spec, &args, &arg_reference_ids, &borrowed_ids)); + RAY_CHECK_OK(GetAndPinArgsForExecutor(task_spec, &args, &arg_refs, &borrowed_ids)); std::vector return_ids; for (size_t i = 0; i < task_spec.NumReturns(); i++) { @@ -2216,9 +2207,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, status = options_.task_execution_callback( task_type, task_spec.GetName(), func, - task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids, - return_ids, task_spec.GetDebuggerBreakpoint(), return_objects, - creation_task_exception_pb_bytes, is_application_level_error); + task_spec.GetRequiredResources().GetResourceMap(), args, arg_refs, return_ids, + task_spec.GetDebuggerBreakpoint(), return_objects, creation_task_exception_pb_bytes, + is_application_level_error); // Get the reference counts for any IDs that we borrowed during this task and // return them to the caller. This will notify the caller of any IDs that we @@ -2292,18 +2283,28 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id, return status; } -void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, - const ActorID &actor_id) { +std::vector CoreWorker::ExecuteTaskLocalMode( + const TaskSpecification &task_spec, const ActorID &actor_id) { auto resource_ids = std::make_shared(); auto return_objects = std::vector>(); auto borrowed_refs = ReferenceCounter::ReferenceTableProto(); - if (!task_spec.IsActorCreationTask()) { - for (size_t i = 0; i < task_spec.NumReturns(); i++) { + + std::vector returned_refs; + size_t num_returns = task_spec.NumReturns(); + if (task_spec.IsActorTask()) { + num_returns--; + } + for (size_t i = 0; i < num_returns; i++) { + if (!task_spec.IsActorCreationTask()) { reference_counter_->AddOwnedObject(task_spec.ReturnId(i), /*inner_ids=*/{}, rpc_address_, CurrentCallSite(), -1, /*is_reconstructable=*/false); } + rpc::ObjectReference ref; + ref.set_object_id(task_spec.ReturnId(i).Binary()); + ref.mutable_owner_address()->CopyFrom(task_spec.CallerAddress()); + returned_refs.push_back(std::move(ref)); } auto old_id = GetActorId(); SetActorId(actor_id); @@ -2311,15 +2312,16 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, RAY_UNUSED(ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs, &is_application_level_error)); SetActorId(old_id); + return returned_refs; } Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, std::vector> *args, - std::vector *arg_reference_ids, + std::vector *arg_refs, std::vector *borrowed_ids) { auto num_args = task.NumArgs(); args->resize(num_args); - arg_reference_ids->resize(num_args); + arg_refs->resize(num_args); absl::flat_hash_set by_ref_ids; absl::flat_hash_map> by_ref_indices; @@ -2332,7 +2334,8 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), task.ArgId(i))); } - const auto &arg_id = task.ArgId(i); + const auto &arg_ref = task.ArgRef(i); + const auto arg_id = ObjectID::FromBinary(arg_ref.object_id()); by_ref_ids.insert(arg_id); auto it = by_ref_indices.find(arg_id); if (it == by_ref_indices.end()) { @@ -2340,7 +2343,7 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, } else { it->second.push_back(i); } - arg_reference_ids->at(i) = arg_id; + arg_refs->at(i) = arg_ref; // Pin all args passed by reference for the duration of the task. 
This // ensures that when the task completes, we can retrieve metadata about // any borrowed ObjectIDs that were serialized in the argument's value. @@ -2369,7 +2372,7 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, bool copy_data = options_.language == Language::PYTHON; args->at(i) = std::make_shared(data, metadata, task.ArgInlinedRefs(i), copy_data); - arg_reference_ids->at(i) = ObjectID::Nil(); + arg_refs->at(i).set_object_id(ObjectID::Nil().Binary()); // The task borrows all ObjectIDs that were serialized in the inlined // arguments. The task will receive references to these IDs, so it is // possible for the task to continue borrowing these arguments by the @@ -2940,18 +2943,9 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request, rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { if (options_.spill_objects != nullptr) { - std::vector object_ids_to_spill; - object_ids_to_spill.reserve(request.object_ids_to_spill_size()); - for (const auto &id_binary : request.object_ids_to_spill()) { - object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary)); - } - std::vector owner_addresses; - owner_addresses.reserve(request.owner_addresses_size()); - for (const auto &owner_address : request.owner_addresses()) { - owner_addresses.push_back(owner_address.SerializeAsString()); - } - std::vector object_urls = - options_.spill_objects(object_ids_to_spill, owner_addresses); + auto object_refs = + VectorFromProtobuf(request.object_refs_to_spill()); + std::vector object_urls = options_.spill_objects(object_refs); for (size_t i = 0; i < object_urls.size(); i++) { reply->add_spilled_objects_url(std::move(object_urls[i])); } @@ -2985,10 +2979,12 @@ void CoreWorker::HandleRestoreSpilledObjects( rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { if (options_.restore_spilled_objects != nullptr) { // Get a list of object ids. - std::vector object_ids_to_restore; - object_ids_to_restore.reserve(request.object_ids_to_restore_size()); + std::vector object_refs_to_restore; + object_refs_to_restore.reserve(request.object_ids_to_restore_size()); for (const auto &id_binary : request.object_ids_to_restore()) { - object_ids_to_restore.push_back(ObjectID::FromBinary(id_binary)); + rpc::ObjectReference ref; + ref.set_object_id(id_binary); + object_refs_to_restore.push_back(std::move(ref)); } // Get a list of spilled_object_urls. 
std::vector spilled_objects_url; @@ -2997,7 +2993,7 @@ void CoreWorker::HandleRestoreSpilledObjects( spilled_objects_url.push_back(url); } auto total = - options_.restore_spilled_objects(object_ids_to_restore, spilled_objects_url); + options_.restore_spilled_objects(object_refs_to_restore, spilled_objects_url); reply->set_bytes_restored_total(total); send_reply_callback(Status::OK(), nullptr, nullptr); } else { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 11137265f..bdfdeaf4f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -67,7 +67,7 @@ struct CoreWorkerOptions { TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, - const std::vector &arg_reference_ids, + const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, @@ -152,11 +152,11 @@ struct CoreWorkerOptions { /// be held up in garbage objects. std::function gc_collect; /// Application-language callback to spill objects to external storage. - std::function(const std::vector &, - const std::vector &)> + std::function(const std::vector &)> spill_objects; /// Application-language callback to restore objects from external storage. - std::function &, const std::vector &)> + std::function &, + const std::vector &)> restore_spilled_objects; /// Application-language callback to delete objects from external storage. std::function &, rpc::WorkerType)> @@ -703,19 +703,18 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. - /// \param[out] return_ids Ids of the return objects. /// \param[in] max_retires max number of retry when the task fails. /// \param[in] placement_options placement group options. /// \param[in] placement_group_capture_child_tasks whether or not the submitted task /// \param[in] debugger_breakpoint breakpoint to drop into for the debugger after this /// task starts executing, or "" if we do not want to drop into the debugger. /// should capture parent's placement group implicilty. - void SubmitTask(const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, std::vector *return_ids, - int max_retries, bool retry_exceptions, BundleID placement_options, - bool placement_group_capture_child_tasks, - const std::string &debugger_breakpoint); + /// \return ObjectRefs returned by this task. + std::vector SubmitTask( + const RayFunction &function, const std::vector> &args, + const TaskOptions &task_options, int max_retries, bool retry_exceptions, + BundleID placement_options, bool placement_group_capture_child_tasks, + const std::string &debugger_breakpoint); /// Create an actor. /// @@ -770,14 +769,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. - /// \param[out] return_ids Ids of the return objects. - /// \return Status error if the task is invalid or if the task submission - /// failed. Tasks can be invalid for direct actor calls because not all tasks - /// are currently supported. 
- void SubmitActorTask(const ActorID &actor_id, const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::vector *return_ids); + /// \return ObjectRefs returned by this task. + std::vector SubmitActorTask( + const ActorID &actor_id, const RayFunction &function, + const std::vector> &args, const TaskOptions &task_options); /// Tell an actor to exit immediately, without completing outstanding work. /// @@ -1129,8 +1124,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Execute a local mode task (runs normal ExecuteTask) /// /// \param spec[in] task_spec Task specification. - void ExecuteTaskLocalMode(const TaskSpecification &task_spec, - const ActorID &actor_id = ActorID::Nil()); + std::vector ExecuteTaskLocalMode( + const TaskSpecification &task_spec, const ActorID &actor_id = ActorID::Nil()); /// KillActor API for a local mode. Status KillActorLocalMode(const ActorID &actor_id); @@ -1172,7 +1167,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \return Error if the values could not be retrieved. Status GetAndPinArgsForExecutor(const TaskSpecification &task, std::vector> *args, - std::vector *arg_reference_ids, + std::vector *arg_refs, std::vector *pinned_ids); /// Process a subscribe message for wait for object eviction. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index 21aea5cc7..0ccb37bf5 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -97,7 +97,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( [](TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, - const std::vector &arg_reference_ids, + const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results, std::shared_ptr &creation_task_exception_pb, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index d3c6b9882..dd05bc76a 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -74,7 +74,8 @@ inline std::vector> ToTaskArgs(JNIEnv *env, jobject arg RAY_CHECK(java_owner_address); auto owner_address = JavaProtobufObjectToNativeProtobufObject( env, java_owner_address); - return std::unique_ptr(new TaskArgByReference(id, owner_address)); + return std::unique_ptr( + new TaskArgByReference(id, owner_address, /*call_site=*/"")); } auto java_value = static_cast(env->GetObjectField(arg, java_function_arg_value)); @@ -303,15 +304,18 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub auto task_options = ToTaskOptions(env, numReturns, callOptions); auto placement_group_options = ToPlacementGroupOptions(env, callOptions); - std::vector return_ids; // TODO (kfstorm): Allow setting `max_retries` via `CallOptions`. 
- CoreWorkerProcess::GetCoreWorker().SubmitTask( - ray_function, task_args, task_options, &return_ids, + auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitTask( + ray_function, task_args, task_options, /*max_retries=*/0, /*retry_exceptions=*/false, /*placement_options=*/placement_group_options, /*placement_group_capture_child_tasks=*/true, /*debugger_breakpoint*/ ""); + std::vector return_ids; + for (const auto &ref : return_refs) { + return_ids.push_back(ObjectID::FromBinary(ref.object_id())); + } // This is to avoid creating an empty java list and boost performance. if (return_ids.empty()) { @@ -350,9 +354,12 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( RAY_CHECK(callOptions != nullptr); auto task_options = ToTaskOptions(env, numReturns, callOptions); + auto return_refs = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + actor_id, ray_function, task_args, task_options); std::vector return_ids; - CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, ray_function, task_args, - task_options, &return_ids); + for (const auto &ref : return_refs) { + return_ids.push_back(ObjectID::FromBinary(ref.object_id())); + } // This is to avoid creating an empty java list and boost performance. if (return_ids.empty()) { diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 72acdc87f..1bcf77003 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -30,9 +30,9 @@ const int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. const int64_t kTaskFailureLoggingFrequencyMillis = 5000; -void TaskManager::AddPendingTask(const rpc::Address &caller_address, - const TaskSpecification &spec, - const std::string &call_site, int max_retries) { +std::vector TaskManager::AddPendingTask( + const rpc::Address &caller_address, const TaskSpecification &spec, + const std::string &call_site, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries << " retries"; @@ -62,8 +62,9 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address, if (spec.IsActorTask()) { num_returns--; } - if (!spec.IsActorCreationTask()) { - for (size_t i = 0; i < num_returns; i++) { + std::vector returned_refs; + for (size_t i = 0; i < num_returns; i++) { + if (!spec.IsActorCreationTask()) { // We pass an empty vector for inner IDs because we do not know the return // value of the task yet. If the task returns an ID(s), the worker will // publish the WaitForRefRemoved message that we are now a borrower for @@ -73,6 +74,12 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address, /*inner_ids=*/{}, caller_address, call_site, -1, /*is_reconstructable=*/true); } + + rpc::ObjectReference ref; + ref.set_object_id(spec.ReturnId(i).Binary()); + ref.mutable_owner_address()->CopyFrom(caller_address); + ref.set_call_site(call_site); + returned_refs.push_back(std::move(ref)); } { @@ -82,6 +89,8 @@ void TaskManager::AddPendingTask(const rpc::Address &caller_address, .second); num_pending_tasks_++; } + + return returned_refs; } Status TaskManager::ResubmitTask(const TaskID &task_id, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 986900352..f337d82bc 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -94,9 +94,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param[in] spec The spec of the pending task. 
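+  /// \param[in] call_site The caller's call site, attached to the returned
+  /// refs for debugging.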
/// \param[in] max_retries Number of times this task may be retried /// on failure. - /// \return Void. - void AddPendingTask(const rpc::Address &caller_address, const TaskSpecification &spec, - const std::string &call_site, int max_retries = 0); + /// \return ObjectRefs returned by this task. + std::vector AddPendingTask(const rpc::Address &caller_address, + const TaskSpecification &spec, + const std::string &call_site, + int max_retries = 0); /// Resubmit a task that has completed execution before. This is used to /// reconstruct objects stored in Plasma that were lost. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index c6edc7176..29d95cb8f 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -26,6 +26,7 @@ #include "hiredis/async.h" #include "hiredis/hiredis.h" #include "ray/common/buffer.h" +#include "ray/common/common_protocol.h" #include "ray/common/ray_object.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" @@ -204,12 +205,11 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, std::unordered_map &resources) { std::vector> args; TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func{Language::PYTHON, FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")}; - CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options, - &return_ids); + auto return_ids = ObjectRefsToIds( + CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options)); std::vector> results; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results)); @@ -242,16 +242,17 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res std::vector> args; args.emplace_back(new TaskArgByValue(std::make_shared( buffer1, nullptr, std::vector()))); - args.emplace_back(new TaskArgByReference(object_id, driver.GetRpcAddress())); + args.emplace_back( + new TaskArgByReference(object_id, driver.GetRpcAddress(), /*call_site=*/"")); RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); TaskOptions options; - std::vector return_ids; - driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0, - /*retry_exceptions=*/false, - std::make_pair(PlacementGroupID::Nil(), -1), true, - /*debugger_breakpoint=*/""); + auto return_refs = driver.SubmitTask( + func, args, options, /*max_retries=*/0, + /*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true, + /*debugger_breakpoint=*/""); + auto return_ids = ObjectRefsToIds(return_refs); ASSERT_EQ(return_ids.size(), 1); @@ -289,11 +290,11 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso buffer2, nullptr, std::vector()))); TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - driver.SubmitActorTask(actor_id, func, args, options, &return_ids); + auto return_ids = + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); ASSERT_EQ(return_ids.size(), 1); std::vector> results; @@ -327,15 +328,16 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso // Create arguments with PassByRef and PassByValue. 
std::vector> args; - args.emplace_back(new TaskArgByReference(object_id, driver.GetRpcAddress())); + args.emplace_back( + new TaskArgByReference(object_id, driver.GetRpcAddress(), /*call_site=*/"")); args.emplace_back(new TaskArgByValue(std::make_shared( buffer2, nullptr, std::vector()))); TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - driver.SubmitActorTask(actor_id, func, args, options, &return_ids); + auto return_ids = + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); ASSERT_EQ(return_ids.size(), 1); @@ -394,11 +396,11 @@ void CoreWorkerTest::TestActorRestart( buffer1, nullptr, std::vector()))); TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - driver.SubmitActorTask(actor_id, func, args, options, &return_ids); + auto return_ids = + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. std::vector> results; @@ -437,11 +439,11 @@ void CoreWorkerTest::TestActorFailure( buffer1, nullptr, std::vector()))); TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - driver.SubmitActorTask(actor_id, func, args, options, &return_ids); + auto return_ids = + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -523,7 +525,6 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { rpc::Address address; for (int i = 0; i < num_tasks; i++) { TaskOptions options{"", 1, resources}; - std::vector return_ids; auto num_returns = options.num_returns; TaskSpecBuilder builder; @@ -571,11 +572,11 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { buffer, nullptr, std::vector()))); TaskOptions options{"", 1, resources}; - std::vector return_ids; RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - driver.SubmitActorTask(actor_id, func, args, options, &return_ids); + auto return_ids = + ObjectRefsToIds(driver.SubmitActorTask(actor_id, func, args, options)); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } @@ -845,9 +846,6 @@ TEST_F(SingleNodeTest, TestCancelTasks) { FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", "")); RayFunction func2(Language::PYTHON, FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", "")); - // Return IDs for the two functions that implement while(true) loops. - std::vector return_ids1; - std::vector return_ids2; // Create default args and options needed to submit the tasks that encapsulate func1 and // func2. @@ -855,17 +853,17 @@ TEST_F(SingleNodeTest, TestCancelTasks) { TaskOptions options; // Submit func1. The function should start looping forever. 
- driver.SubmitTask(func1, args, options, &return_ids1, /*max_retries=*/0, - /*retry_exceptions=*/false, - std::make_pair(PlacementGroupID::Nil(), -1), true, - /*debugger_breakpoint=*/""); + auto return_ids1 = ObjectRefsToIds(driver.SubmitTask( + func1, args, options, /*max_retries=*/0, + /*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true, + /*debugger_breakpoint=*/"")); ASSERT_EQ(return_ids1.size(), 1); // Submit func2. The function should be queued at the worker indefinitely. - driver.SubmitTask(func2, args, options, &return_ids2, /*max_retries=*/0, - /*retry_exceptions=*/false, - std::make_pair(PlacementGroupID::Nil(), -1), true, - /*debugger_breakpoint=*/""); + auto return_ids2 = ObjectRefsToIds(driver.SubmitTask( + func2, args, options, /*max_retries=*/0, + /*retry_exceptions=*/false, std::make_pair(PlacementGroupID::Nil(), -1), true, + /*debugger_breakpoint=*/"")); ASSERT_EQ(return_ids2.size(), 1); // Cancel func2 by removing it from the worker's queue diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index ea3e0f318..eb959b044 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -60,7 +60,7 @@ class MockWorker { const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, - const std::vector &arg_reference_ids, + const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results) { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 93d4108ec..7fe322401 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -254,6 +254,10 @@ message ObjectReference { bytes object_id = 1; // The address of the object's owner. Address owner_address = 2; + // Language call site of the object reference (i.e., file and line number). + // Used to print debugging information if there is an error retrieving the + // object. + string call_site = 3; } message ObjectReferenceCount { diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 87f5739fd..01f0f1604 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -277,11 +277,7 @@ message PlasmaObjectReadyReply { } message SpillObjectsRequest { - // The IDs of objects to be spilled. - repeated bytes object_ids_to_spill = 1; - // The owner addresses of the objects to be spilled. Must be in the same order as - // object_ids_to_spill. 
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto
index 87f5739fd..01f0f1604 100644
--- a/src/ray/protobuf/core_worker.proto
+++ b/src/ray/protobuf/core_worker.proto
@@ -277,11 +277,7 @@ message PlasmaObjectReadyReply {
 }

 message SpillObjectsRequest {
-  // The IDs of objects to be spilled.
-  repeated bytes object_ids_to_spill = 1;
-  // The owner addresses of the objects to be spilled. Must be in the same order as
-  // object_ids_to_spill.
-  repeated Address owner_addresses = 2;
+  repeated ObjectReference object_refs_to_spill = 1;
 }

 message SpillObjectsReply {
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index 8a3cd3105..b46a9be20 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -238,10 +238,11 @@ void LocalObjectManager::SpillObjectsInternal(
   rpc::SpillObjectsRequest request;
   for (const auto &object_id : objects_to_spill) {
     RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
-    request.add_object_ids_to_spill(object_id.Binary());
+    auto ref = request.add_object_refs_to_spill();
+    ref->set_object_id(object_id.Binary());
     auto it = objects_pending_spill_.find(object_id);
     RAY_CHECK(it != objects_pending_spill_.end());
-    request.add_owner_addresses()->MergeFrom(it->second.second);
+    ref->mutable_owner_address()->CopyFrom(it->second.second);
   }
   io_worker->rpc_client()->SpillObjects(
       request, [this, objects_to_spill, callback, io_worker](
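On the receiving end, a consumer of the reworked SpillObjectsRequest reads one self-contained entry per object instead of zipping two parallel repeated fields. A sketch of that consumption, with HandleSpillRequestSketch a hypothetical stand-in for the IO worker's handler (which this patch does not touch):

    #include "ray/common/id.h"
    #include "src/ray/protobuf/core_worker.pb.h"

    // Hypothetical consumer: each ObjectReference carries its own owner
    // address, so there is no positional coupling left to get wrong.
    void HandleSpillRequestSketch(const ray::rpc::SpillObjectsRequest &request) {
      for (const auto &ref : request.object_refs_to_spill()) {
        const ray::ObjectID object_id = ray::ObjectID::FromBinary(ref.object_id());
        const ray::rpc::Address &owner = ref.owner_address();
        // ... spill object_id here, keeping owner so the object can be
        // restored or its reference count fixed up later.
        (void)object_id;
        (void)owner;
      }
    }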
diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
index 0ae7400fd..5c1eae64a 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
@@ -129,12 +129,12 @@ RayTask CreateTask(const std::unordered_map<std::string, double> &required_resou
   if (!args.empty()) {
     for (auto &arg : args) {
-      spec_builder.AddArg(TaskArgByReference(arg, rpc::Address()));
+      spec_builder.AddArg(TaskArgByReference(arg, rpc::Address(), ""));
     }
   } else {
     for (int i = 0; i < num_args; i++) {
       ObjectID put_id = ObjectID::FromIndex(RandomTaskId(), /*index=*/i + 1);
-      spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address()));
+      spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address(), ""));
     }
   }
diff --git a/src/ray/streaming/streaming.cc b/src/ray/streaming/streaming.cc
index a66ace90f..2d54c57dc 100644
--- a/src/ray/streaming/streaming.cc
+++ b/src/ray/streaming/streaming.cc
@@ -21,9 +21,9 @@ namespace streaming {
 using ray::core::CoreWorkerProcess;
 using ray::core::TaskOptions;

-void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
-                  RayFunction &function, int return_num,
-                  std::vector<ObjectID> &return_ids) {
+std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
+                                               std::shared_ptr<LocalMemoryBuffer> buffer,
+                                               RayFunction &function, int return_num) {
   std::unordered_map<std::string, double> resources;
   std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
   TaskOptions options{name, return_num, resources};
@@ -44,8 +44,8 @@ void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuff
                            std::vector<rpc::ObjectReference>(), true)));

   std::vector<std::shared_ptr<RayObject>> results;
-  CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
-                                                     options, &return_ids);
+  return CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function,
+                                                            args, options);
 }
 }  // namespace streaming
 }  // namespace ray
diff --git a/src/ray/streaming/streaming.h b/src/ray/streaming/streaming.h
index 768174d3b..46e2bc67a 100644
--- a/src/ray/streaming/streaming.h
+++ b/src/ray/streaming/streaming.h
@@ -29,9 +29,9 @@ using ray::core::RayFunction;
 /// \param[in] function the function descriptor of peer's function.
 /// \param[in] return_num return value number of the call.
-/// \param[out] return_ids return ids from SubmitActorTask.
-void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
-                  RayFunction &function, int return_num,
-                  std::vector<ObjectID> &return_ids);
+/// \return ObjectReferences of the return objects from SubmitActorTask.
+std::vector<rpc::ObjectReference> SendInternal(const ActorID &peer_actor_id,
+                                               std::shared_ptr<LocalMemoryBuffer> buffer,
+                                               RayFunction &function, int return_num);
 }  // namespace streaming
 }  // namespace ray
diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc
index 5f1c5524b..bdee92848 100644
--- a/streaming/src/queue/transport.cc
+++ b/streaming/src/queue/transport.cc
@@ -1,6 +1,7 @@
 #include "queue/transport.h"

 #include "queue/utils.h"
+#include "ray/common/common_protocol.h"
 #include "ray/streaming/streaming.h"

 namespace ray {
@@ -11,16 +12,15 @@ static constexpr int TASK_OPTION_RETURN_NUM_1 = 1;

 void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {
   STREAMING_LOG(DEBUG) << "Transport::Send buffer size: " << buffer->Size();
-  std::vector<ObjectID> return_ids;
-  ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
-                               TASK_OPTION_RETURN_NUM_0, return_ids);
+  RAY_UNUSED(ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
+                                          TASK_OPTION_RETURN_NUM_0));
 }

 std::shared_ptr<LocalMemoryBuffer> Transport::SendForResult(
     std::shared_ptr<LocalMemoryBuffer> buffer, int64_t timeout_ms) {
-  std::vector<ObjectID> return_ids;
-  ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
-                               TASK_OPTION_RETURN_NUM_1, return_ids);
+  auto return_refs = ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
+                                                  TASK_OPTION_RETURN_NUM_1);
+  auto return_ids = ObjectRefsToIds(return_refs);

   std::vector<std::shared_ptr<RayObject>> results;
   Status get_st =
diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc
index c999a6932..c51b1a8a1 100644
--- a/streaming/src/test/mock_actor.cc
+++ b/streaming/src/test/mock_actor.cc
@@ -517,7 +517,7 @@ class StreamingWorker {
                       const RayFunction &ray_function,
                       const std::unordered_map<std::string, double> &required_resources,
                       const std::vector<std::shared_ptr<RayObject>> &args,
-                      const std::vector<ObjectID> &arg_reference_ids,
+                      const std::vector<rpc::ObjectReference> &arg_refs,
                       const std::vector<ObjectID> &return_ids,
                       const std::string &debugger_breakpoint,
                       std::vector<std::shared_ptr<RayObject>> *results) {
diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h
index 35febcb2a..01953a818 100644
--- a/streaming/src/test/queue_tests_base.h
+++ b/streaming/src/test/queue_tests_base.h
@@ -1,6 +1,7 @@
 #pragma once

 #include "hiredis/hiredis.h"
+#include "ray/common/common_protocol.h"
 #include "ray/common/test_util.h"
 #include "ray/util/filesystem.h"

@@ -82,11 +83,10 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
         msg.ToBytes(), nullptr, std::vector<rpc::ObjectReference>(), true)));
     std::unordered_map<std::string, double> resources;
     TaskOptions options{"", 0, resources};
-    std::vector<ObjectID> return_ids;
     RayFunction func{ray::Language::PYTHON,
                      ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")};

-    driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids);
+    RAY_UNUSED(driver.SubmitActorTask(self_actor_id, func, args, options));
   }

   void SubmitTestToActor(ActorID &actor_id, const std::string test) {
@@ -98,11 +98,10 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
         buffer, nullptr, std::vector<rpc::ObjectReference>(), true)));
     std::unordered_map<std::string, double> resources;
     TaskOptions options("", 0, resources);
-    std::vector<ObjectID> return_ids;
     RayFunction func{ray::Language::PYTHON,
                      ray::FunctionDescriptorBuilder::BuildPython(
                          "", test, "execute_test", "")};

-    driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
+    RAY_UNUSED(driver.SubmitActorTask(actor_id, func, args, options));
   }

   bool CheckCurTest(ActorID &actor_id, const std::string test_name) {
@@ -114,11 +113,11 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
         buffer, nullptr, std::vector<rpc::ObjectReference>(), true)));
     std::unordered_map<std::string, double> resources;
     TaskOptions options{"", 1, resources};
-    std::vector<ObjectID> return_ids;
     RayFunction func{ray::Language::PYTHON,
                      ray::FunctionDescriptorBuilder::BuildPython(
                          "", "", "check_current_test_status", "")};
-    driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
+    auto return_refs = driver.SubmitActorTask(actor_id, func, args, options);
+    auto return_ids = ObjectRefsToIds(return_refs);

     std::vector<bool> wait_results;
     std::vector<std::shared_ptr<RayObject>> results;
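All of the C++ callers in this patch pass /*call_site=*/"" because only the Python frontend records creation sites so far. A C++ caller that wanted a meaningful string could build one itself; the helper name MakeTracedArg and the __FILE__:__LINE__ format below are illustrative assumptions, not a convention the patch establishes:

    #include <memory>
    #include <string>

    #include "ray/common/task/task_util.h"

    // Illustrative: attach a source location instead of an empty call site.
    std::unique_ptr<ray::TaskArg> MakeTracedArg(const ray::ObjectID &object_id,
                                                const ray::rpc::Address &owner,
                                                const std::string &call_site) {
      // TaskArgByReference now takes the call site as its third argument.
      return std::make_unique<ray::TaskArgByReference>(object_id, owner, call_site);
    }

    // Usage at the submission site:
    //   args.emplace_back(MakeTracedArg(
    //       id, addr, std::string(__FILE__) + ":" + std::to_string(__LINE__)));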