diff --git a/cpp/src/ray/runtime/object/native_object_store.cc b/cpp/src/ray/runtime/object/native_object_store.cc index 9bdd7ff54..d9326feb2 100644 --- a/cpp/src/ray/runtime/object/native_object_store.cc +++ b/cpp/src/ray/runtime/object/native_object_store.cc @@ -74,7 +74,11 @@ void NativeObjectStore::CheckException(const std::string &meta_str, throw RayWorkerException(std::move(data_str)); } else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) { throw RayActorException(std::move(data_str)); - } else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE)) { + } else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE) || + meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_LOST) || + meta_str == std::to_string(ray::rpc::ErrorType::OWNER_DIED) || + meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_DELETED)) { + // TODO: Differentiate object errors. throw UnreconstructableException(std::move(data_str)); } else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) { throw RayTaskException(std::move(data_str)); diff --git a/doc/source/troubleshooting.rst b/doc/source/troubleshooting.rst index 0f4146125..d9019a36b 100644 --- a/doc/source/troubleshooting.rst +++ b/doc/source/troubleshooting.rst @@ -235,6 +235,35 @@ as well as some known problems. If you encounter other problems, please .. _`let us know`: https://github.com/ray-project/ray/issues +Understanding `ObjectLostErrors` +-------------------------------- +Ray throws an ``ObjectLostError`` to the application when an object cannot be +retrieved due to application or system error. This can occur during a +``ray.get()`` call or when fetching a task's arguments, and can happen for a +number of reasons. Here is a guide to understanding the root cause for +different error types: + +- ``ObjectLostError``: The object was successfully created, but then all copies + were lost due to node failure. +- ``OwnerDiedError``: The owner of an object, i.e., the Python worker that + first created the ``ObjectRef`` via ``.remote()`` or ``ray.put()``, has died. + The owner stores critical object metadata and an object cannot be retrieved + if this process is lost. +- ``ObjectReconstructionFailedError``: Should only be thrown when `lineage + reconstruction`_ is enabled. This error is thrown if an object, or another + object that this object depends on, cannot be reconstructed because the + maximum number of task retries has been exceeded. By default, a non-actor + task can be retried up to 3 times and an actor task cannot be retried. + This can be overridden with the ``max_retries`` parameter for remote + functions and the ``max_task_retries`` parameter for actors. +- ``ReferenceCountingAssertionError``: The object has already been deleted, + so it cannot be retrieved. Ray implements automatic memory management through + distributed reference counting, so this error should not happen in general. + However, there is a `known edge case`_ that can produce this error. + +.. _`lineage reconstruction`: https://docs.ray.io/en/master/fault-tolerance.html +.. 
_`known edge case`: https://github.com/ray-project/ray/issues/18456 + Crashes ------- diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java index 66f47c797..fe1330044 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java @@ -33,6 +33,12 @@ public class ObjectSerializer { String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes(); private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META = String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes(); + private static final byte[] OBJECT_LOST_META = + String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes(); + private static final byte[] OWNER_DIED_META = + String.valueOf(ErrorType.OWNER_DIED.getNumber()).getBytes(); + private static final byte[] OBJECT_DELETED_META = + String.valueOf(ErrorType.OBJECT_DELETED.getNumber()).getBytes(); private static final byte[] TASK_EXECUTION_EXCEPTION_META = String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes(); @@ -77,6 +83,12 @@ public class ObjectSerializer { return Serializer.decode(data, objectType); } else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) { return new RayWorkerException(); + } else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0 + || Bytes.indexOf(meta, OBJECT_LOST_META) == 0 + || Bytes.indexOf(meta, OWNER_DIED_META) == 0 + || Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) { + // TODO: Differentiate object errors. + return new UnreconstructableException(objectId); } else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) { ActorId actorId = IdUtil.getActorIdFromObjectId(objectId); if (data != null && data.length > 0) { @@ -86,8 +98,6 @@ public class ObjectSerializer { } } return new RayActorException(actorId); - } else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) { - return new UnreconstructableException(objectId); } else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) { return deserializeRayException(data, objectId); } else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) { diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 7a59b5f79..27840dc73 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -77,6 +77,7 @@ cdef class BaseID: cdef class ObjectRef(BaseID): cdef: CObjectID data + c_string owner_addr # Flag indicating whether or not this object ref was added to the set # of active IDs in the core worker so we know whether we should clean # it up. 
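All of the new error classes documented in the troubleshooting guide above subclass ``ObjectLostError``, so existing ``except ObjectLostError`` handlers keep working while new code can branch on the specific cause. A minimal, illustrative sketch of how application code might do that (the ``fetch`` helper is not part of this change):

    import ray
    from ray.exceptions import (ObjectLostError, ObjectReconstructionFailedError,
                                OwnerDiedError, ReferenceCountingAssertionError)


    def fetch(ref):
        """Get ``ref``; the except branches show where each failure mode surfaces."""
        try:
            return ray.get(ref)
        except OwnerDiedError:
            # The worker that created the ref via `.remote()` or `ray.put()`
            # has exited, taking the object's metadata with it.
            raise
        except ObjectReconstructionFailedError:
            # Lineage reconstruction gave up after exhausting task retries.
            raise
        except ReferenceCountingAssertionError:
            # The object was already deleted, e.g. by `ray.internal.free()`.
            raise
        except ObjectLostError:
            # All copies were lost due to node failure.
            raise

Because the three specific errors subclass ``ObjectLostError``, they must be listed before the base class for the narrower handlers to match first.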
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index de3623bd4..3104a7b87 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -184,8 +184,10 @@ cdef RayObjectsToDataMetadataPairs( cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs): result = [] for i in range(object_refs.size()): - result.append(ObjectRef(object_refs[i].object_id(), - object_refs[i].call_site())) + result.append(ObjectRef( + object_refs[i].object_id(), + object_refs[i].owner_address().SerializeAsString(), + object_refs[i].call_site())) return result diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 90e49e622..58d170654 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -3,7 +3,9 @@ from traceback import format_exception import ray.cloudpickle as pickle from ray.core.generated.common_pb2 import RayException, Language, PYTHON +from ray.core.generated.common_pb2 import Address import ray.ray_constants as ray_constants +from ray._raylet import WorkerID import colorama import setproctitle @@ -171,7 +173,7 @@ class RayTaskError(RayError): # due to the dependency failure. # Print out an user-friendly # message to explain that.. - out.append(" Some of the input arguments for " + out.append(" At least one of the input arguments for " "this task could not be computed:") if i + 1 < len(lines) and lines[i + 1].startswith(" "): # If the next line is indented with 2 space, @@ -280,30 +282,97 @@ class ObjectStoreFullError(RayError): class ObjectLostError(RayError): - """Indicates that an object has been lost due to node failure. + """Indicates that the object is lost from distributed memory, due to + node failure or system error. Attributes: object_ref_hex: Hex ID of the object. """ - def __init__(self, object_ref_hex, call_site): + def __init__(self, object_ref_hex, owner_address, call_site): self.object_ref_hex = object_ref_hex + self.owner_address = owner_address self.call_site = call_site.replace( ray_constants.CALL_STACK_LINE_DELIMITER, "\n ") - def __str__(self): - msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node " - "failure or system error.") + def _base_str(self): + msg = f"Failed to retrieve object {self.object_ref_hex}. " if self.call_site: - msg += (f" The ObjectRef was created at: {self.call_site}") + msg += (f"The ObjectRef was created at: {self.call_site}") else: msg += ( - " To see information about where this ObjectRef was created " + "To see information about where this ObjectRef was created " "in Python, set the environment variable " "RAY_record_ref_creation_sites=1 during `ray start` and " "`ray.init()`.") return msg + def __str__(self): + return self._base_str() + "\n\n" + ( + f"All copies of {self.object_ref_hex} have been lost due to node " + "failure. Check cluster logs (`/tmp/ray/session_latest/logs`) for " + "more information about the failure.") + + +class ReferenceCountingAssertionError(ObjectLostError, AssertionError): + """Indicates that an object has been deleted while there was still a + reference to it. + + Attributes: + object_ref_hex: Hex ID of the object. + """ + + def __str__(self): + return self._base_str() + "\n\n" + ( + "The object has already been deleted by the reference counting " + "protocol. This should not happen.") + + +class OwnerDiedError(ObjectLostError): + """Indicates that the owner of the object has died while there is still a + reference to the object. + + Attributes: + object_ref_hex: Hex ID of the object. 
+ """ + + def __str__(self): + log_loc = "`/tmp/ray/session_latest/logs`" + if self.owner_address: + try: + addr = Address() + addr.ParseFromString(self.owner_address) + ip_addr = addr.ip_address + worker_id = WorkerID(addr.worker_id) + log_loc = ( + f"`/tmp/ray/session_latest/logs/*{worker_id.hex()}*`" + f" at IP address {ip_addr}") + except Exception: + # Catch all to make sure we always at least print the default + # message. + pass + + return self._base_str() + "\n\n" + ( + "The object's owner has exited. This is the Python " + "worker that first created the ObjectRef via `.remote()` or " + "`ray.put()`. " + f"Check cluster logs ({log_loc}) for more " + "information about the Python worker failure.") + + +class ObjectReconstructionFailedError(ObjectLostError): + """Indicates that the object cannot be reconstructed because the maximum + number of task retries has been exceeded. + + Attributes: + object_ref_hex: Hex ID of the object. + """ + + def __str__(self): + return self._base_str() + "\n\n" + ( + "The object cannot be reconstructed " + "because the maximum number of task retries has been exceeded.") + class GetTimeoutError(RayError): """Indicates that a call to the worker timed out.""" @@ -338,6 +407,9 @@ RAY_EXCEPTION_TYPES = [ RayActorError, ObjectStoreFullError, ObjectLostError, + ReferenceCountingAssertionError, + ObjectReconstructionFailedError, + OwnerDiedError, GetTimeoutError, AsyncioActorExit, RuntimeEnvSetupError, diff --git a/python/ray/includes/object_ref.pxi b/python/ray/includes/object_ref.pxi index 8e86c979c..e77a4335e 100644 --- a/python/ray/includes/object_ref.pxi +++ b/python/ray/includes/object_ref.pxi @@ -35,9 +35,10 @@ def _set_future_helper( cdef class ObjectRef(BaseID): - def __init__(self, id, call_site_data=""): + def __init__(self, id, owner_addr="", call_site_data=""): check_id(id) self.data = CObjectID.FromBinary(id) + self.owner_addr = owner_addr self.in_core_worker = False self.call_site_data = call_site_data @@ -85,6 +86,9 @@ cdef class ObjectRef(BaseID): def job_id(self): return self.task_id().job_id() + def owner_address(self): + return self.owner_addr + def call_site(self): return decode(self.call_site_data) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 5b53c9ec3..bc335e4a8 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -6,10 +6,11 @@ import ray.cloudpickle as pickle from ray import ray_constants import ray._private.utils from ray._private.gcs_utils import ErrorType -from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError, - RayActorError, TaskCancelledError, - WorkerCrashedError, ObjectLostError, - RaySystemError, RuntimeEnvSetupError) +from ray.exceptions import ( + RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError, + TaskCancelledError, WorkerCrashedError, ObjectLostError, + ReferenceCountingAssertionError, OwnerDiedError, + ObjectReconstructionFailedError, RaySystemError, RuntimeEnvSetupError) from ray._raylet import ( split_buffer, unpack_pickle5_buffers, @@ -37,7 +38,7 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status): # the core worker to resolve the value. This is to make sure # that the ref count for the ObjectRef is greater than 0 by the # time the core worker resolves the value of the object.
- obj_ref = ray.ObjectRef(binary, call_site) + obj_ref = ray.ObjectRef(binary, owner_address, call_site) # TODO(edoakes): we should be able to just capture a reference # to 'self' here instead, but this function is itself pickled @@ -222,15 +223,27 @@ class SerializationContext: return RayActorError() elif error_type == ErrorType.Value("TASK_CANCELLED"): return TaskCancelledError() - elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): + elif error_type == ErrorType.Value("OBJECT_LOST"): return ObjectLostError(object_ref.hex(), + object_ref.owner_address(), object_ref.call_site()) + elif error_type == ErrorType.Value("OBJECT_DELETED"): + return ReferenceCountingAssertionError( + object_ref.hex(), object_ref.owner_address(), + object_ref.call_site()) + elif error_type == ErrorType.Value("OWNER_DIED"): + return OwnerDiedError(object_ref.hex(), + object_ref.owner_address(), + object_ref.call_site()) + elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): + return ObjectReconstructionFailedError( + object_ref.hex(), object_ref.owner_address(), + object_ref.call_site()) elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"): return RuntimeEnvSetupError() else: - assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ - "Tried to get object that has been promoted to plasma." - assert False, "Unrecognized error type " + str(error_type) + return RaySystemError("Unrecognized error type " + + str(error_type)) elif data: raise ValueError("non-null object should always have metadata") else: diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index ef8bfb481..b7962ff71 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -16,6 +16,7 @@ import ray.cluster_utils import ray._private.profiling as profiling from ray._private.test_utils import (client_test_enabled, RayTestTimeoutException, SignalActor) +from ray.exceptions import ReferenceCountingAssertionError if client_test_enabled(): from ray.util.client import ray @@ -44,7 +45,7 @@ def test_internal_free(shutdown_only): obj_ref = sampler.sample.remote() ray.get(obj_ref) ray.internal.free(obj_ref) - with pytest.raises(Exception): + with pytest.raises(ReferenceCountingAssertionError): ray.get(obj_ref) # Free deletes big objects from plasma store. @@ -52,7 +53,7 @@ def test_internal_free(shutdown_only): ray.get(big_id) ray.internal.free(big_id) time.sleep(1) # wait for delete RPC to propagate - with pytest.raises(Exception): + with pytest.raises(ReferenceCountingAssertionError): ray.get(big_id) diff --git a/python/ray/tests/test_failure_2.py b/python/ray/tests/test_failure_2.py index 94ef065ec..6bb0986e6 100644 --- a/python/ray/tests/test_failure_2.py +++ b/python/ray/tests/test_failure_2.py @@ -283,7 +283,7 @@ def test_raylet_crash_when_get(ray_start_regular): thread = threading.Thread(target=sleep_to_kill_raylet) thread.start() - with pytest.raises(ray.exceptions.ObjectLostError): + with pytest.raises(ray.exceptions.ReferenceCountingAssertionError): ray.get(object_ref) thread.join() @@ -307,7 +307,7 @@ def test_eviction(ray_start_cluster): # Evict the object. ray.internal.free([obj]) # ray.get throws an exception. 
- with pytest.raises(ray.exceptions.ObjectLostError): + with pytest.raises(ray.exceptions.ReferenceCountingAssertionError): ray.get(obj) @ray.remote diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index fe20541be..a26d2b288 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -108,10 +108,10 @@ def test_reconstruction_cached_dependency(ray_start_cluster, if reconstruction_enabled: ray.get(dependent_task.remote(obj)) else: - with pytest.raises(ray.exceptions.RayTaskError) as e: + with pytest.raises(ray.exceptions.RayTaskError): ray.get(dependent_task.remote(obj)) - with pytest.raises(ray.exceptions.ObjectLostError): - raise e.as_instanceof_cause() + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) @pytest.mark.skipif( @@ -138,8 +138,6 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): # Node to place the initial object. node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) - cluster.add_node( - num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) cluster.wait_for_nodes() @ray.remote(max_retries=1 if reconstruction_enabled else 0) @@ -154,18 +152,33 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) cluster.remove_node(node_to_kill, allow_graceful=False) - cluster.add_node( + node_to_kill = cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) if reconstruction_enabled: ray.get(dependent_task.remote(obj)) else: - with pytest.raises(ray.exceptions.RayTaskError) as e: + with pytest.raises(ray.exceptions.RayTaskError): ray.get(dependent_task.remote(obj)) - with pytest.raises(ray.exceptions.ObjectLostError): - raise e.as_instanceof_cause() + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) + + # Losing the object a second time will cause reconstruction to fail because + # we have reached the max task retries. + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + + if reconstruction_enabled: + with pytest.raises(ray.exceptions.ObjectReconstructionFailedError): + ray.get(obj) + else: + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) +# TODO(swang): Add a test to check for ObjectReconstructionFailedError if we +# fail to reconstruct a ray.put object. @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.") @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): @@ -287,10 +300,79 @@ def test_basic_reconstruction_actor_task(ray_start_cluster, if reconstruction_enabled: ray.get(dependent_task.remote(obj)) else: - with pytest.raises(ray.exceptions.RayTaskError) as e: + with pytest.raises(ray.exceptions.RayTaskError): ray.get(dependent_task.remote(obj)) - with pytest.raises(ray.exceptions.ObjectLostError): - raise e.as_instanceof_cause() + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) + + # Make sure the actor handle is still usable. 
+ pid = ray.get(a.pid.remote()) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.") +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_basic_reconstruction_actor_lineage_disabled(ray_start_cluster, + reconstruction_enabled): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_period_milliseconds": 100, + "object_timeout_milliseconds": 200, + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = False + + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _system_config=config, + enable_object_reconstruction=reconstruction_enabled) + ray.init(address=cluster.address) + # Node to place the initial object. + node_to_kill = cluster.add_node( + num_cpus=1, resources={"node1": 2}, object_store_memory=10**8) + cluster.add_node( + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) + cluster.wait_for_nodes() + + # Actor can be restarted but its outputs cannot be reconstructed. + @ray.remote(max_restarts=-1, resources={"node1": 1}, num_cpus=0) + class Actor: + def __init__(self): + pass + + def large_object(self): + return np.zeros(10**7, dtype=np.uint8) + + def pid(self): + return os.getpid() + + @ray.remote + def dependent_task(x): + return + + a = Actor.remote() + pid = ray.get(a.pid.remote()) + obj = a.large_object.remote() + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + + # Workaround to kill the actor process too since there is a bug where the + # actor's plasma client hangs after the plasma store has exited. + os.kill(pid, SIGKILL) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, resources={"node1": 2}, object_store_memory=10**8) + + wait_for_pid_to_exit(pid) + + if reconstruction_enabled: + with pytest.raises(ray.exceptions.ObjectReconstructionFailedError): + ray.get(obj) + else: + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) # Make sure the actor handle is still usable. 
pid = ray.get(a.pid.remote()) @@ -369,12 +451,13 @@ def test_basic_reconstruction_actor_constructor(ray_start_cluster, if reconstruction_enabled: ray.get(a.dependent_task.remote(obj)) else: - with pytest.raises(ray.exceptions.RayActorError) as e: + with pytest.raises(ray.exceptions.RayActorError) as exc_info: x = a.dependent_task.remote(obj) print(x) ray.get(x) - with pytest.raises(ray.exceptions.ObjectLostError): - raise e.get_creation_task_error() + exc = str(exc_info.value) + assert "arguments" in exc + assert "ObjectLostError" in exc @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") @@ -424,6 +507,23 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): for obj in downstream: ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + cluster.remove_node(node_to_kill, allow_graceful=False) + node_to_kill = cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + + if reconstruction_enabled: + for obj in downstream: + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError): + for obj in downstream: + ray.get( + dependent_task.options(resources={ + "node1": 1 + }).remote(obj)) + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) + cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) @@ -432,14 +532,9 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): for obj in downstream: ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) else: - with pytest.raises(ray.exceptions.RayTaskError) as e: - for obj in downstream: - ray.get( - dependent_task.options(resources={ - "node1": 1 - }).remote(obj)) + for obj in downstream: with pytest.raises(ray.exceptions.ObjectLostError): - raise e.as_instanceof_cause() + ray.get(obj) @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") @@ -488,10 +583,10 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): if reconstruction_enabled: ray.get(dependent_task.remote(obj)) else: - with pytest.raises(ray.exceptions.RayTaskError) as e: + with pytest.raises(ray.exceptions.RayTaskError): ray.get(dependent_task.remote(obj)) - with pytest.raises(ray.exceptions.ObjectLostError): - raise e.as_instanceof_cause() + with pytest.raises(ray.exceptions.ObjectLostError): + ray.get(obj) @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index e7693bbee..f7cb02ac2 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -415,8 +415,11 @@ def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put, try: assert ray.get(tail_oid) is None assert not failure - # TODO(edoakes): this should raise WorkerError. - except ray.exceptions.ObjectLostError: + except ray.exceptions.OwnerDiedError: + # There is only 1 core, so the same worker will execute all `recursive` + # tasks. Therefore, if we kill the worker during the last task, its + # owner (the worker that executed the second-to-last task) will also + # have died. assert failure # Reference should be gone, check that array gets evicted. 
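The one-CPU setups in these reference-counting tests matter because ``ObjectRef`` ownership follows the process that called ``.remote()`` or ``ray.put()``: a ref created inside a task and returned to the caller is only borrowed by the caller. A simplified sketch of that pattern, using hypothetical ``leaf``/``middle`` tasks rather than the test helpers:

    import ray

    ray.init()


    @ray.remote
    def leaf():
        return "value"


    @ray.remote
    def middle():
        ref = leaf.remote()  # the worker running `middle` owns `ref`
        return ref           # the driver receives `ref` only as a borrower


    borrowed = ray.get(middle.remote())
    # If the worker that ran `middle` dies before `borrowed` is resolved, a
    # later ray.get(borrowed) raises OwnerDiedError: only the owner holds the
    # metadata needed to locate or recover the value.
    print(ray.get(borrowed))  # "value" in the healthy case

In the tests above, the single core forces the same worker to run every ``recursive`` task, so killing it during the last task also kills the owner of that task's return ref, which is why the borrower now sees ``OwnerDiedError`` rather than a generic loss.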
@@ -494,15 +497,21 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put, return @ray.remote - def launch_pending_task(ref, signal): - return child.remote(ref[0], signal.wait.remote()) + class Submitter: + def __init__(self): + pass + + def launch_pending_task(self, ref, signal): + return child.remote(ref[0], signal.wait.remote()) signal = SignalActor.remote() # Test that the reference held by the actor isn't evicted. array_oid = put_object( np.zeros(20 * 1024 * 1024, dtype=np.uint8), use_ray_put) - child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) + s = Submitter.remote() + child_return_id = ray.get( + s.launch_pending_task.remote([array_oid], signal)) # Remove the local reference. array_oid_bytes = array_oid.binary() @@ -515,7 +524,7 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put, try: ray.get(child_return_id) assert not failure - except (ray.exceptions.WorkerCrashedError, ray.exceptions.ObjectLostError): + except ray.exceptions.WorkerCrashedError: assert failure del child_return_id diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 08e7364f9..a5f9597ed 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -91,12 +91,15 @@ def test_recursively_nest_ids(one_worker_100MiB, use_ray_put, failure): # Fulfill the dependency, causing the tail task to finish. ray.get(signal.send.remote()) - try: + if not failure: ray.get(tail_oid) - assert not failure - # TODO(edoakes): this should raise WorkerError. - except ray.exceptions.ObjectLostError: - assert failure + else: + # There is only 1 core, so the same worker will execute all `recursive` + # tasks. Therefore, if we kill the worker during the last task, its + # owner (the worker that executed the second-to-last task) will also + # have died. + with pytest.raises(ray.exceptions.OwnerDiedError): + ray.get(tail_oid) # Reference should be gone, check that array gets evicted. _fill_object_store_and_get(array_oid_bytes, succeed=False) @@ -233,8 +236,11 @@ def test_recursively_pass_returned_object_ref(one_worker_100MiB, use_ray_put, ray.get(outer_oid) _fill_object_store_and_get(inner_oid) assert not failure - # TODO(edoakes): this should raise WorkerError. - except ray.exceptions.ObjectLostError: + except ray.exceptions.OwnerDiedError: + # There is only 1 core, so the same worker will execute all `recursive` + # tasks. Therefore, if we kill the worker during the last task, its + # owner (the worker that executed the second-to-last task) will also + # have died. assert failure inner_oid_bytes = inner_oid.binary() @@ -286,6 +292,10 @@ def test_recursively_return_borrowed_object_ref(one_worker_100MiB, use_ray_put, # Reference should be gone, check that returned ID gets evicted. 
_fill_object_store_and_get(final_oid_bytes, succeed=False) + if failure: + with pytest.raises(ray.exceptions.OwnerDiedError): + ray.get(final_oid) + @pytest.mark.parametrize("failure", [False, True]) def test_borrowed_id_failure(one_worker_100MiB, failure): @@ -314,7 +324,8 @@ def test_borrowed_id_failure(one_worker_100MiB, failure): def resolve_ref(self): assert self.ref is not None if failure: - with pytest.raises(ray.exceptions.ObjectLostError): + with pytest.raises( + ray.exceptions.ReferenceCountingAssertionError): ray.get(self.ref) else: ray.get(self.ref) diff --git a/python/ray/tests/test_traceback.py b/python/ray/tests/test_traceback.py index bc136eeef..3081bcc6e 100644 --- a/python/ray/tests/test_traceback.py +++ b/python/ray/tests/test_traceback.py @@ -174,9 +174,9 @@ ZeroDivisionError: division by zero""" def test_dep_failure(ray_start_regular): """Test the stacktrace genereated due to dependency failures.""" expected_output = """ray::f() (pid=XXX, ip=YYY) # noqa - Some of the input arguments for this task could not be computed: + At least one of the input arguments for this task could not be computed: ray.exceptions.RayTaskError: ray::a() (pid=XXX, ip=YYY) - Some of the input arguments for this task could not be computed: + At least one of the input arguments for this task could not be computed: ray.exceptions.RayTaskError: ray::b() (pid=XXX, ip=YYY) File "FILE", line ZZ, in b raise ValueError("FILE") diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 4df786088..7775942f4 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -706,8 +706,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ object_recovery_manager_ = std::make_unique( rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn, task_manager_, reference_counter_, memory_store_, - [this](const ObjectID &object_id, bool pin_object) { - RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), + [this](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) { + RAY_LOG(DEBUG) << "Failed to recover object " << object_id << " due to " + << rpc::ErrorType_Name(reason); + RAY_CHECK_OK(Put(RayObject(reason), /*contained_object_ids=*/{}, object_id, /*pin_object=*/pin_object)); }, @@ -1458,8 +1460,7 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on // no longer reachable. memory_store_->Delete(object_ids); for (const auto &object_id : object_ids) { - RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), - object_id)); + RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED), object_id)); } // We only delete from plasma, which avoids hangs (issue #7105). In-memory diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index 024881e36..a46d38d96 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -45,14 +45,18 @@ void FutureResolver::ProcessResolvedObject(const ObjectID &object_id, << " that was deserialized: " << status.ToString(); } - if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { - // The owner is gone or the owner replied that the object has gone - // out of scope (this is an edge case in the distributed ref counting - // protocol where a borrower dies before it can notify the owner of - // another borrower). 
Store an error so that an exception will be + if (!status.ok()) { + // The owner is unreachable. Store an error so that an exception will be // thrown immediately when the worker tries to get the value. - RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), - object_id)); + RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OWNER_DIED), object_id)); + } else if (reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { + // The owner replied that the object has gone out of scope (this is an edge + // case in the distributed ref counting protocol where a borrower dies + // before it can notify the owner of another borrower). Store an error so + // that an exception will be thrown immediately when the worker tries to + // get the value. + RAY_UNUSED( + in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED), object_id)); } else if (reply.status() == rpc::GetObjectStatusReply::CREATED) { // The object is either an indicator that the object is in Plasma, or // the object has been returned directly in the reply. In either diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index d2c2f974e..966911fb9 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -34,8 +34,7 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { if (!owned_by_us) { RAY_LOG(DEBUG) << "Reconstruction for borrowed objects (" << object_id << ") is not supported"; - reconstruction_failure_callback_(object_id, /*pin_object=*/false); - return true; + return false; } bool already_pending_recovery = true; @@ -81,7 +80,9 @@ void ObjectRecoveryManager::PinOrReconstructObject( // There are no more copies to pin, try to reconstruct the object. ReconstructObject(object_id); } else { - reconstruction_failure_callback_(object_id, /*pin_object=*/true); + // All copies lost, and lineage reconstruction is disabled. + recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_LOST, + /*pin_object=*/true); } } @@ -143,12 +144,14 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": " << status.message(); // We do not pin the dependency because we may not be the owner. - reconstruction_failure_callback_(dep, /*pin_object=*/false); + recovery_failure_callback_(dep, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE, + /*pin_object=*/false); } } } else { RAY_LOG(INFO) << "Failed to reconstruct object " << object_id; - reconstruction_failure_callback_(object_id, /*pin_object=*/true); + recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE, + /*pin_object=*/true); } } diff --git a/src/ray/core_worker/object_recovery_manager.h b/src/ray/core_worker/object_recovery_manager.h index 5932ea6ef..cbabf310e 100644 --- a/src/ray/core_worker/object_recovery_manager.h +++ b/src/ray/core_worker/object_recovery_manager.h @@ -33,6 +33,11 @@ typedef std::function &raylet_locations)> ObjectLookupCallback; +// A callback for if we fail to recover an object. 
+typedef std::function + ObjectRecoveryFailureCallback; + class ObjectRecoveryManager { public: ObjectRecoveryManager(const rpc::Address &rpc_address, @@ -44,8 +49,7 @@ class ObjectRecoveryManager { std::shared_ptr task_resubmitter, std::shared_ptr reference_counter, std::shared_ptr in_memory_store, - std::function - reconstruction_failure_callback, + const ObjectRecoveryFailureCallback &recovery_failure_callback, bool lineage_reconstruction_enabled) : task_resubmitter_(task_resubmitter), reference_counter_(reference_counter), @@ -54,7 +58,7 @@ class ObjectRecoveryManager { local_object_pinning_client_(local_object_pinning_client), object_lookup_(object_lookup), in_memory_store_(in_memory_store), - reconstruction_failure_callback_(reconstruction_failure_callback), + recovery_failure_callback_(recovery_failure_callback), lineage_reconstruction_enabled_(lineage_reconstruction_enabled) {} /// Recover an object that was stored in plasma. This will only succeed for @@ -127,8 +131,7 @@ class ObjectRecoveryManager { std::shared_ptr in_memory_store_; /// Callback to call if recovery fails. - const std::function - reconstruction_failure_callback_; + const ObjectRecoveryFailureCallback recovery_failure_callback_; /// Whether lineage reconstruction is enabled. If disabled, then we will try /// to pin new copies for a lost object, but we will never reconstruct it diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index aaf9d5ebe..98ad8dcd2 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -999,8 +999,8 @@ absl::optional> ReferenceCounter::GetObjectLocations absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { - RAY_LOG(WARNING) << "Tried to get the object locations for an object " << object_id - << " that doesn't exist in the reference table"; + RAY_LOG(DEBUG) << "Tried to get the object locations for an object " << object_id + << " that doesn't exist in the reference table"; return absl::nullopt; } return it->second.locations; @@ -1151,9 +1151,13 @@ Status ReferenceCounter::FillObjectInformation( absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { - return Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); + RAY_LOG(WARNING) << "Object locations requested for " << object_id + << ", but ref already removed. This may be a bug in the distributed " + "reference counting protocol."; + object_info->set_ref_removed(true); + } else { + FillObjectInformationInternal(it, object_info); } - FillObjectInformationInternal(it, object_info); return Status::OK(); } @@ -1173,10 +1177,17 @@ void ReferenceCounter::PublishObjectLocationSnapshot(const ObjectID &object_id) absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { - RAY_LOG(DEBUG) << "Tried to register a location subscriber for an object " - << object_id << " that doesn't exist in the reference table." - << " The object has probably already been freed."; - // Consider the object is already freed, and not subscribeable. + RAY_LOG(WARNING) << "Object locations requested for " << object_id + << ", but ref already removed. This may be a bug in the distributed " + "reference counting protocol."; + // First let subscribers handle this error. 
+ rpc::PubMessage pub_message; + pub_message.set_key_id(object_id.Binary()); + pub_message.set_channel_type(rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL); + pub_message.mutable_worker_object_locations_message()->set_ref_removed(true); + object_info_publisher_->Publish(rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, + pub_message, object_id.Binary()); + // Then, publish a failure to subscribers since this object is unreachable. object_info_publisher_->PublishFailure( rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, object_id.Binary()); return; diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index 1f9a89979..4625831d9 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -105,9 +105,9 @@ class MockObjectDirectory { std::unordered_map> locations; }; -class ObjectRecoveryManagerTest : public ::testing::Test { +class ObjectRecoveryManagerTestBase : public ::testing::Test { public: - ObjectRecoveryManagerTest() + ObjectRecoveryManagerTestBase(bool lineage_enabled) : local_raylet_id_(NodeID::FromRandom()), publisher_(std::make_shared()), subscriber_(std::make_shared()), @@ -117,7 +117,7 @@ class ObjectRecoveryManagerTest : public ::testing::Test { task_resubmitter_(std::make_shared()), ref_counter_(std::make_shared( rpc::Address(), publisher_.get(), subscriber_.get(), - /*lineage_pinning_enabled=*/true)), + /*lineage_pinning_enabled=*/lineage_enabled)), manager_(rpc::Address(), [&](const std::string &ip, int port) { return raylet_client_; }, raylet_client_, @@ -126,9 +126,9 @@ class ObjectRecoveryManagerTest : public ::testing::Test { return Status::OK(); }, task_resubmitter_, ref_counter_, memory_store_, - [&](const ObjectID &object_id, bool pin_object) { + [&](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) { RAY_CHECK(failed_reconstructions_.count(object_id) == 0); - failed_reconstructions_[object_id] = pin_object; + failed_reconstructions_[object_id] = reason; std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); @@ -140,10 +140,10 @@ class ObjectRecoveryManagerTest : public ::testing::Test { std::vector()); RAY_CHECK(memory_store_->Put(data, object_id)); }, - /*lineage_reconstruction_enabled=*/true) {} + /*lineage_reconstruction_enabled=*/lineage_enabled) {} NodeID local_raylet_id_; - std::unordered_map failed_reconstructions_; + std::unordered_map failed_reconstructions_; std::shared_ptr publisher_; std::shared_ptr subscriber_; @@ -155,21 +155,30 @@ class ObjectRecoveryManagerTest : public ::testing::Test { ObjectRecoveryManager manager_; }; -TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { +class ObjectRecoveryLineageDisabledTest : public ObjectRecoveryManagerTestBase { + public: + ObjectRecoveryLineageDisabledTest() : ObjectRecoveryManagerTestBase(false) {} +}; + +class ObjectRecoveryManagerTest : public ObjectRecoveryManagerTestBase { + public: + ObjectRecoveryManagerTest() : ObjectRecoveryManagerTestBase(true) {} +}; + +TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) { // Lineage recording disabled. 
ObjectID object_id = ObjectID::FromRandom(); ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(object_directory_->Flush() == 1); - ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); + ASSERT_EQ(failed_reconstructions_[object_id], rpc::ErrorType::OBJECT_LOST); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); // Borrowed object. object_id = ObjectID::FromRandom(); ref_counter_->AddLocalReference(object_id, ""); - ASSERT_TRUE(manager_.RecoverObject(object_id)); - ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); + ASSERT_FALSE(manager_.RecoverObject(object_id)); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); // Ref went out of scope. @@ -179,6 +188,19 @@ TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); } +TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + std::vector addresses({rpc::Address()}); + object_directory_->SetLocations(object_id, addresses); + + ASSERT_TRUE(manager_.RecoverObject(object_id)); + ASSERT_TRUE(object_directory_->Flush() == 1); + ASSERT_TRUE(raylet_client_->Flush() == 1); + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); +} + TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { ObjectID object_id = ObjectID::FromRandom(); ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); @@ -256,6 +278,36 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { } } +TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + + ASSERT_TRUE(manager_.RecoverObject(object_id)); + ASSERT_TRUE(object_directory_->Flush() == 1); + + ASSERT_TRUE(failed_reconstructions_[object_id] == + rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); +} + +TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) { + ObjectID dep_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true); + + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); + task_resubmitter_->AddTask(object_id.TaskId(), {dep_id}); + RAY_LOG(INFO) << object_id; + + ASSERT_TRUE(manager_.RecoverObject(object_id)); + ASSERT_EQ(object_directory_->Flush(), 1); + // Trigger callback for dep ID. 
+ ASSERT_EQ(object_directory_->Flush(), 1); + ASSERT_EQ(failed_reconstructions_[dep_id], rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); + ASSERT_EQ(failed_reconstructions_.count(object_id), 0); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1); +} + } // namespace core } // namespace ray diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index 916dd0e8b..6e0b1ca48 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -269,14 +269,21 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations( auto msg_published_callback = [this, object_id](const rpc::PubMessage &pub_message) { RAY_CHECK(pub_message.has_worker_object_locations_message()); const auto &location_info = pub_message.worker_object_locations_message(); - ObjectLocationSubscriptionCallback(location_info, object_id, - /*location_lookup_failed*/ false); + ObjectLocationSubscriptionCallback( + location_info, object_id, + /*location_lookup_failed*/ !location_info.ref_removed()); + if (location_info.ref_removed()) { + mark_as_failed_(object_id, rpc::ErrorType::OBJECT_DELETED); + } }; auto failure_callback = [this, owner_address](const std::string &object_id_binary) { const auto object_id = ObjectID::FromBinary(object_id_binary); - mark_as_failed_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); + mark_as_failed_(object_id, rpc::ErrorType::OWNER_DIED); rpc::WorkerObjectLocationsPubMessage location_info; + // Location lookup can fail if the owner is reachable but no longer has a + // record of this ObjectRef, most likely due to an issue with the + // distributed reference counting protocol. ObjectLocationSubscriptionCallback(location_info, object_id, /*location_lookup_failed*/ true); }; @@ -399,7 +406,13 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations( if (!status.ok()) { RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for " << object_id << status.ToString(); - mark_as_failed_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); + mark_as_failed_(object_id, rpc::ErrorType::OWNER_DIED); + } else if (reply.object_location_info().ref_removed()) { + RAY_LOG(ERROR) + << "Worker " << worker_id << " failed to get the location for " + << object_id + << ", object already released by distributed reference counting protocol"; + mark_as_failed_(object_id, rpc::ErrorType::OBJECT_DELETED); } else { UpdateObjectLocations(reply.object_location_info(), gcs_client_, &node_ids, &spilled_url, &spilled_node_id, &object_size); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index e8d77bf6f..b2014248f 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -108,14 +108,12 @@ enum ErrorType { // Indicates that a task failed because the actor died unexpectedly before // finishing it. ACTOR_DIED = 1; - // Indicates that an object is lost and cannot be restarted. - // Note, this currently only happens to actor objects. When the actor's - // state is already after the object's creating task, the actor cannot - // re-run the task. - // TODO(hchen): we may want to reuse this error type for more cases. E.g., - // 1) A object that was put by the driver. - // 2) The object's creating task is already cleaned up from GCS (this - // currently crashes raylet). + // This object was lost from distributed memory to a node failure or system + // error. 
We use this error when lineage reconstruction is enabled, but the + // object cannot be reconstructed, e.g., because its lineage is not available. + // TODO(swang): We may want to break down this error type further, e.g., + // object's lineage was evicted or object depended on an actor task that + // can't be reconstructed. OBJECT_UNRECONSTRUCTABLE = 2; // Indicates that a task failed due to user code failure. TASK_EXECUTION_EXCEPTION = 3; @@ -129,6 +127,16 @@ ACTOR_CREATION_FAILED = 6; // Indicates that the runtime_env failed to be created. RUNTIME_ENV_SETUP_FAILED = 7; + // This object was lost from distributed memory due to a node failure or system + // error. We use this error when lineage reconstruction is disabled. + OBJECT_LOST = 8; + // This object is unreachable because its owner has died. + OWNER_DIED = 9; + // This object is unreachable because the owner is alive but no longer has a + // record of this object, meaning that the physical object has likely already + // been deleted from distributed memory. This can happen in distributed + // reference counting, due to a bug or corner case. + OBJECT_DELETED = 10; } /// The task exception encapsulates all information about task diff --git a/src/ray/protobuf/pubsub.proto b/src/ray/protobuf/pubsub.proto index 48ffc189e..fc046afcf 100644 --- a/src/ray/protobuf/pubsub.proto +++ b/src/ray/protobuf/pubsub.proto @@ -75,6 +75,11 @@ message WorkerObjectLocationsPubMessage { // The ID of the node that stores the primary copy in plasma. // This could be Nil if the object has been evicted or inlined. bytes primary_node_id = 6; + // If this is set, then the owner is alive but no longer has an entry for + // this reference. This can happen if there is a bug in the distributed ref + // counting protocol that causes the object to be released while there are + // still references. + bool ref_removed = 7; } /// Indicating the subscriber needs to handle failure callback. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 816dd54f9..b8660f8ad 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1710,6 +1710,10 @@ void NodeManager::HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &r void NodeManager::MarkObjectsAsFailed( const ErrorType &error_type, const std::vector objects_to_fail, const JobID &job_id) { + // TODO(swang): Ideally we should return the error directly to the client + // that needs this object instead of storing the object in plasma, which is + // not guaranteed to succeed. This would also avoid hanging the client if + // plasma is not reachable. const std::string meta = std::to_string(static_cast(error_type)); for (const auto &ref : objects_to_fail) { ObjectID object_id = ObjectID::FromBinary(ref.object_id());
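Taken together, the Python-side changes give each new protobuf error type its own exception, while the Java and C++ frontends still collapse all four cases into ``UnreconstructableException`` (see the TODOs in their diffs above). The mapping below is just a restatement of the branches added to ``python/ray/serialization.py``; the ``ERROR_TYPE_TO_EXCEPTION`` name itself is illustrative and not part of this change:

    from ray.exceptions import (ObjectLostError, ObjectReconstructionFailedError,
                                OwnerDiedError, ReferenceCountingAssertionError)

    ERROR_TYPE_TO_EXCEPTION = {
        # All copies were lost to node failure (lineage reconstruction disabled).
        "OBJECT_LOST": ObjectLostError,
        # The worker that created the ref has exited.
        "OWNER_DIED": OwnerDiedError,
        # The object was already released by the reference counting protocol.
        "OBJECT_DELETED": ReferenceCountingAssertionError,
        # Lineage reconstruction was enabled but ran out of task retries.
        "OBJECT_UNRECONSTRUCTABLE": ObjectReconstructionFailedError,
    }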