[core][usability] Disambiguate ObjectLostErrors for better understandability (#18292)

* Define error types, throw error for ObjectReleased

* x

* Disambiguate OBJECT_UNRECONSTRUCTABLE and OBJECT_LOST

* OwnerDiedError

* fix test

* x

* ObjectReconstructionFailed

* ObjectReconstructionFailed

* x

* x

* print owner addr

* str

* doc

* rename

* x
This commit is contained in:
Stephanie Wang 2021-09-13 16:16:17 -07:00 committed by GitHub
parent 6479a5fcfc
commit 284dee493e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 475 additions and 120 deletions

View file

@ -74,7 +74,11 @@ void NativeObjectStore::CheckException(const std::string &meta_str,
throw RayWorkerException(std::move(data_str)); throw RayWorkerException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) { } else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) {
throw RayActorException(std::move(data_str)); throw RayActorException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE)) { } else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_LOST) ||
meta_str == std::to_string(ray::rpc::ErrorType::OWNER_DIED) ||
meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_DELETED)) {
// TODO: Differentiate object errors.
throw UnreconstructableException(std::move(data_str)); throw UnreconstructableException(std::move(data_str));
} else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) { } else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) {
throw RayTaskException(std::move(data_str)); throw RayTaskException(std::move(data_str));

View file

@ -235,6 +235,35 @@ as well as some known problems. If you encounter other problems, please
.. _`let us know`: https://github.com/ray-project/ray/issues .. _`let us know`: https://github.com/ray-project/ray/issues
Understanding `ObjectLostErrors`
--------------------------------
Ray throws an ``ObjectLostError`` to the application when an object cannot be
retrieved due to application or system error. This can occur during a
``ray.get()`` call or when fetching a task's arguments, and can happen for a
number of reasons. Here is a guide to understanding the root cause for
different error types:
- ``ObjectLostError``: The object was successfully created, but then all copies
were lost due to node failure.
- ``OwnerDiedError``: The owner of an object, i.e., the Python worker that
first created the ``ObjectRef`` via ``.remote()`` or ``ray.put()``, has died.
The owner stores critical object metadata and an object cannot be retrieved
if this process is lost.
- ``ObjectReconstructionFailedError``: This error should only be thrown when
`lineage reconstruction`_ is enabled. It indicates that an object, or another
object that this object depends on, cannot be reconstructed because the
maximum number of task retries has been exceeded. By default, a non-actor
task can be retried up to 3 times and an actor task cannot be retried.
This can be overridden with the ``max_retries`` parameter for remote
functions and the ``max_task_retries`` parameter for actors.
- ``ReferenceCountingAssertionError``: The object has already been deleted,
so it cannot be retrieved. Ray implements automatic memory management through
distributed reference counting, so this error should not happen in general.
However, there is a `known edge case`_ that can produce this error.
.. _`lineage reconstruction`: https://docs.ray.io/en/master/fault-tolerance.html
.. _`known edge case`: https://github.com/ray-project/ray/issues/18456
Crashes Crashes
------- -------

View file

@ -33,6 +33,12 @@ public class ObjectSerializer {
String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes(); String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META = private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes(); String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] OBJECT_LOST_META =
String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes();
private static final byte[] OWNER_DIED_META =
String.valueOf(ErrorType.OWNER_DIED.getNumber()).getBytes();
private static final byte[] OBJECT_DELETED_META =
String.valueOf(ErrorType.OBJECT_DELETED.getNumber()).getBytes();
private static final byte[] TASK_EXECUTION_EXCEPTION_META = private static final byte[] TASK_EXECUTION_EXCEPTION_META =
String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes(); String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();
@ -77,6 +83,12 @@ public class ObjectSerializer {
return Serializer.decode(data, objectType); return Serializer.decode(data, objectType);
} else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) { } else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) {
return new RayWorkerException(); return new RayWorkerException();
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, OBJECT_LOST_META) == 0
|| Bytes.indexOf(meta, OWNER_DIED_META) == 0
|| Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) {
// TODO: Differentiate object errors.
return new UnreconstructableException(objectId);
} else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) { } else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) {
ActorId actorId = IdUtil.getActorIdFromObjectId(objectId); ActorId actorId = IdUtil.getActorIdFromObjectId(objectId);
if (data != null && data.length > 0) { if (data != null && data.length > 0) {
@ -86,8 +98,6 @@ public class ObjectSerializer {
} }
} }
return new RayActorException(actorId); return new RayActorException(actorId);
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) {
return new UnreconstructableException(objectId);
} else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) { } else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) {
return deserializeRayException(data, objectId); return deserializeRayException(data, objectId);
} else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) { } else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) {

View file

@ -77,6 +77,7 @@ cdef class BaseID:
cdef class ObjectRef(BaseID): cdef class ObjectRef(BaseID):
cdef: cdef:
CObjectID data CObjectID data
c_string owner_addr
# Flag indicating whether or not this object ref was added to the set # Flag indicating whether or not this object ref was added to the set
# of active IDs in the core worker so we know whether we should clean # of active IDs in the core worker so we know whether we should clean
# it up. # it up.

View file

@ -184,7 +184,9 @@ cdef RayObjectsToDataMetadataPairs(
cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs): cdef VectorToObjectRefs(const c_vector[CObjectReference] &object_refs):
result = [] result = []
for i in range(object_refs.size()): for i in range(object_refs.size()):
result.append(ObjectRef(object_refs[i].object_id(), result.append(ObjectRef(
object_refs[i].object_id(),
object_refs[i].owner_address().SerializeAsString(),
object_refs[i].call_site())) object_refs[i].call_site()))
return result return result

View file

@ -3,7 +3,9 @@ from traceback import format_exception
import ray.cloudpickle as pickle import ray.cloudpickle as pickle
from ray.core.generated.common_pb2 import RayException, Language, PYTHON from ray.core.generated.common_pb2 import RayException, Language, PYTHON
from ray.core.generated.common_pb2 import Address
import ray.ray_constants as ray_constants import ray.ray_constants as ray_constants
from ray._raylet import WorkerID
import colorama import colorama
import setproctitle import setproctitle
@ -171,7 +173,7 @@ class RayTaskError(RayError):
# due to the dependency failure. # due to the dependency failure.
# Print out a user-friendly # Print out a user-friendly
# message to explain that. # message to explain that.
out.append(" Some of the input arguments for " out.append(" At least one of the input arguments for "
"this task could not be computed:") "this task could not be computed:")
if i + 1 < len(lines) and lines[i + 1].startswith(" "): if i + 1 < len(lines) and lines[i + 1].startswith(" "):
# If the next line is indented with 2 space, # If the next line is indented with 2 space,
@ -280,30 +282,97 @@ class ObjectStoreFullError(RayError):
class ObjectLostError(RayError): class ObjectLostError(RayError):
"""Indicates that an object has been lost due to node failure. """Indicates that the object is lost from distributed memory, due to
node failure or system error.
Attributes: Attributes:
object_ref_hex: Hex ID of the object. object_ref_hex: Hex ID of the object.
""" """
def __init__(self, object_ref_hex, call_site): def __init__(self, object_ref_hex, owner_address, call_site):
self.object_ref_hex = object_ref_hex self.object_ref_hex = object_ref_hex
self.owner_address = owner_address
self.call_site = call_site.replace( self.call_site = call_site.replace(
ray_constants.CALL_STACK_LINE_DELIMITER, "\n ") ray_constants.CALL_STACK_LINE_DELIMITER, "\n ")
def __str__(self): def _base_str(self):
msg = (f"Object {self.object_ref_hex} cannot be retrieved due to node " msg = f"Failed to retrieve object {self.object_ref_hex}. "
"failure or system error.")
if self.call_site: if self.call_site:
msg += (f" The ObjectRef was created at: {self.call_site}") msg += (f"The ObjectRef was created at: {self.call_site}")
else: else:
msg += ( msg += (
" To see information about where this ObjectRef was created " "To see information about where this ObjectRef was created "
"in Python, set the environment variable " "in Python, set the environment variable "
"RAY_record_ref_creation_sites=1 during `ray start` and " "RAY_record_ref_creation_sites=1 during `ray start` and "
"`ray.init()`.") "`ray.init()`.")
return msg return msg
def __str__(self):
return self._base_str() + "\n\n" + (
f"All copies of {self.object_ref_hex} have been lost due to node "
"failure. Check cluster logs (`/tmp/ray/session_latest/logs`) for "
"more information about the failure.")
class ReferenceCountingAssertionError(ObjectLostError, AssertionError):
"""Indicates that an object has been deleted while there was still a
reference to it.
Subclasses both ``ObjectLostError`` and ``AssertionError``, so handlers for
either type will catch it.
Attributes:
object_ref_hex: Hex ID of the object.
"""
def __str__(self):
# Shared prefix from ObjectLostError._base_str(), followed by the
# cause-specific explanation.
return self._base_str() + "\n\n" + (
"The object has already been deleted by the reference counting "
"protocol. This should not happen.")
class OwnerDiedError(ObjectLostError):
"""Indicates that the owner of the object has died while there is still a
reference to the object.
The error message points at the owner's log files and IP address when the
stored owner address can be parsed, and at the general log directory
otherwise.
Attributes:
object_ref_hex: Hex ID of the object.
"""
def __str__(self):
# Default log location, used when there is no owner address or it cannot
# be parsed.
log_loc = "`/tmp/ray/session_latest/logs`"
if self.owner_address:
try:
# Parse the stored owner address as a serialized Address message to
# narrow the hint down to the owner's worker ID and IP address.
addr = Address()
addr.ParseFromString(self.owner_address)
ip_addr = addr.ip_address
worker_id = WorkerID(addr.worker_id)
log_loc = (
f"`/tmp/ray/session_latest/logs/*{worker_id.hex()}*`"
f" at IP address {ip_addr}")
except Exception:
# Catch all to make sure we always at least print the default
# message.
pass
# Shared prefix from ObjectLostError._base_str(), followed by the
# cause-specific explanation.
return self._base_str() + "\n\n" + (
"The object's owner has exited. This is the Python "
"worker that first created the ObjectRef via `.remote()` or "
"`ray.put()`. "
f"Check cluster logs ({log_loc}) for more "
"information about the Python worker failure.")
class ObjectReconstructionFailedError(ObjectLostError):
"""Indicates that the object cannot be reconstructed because the maximum
number of task retries has been exceeded.
Attributes:
object_ref_hex: Hex ID of the object.
"""
def __str__(self):
# Shared prefix from ObjectLostError._base_str(), followed by the
# cause-specific explanation.
return self._base_str() + "\n\n" + (
"The object cannot be reconstructed "
"because the maximum number of task retries has been exceeded.")
class GetTimeoutError(RayError): class GetTimeoutError(RayError):
"""Indicates that a call to the worker timed out.""" """Indicates that a call to the worker timed out."""
@ -338,6 +407,9 @@ RAY_EXCEPTION_TYPES = [
RayActorError, RayActorError,
ObjectStoreFullError, ObjectStoreFullError,
ObjectLostError, ObjectLostError,
ReferenceCountingAssertionError,
ObjectReconstructionFailedError,
OwnerDiedError,
GetTimeoutError, GetTimeoutError,
AsyncioActorExit, AsyncioActorExit,
RuntimeEnvSetupError, RuntimeEnvSetupError,

View file

@ -35,9 +35,10 @@ def _set_future_helper(
cdef class ObjectRef(BaseID): cdef class ObjectRef(BaseID):
def __init__(self, id, call_site_data=""): def __init__(self, id, owner_addr="", call_site_data=""):
check_id(id) check_id(id)
self.data = CObjectID.FromBinary(<c_string>id) self.data = CObjectID.FromBinary(<c_string>id)
self.owner_addr = owner_addr
self.in_core_worker = False self.in_core_worker = False
self.call_site_data = call_site_data self.call_site_data = call_site_data
@ -85,6 +86,9 @@ cdef class ObjectRef(BaseID):
def job_id(self): def job_id(self):
return self.task_id().job_id() return self.task_id().job_id()
def owner_address(self):
return self.owner_addr
def call_site(self): def call_site(self):
return decode(self.call_site_data) return decode(self.call_site_data)

View file

@ -6,10 +6,11 @@ import ray.cloudpickle as pickle
from ray import ray_constants from ray import ray_constants
import ray._private.utils import ray._private.utils
from ray._private.gcs_utils import ErrorType from ray._private.gcs_utils import ErrorType
from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError, from ray.exceptions import (
RayActorError, TaskCancelledError, RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError,
WorkerCrashedError, ObjectLostError, TaskCancelledError, WorkerCrashedError, ObjectLostError,
RaySystemError, RuntimeEnvSetupError) ReferenceCountingAssertionError, OwnerDiedError,
ObjectReconstructionFailedError, RaySystemError, RuntimeEnvSetupError)
from ray._raylet import ( from ray._raylet import (
split_buffer, split_buffer,
unpack_pickle5_buffers, unpack_pickle5_buffers,
@ -37,7 +38,7 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
# the core worker to resolve the value. This is to make sure # the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectRef is greater than 0 by the # that the ref count for the ObjectRef is greater than 0 by the
# time the core worker resolves the value of the object. # time the core worker resolves the value of the object.
obj_ref = ray.ObjectRef(binary, call_site) obj_ref = ray.ObjectRef(binary, owner_address, call_site)
# TODO(edoakes): we should be able to just capture a reference # TODO(edoakes): we should be able to just capture a reference
# to 'self' here instead, but this function is itself pickled # to 'self' here instead, but this function is itself pickled
@ -222,15 +223,27 @@ class SerializationContext:
return RayActorError() return RayActorError()
elif error_type == ErrorType.Value("TASK_CANCELLED"): elif error_type == ErrorType.Value("TASK_CANCELLED"):
return TaskCancelledError() return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): elif error_type == ErrorType.Value("OBJECT_LOST"):
return ObjectLostError(object_ref.hex(), return ObjectLostError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_DELETED"):
return ReferenceCountingAssertionError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OWNER_DIED"):
return OwnerDiedError(object_ref.hex(),
object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectReconstructionFailedError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site()) object_ref.call_site())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"): elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError() return RuntimeEnvSetupError()
else: else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ return RaySystemError("Unrecognized error type " +
"Tried to get object that has been promoted to plasma." str(error_type))
assert False, "Unrecognized error type " + str(error_type)
elif data: elif data:
raise ValueError("non-null object should always have metadata") raise ValueError("non-null object should always have metadata")
else: else:

View file

@ -16,6 +16,7 @@ import ray.cluster_utils
import ray._private.profiling as profiling import ray._private.profiling as profiling
from ray._private.test_utils import (client_test_enabled, from ray._private.test_utils import (client_test_enabled,
RayTestTimeoutException, SignalActor) RayTestTimeoutException, SignalActor)
from ray.exceptions import ReferenceCountingAssertionError
if client_test_enabled(): if client_test_enabled():
from ray.util.client import ray from ray.util.client import ray
@ -44,7 +45,7 @@ def test_internal_free(shutdown_only):
obj_ref = sampler.sample.remote() obj_ref = sampler.sample.remote()
ray.get(obj_ref) ray.get(obj_ref)
ray.internal.free(obj_ref) ray.internal.free(obj_ref)
with pytest.raises(Exception): with pytest.raises(ReferenceCountingAssertionError):
ray.get(obj_ref) ray.get(obj_ref)
# Free deletes big objects from plasma store. # Free deletes big objects from plasma store.
@ -52,7 +53,7 @@ def test_internal_free(shutdown_only):
ray.get(big_id) ray.get(big_id)
ray.internal.free(big_id) ray.internal.free(big_id)
time.sleep(1) # wait for delete RPC to propagate time.sleep(1) # wait for delete RPC to propagate
with pytest.raises(Exception): with pytest.raises(ReferenceCountingAssertionError):
ray.get(big_id) ray.get(big_id)

View file

@ -283,7 +283,7 @@ def test_raylet_crash_when_get(ray_start_regular):
thread = threading.Thread(target=sleep_to_kill_raylet) thread = threading.Thread(target=sleep_to_kill_raylet)
thread.start() thread.start()
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ReferenceCountingAssertionError):
ray.get(object_ref) ray.get(object_ref)
thread.join() thread.join()
@ -307,7 +307,7 @@ def test_eviction(ray_start_cluster):
# Evict the object. # Evict the object.
ray.internal.free([obj]) ray.internal.free([obj])
# ray.get throws an exception. # ray.get throws an exception.
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ReferenceCountingAssertionError):
ray.get(obj) ray.get(obj)
@ray.remote @ray.remote

View file

@ -108,10 +108,10 @@ def test_reconstruction_cached_dependency(ray_start_cluster,
if reconstruction_enabled: if reconstruction_enabled:
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayTaskError) as e: with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
raise e.as_instanceof_cause() ray.get(obj)
@pytest.mark.skipif( @pytest.mark.skipif(
@ -138,8 +138,6 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
# Node to place the initial object. # Node to place the initial object.
node_to_kill = cluster.add_node( node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
cluster.add_node(
num_cpus=1, resources={"node2": 1}, object_store_memory=10**8)
cluster.wait_for_nodes() cluster.wait_for_nodes()
@ray.remote(max_retries=1 if reconstruction_enabled else 0) @ray.remote(max_retries=1 if reconstruction_enabled else 0)
@ -154,18 +152,33 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False) cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node( node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
if reconstruction_enabled: if reconstruction_enabled:
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayTaskError) as e: with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
raise e.as_instanceof_cause() ray.get(obj)
# Losing the object a second time will cause reconstruction to fail because
# we have reached the max task retries.
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
if reconstruction_enabled:
with pytest.raises(ray.exceptions.ObjectReconstructionFailedError):
ray.get(obj)
else:
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(obj)
# TODO(swang): Add a test to check for ObjectReconstructionFailedError if we
# fail to reconstruct a ray.put object.
@pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.") @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True]) @pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
@ -287,10 +300,79 @@ def test_basic_reconstruction_actor_task(ray_start_cluster,
if reconstruction_enabled: if reconstruction_enabled:
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayTaskError) as e: with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
raise e.as_instanceof_cause() ray.get(obj)
# Make sure the actor handle is still usable.
pid = ray.get(a.pid.remote())
@pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_actor_lineage_disabled(ray_start_cluster,
reconstruction_enabled):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"object_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = False
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 2}, object_store_memory=10**8)
cluster.add_node(
num_cpus=1, resources={"node2": 1}, object_store_memory=10**8)
cluster.wait_for_nodes()
# Actor can be restarted but its outputs cannot be reconstructed.
@ray.remote(max_restarts=-1, resources={"node1": 1}, num_cpus=0)
class Actor:
def __init__(self):
pass
def large_object(self):
return np.zeros(10**7, dtype=np.uint8)
def pid(self):
return os.getpid()
@ray.remote
def dependent_task(x):
return
a = Actor.remote()
pid = ray.get(a.pid.remote())
obj = a.large_object.remote()
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
# Workaround to kill the actor process too since there is a bug where the
# actor's plasma client hangs after the plasma store has exited.
os.kill(pid, SIGKILL)
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 2}, object_store_memory=10**8)
wait_for_pid_to_exit(pid)
if reconstruction_enabled:
with pytest.raises(ray.exceptions.ObjectReconstructionFailedError):
ray.get(obj)
else:
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(obj)
# Make sure the actor handle is still usable. # Make sure the actor handle is still usable.
pid = ray.get(a.pid.remote()) pid = ray.get(a.pid.remote())
@ -369,12 +451,13 @@ def test_basic_reconstruction_actor_constructor(ray_start_cluster,
if reconstruction_enabled: if reconstruction_enabled:
ray.get(a.dependent_task.remote(obj)) ray.get(a.dependent_task.remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayActorError) as e: with pytest.raises(ray.exceptions.RayActorError) as exc_info:
x = a.dependent_task.remote(obj) x = a.dependent_task.remote(obj)
print(x) print(x)
ray.get(x) ray.get(x)
with pytest.raises(ray.exceptions.ObjectLostError): exc = str(exc_info.value)
raise e.get_creation_task_error() assert "arguments" in exc
assert "ObjectLostError" in exc
@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")
@ -424,6 +507,23 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
for obj in downstream: for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
if reconstruction_enabled:
for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError):
for obj in downstream:
ray.get(
dependent_task.options(resources={
"node1": 1
}).remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(obj)
cluster.remove_node(node_to_kill, allow_graceful=False) cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node( cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
@ -432,14 +532,9 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
for obj in downstream: for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
for obj in downstream: for obj in downstream:
ray.get(
dependent_task.options(resources={
"node1": 1
}).remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
raise e.as_instanceof_cause() ray.get(obj)
@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")
@ -488,10 +583,10 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled):
if reconstruction_enabled: if reconstruction_enabled:
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
else: else:
with pytest.raises(ray.exceptions.RayTaskError) as e: with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj)) ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
raise e.as_instanceof_cause() ray.get(obj)
@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.") @pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")

View file

@ -415,8 +415,11 @@ def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put,
try: try:
assert ray.get(tail_oid) is None assert ray.get(tail_oid) is None
assert not failure assert not failure
# TODO(edoakes): this should raise WorkerError. except ray.exceptions.OwnerDiedError:
except ray.exceptions.ObjectLostError: # There is only 1 core, so the same worker will execute all `recursive`
# tasks. Therefore, if we kill the worker during the last task, its
# owner (the worker that executed the second-to-last task) will also
# have died.
assert failure assert failure
# Reference should be gone, check that array gets evicted. # Reference should be gone, check that array gets evicted.
@ -494,7 +497,11 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
return return
@ray.remote @ray.remote
def launch_pending_task(ref, signal): class Submitter:
def __init__(self):
pass
def launch_pending_task(self, ref, signal):
return child.remote(ref[0], signal.wait.remote()) return child.remote(ref[0], signal.wait.remote())
signal = SignalActor.remote() signal = SignalActor.remote()
@ -502,7 +509,9 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
# Test that the reference held by the actor isn't evicted. # Test that the reference held by the actor isn't evicted.
array_oid = put_object( array_oid = put_object(
np.zeros(20 * 1024 * 1024, dtype=np.uint8), use_ray_put) np.zeros(20 * 1024 * 1024, dtype=np.uint8), use_ray_put)
child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) s = Submitter.remote()
child_return_id = ray.get(
s.launch_pending_task.remote([array_oid], signal))
# Remove the local reference. # Remove the local reference.
array_oid_bytes = array_oid.binary() array_oid_bytes = array_oid.binary()
@ -515,7 +524,7 @@ def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
try: try:
ray.get(child_return_id) ray.get(child_return_id)
assert not failure assert not failure
except (ray.exceptions.WorkerCrashedError, ray.exceptions.ObjectLostError): except ray.exceptions.WorkerCrashedError:
assert failure assert failure
del child_return_id del child_return_id

View file

@ -91,12 +91,15 @@ def test_recursively_nest_ids(one_worker_100MiB, use_ray_put, failure):
# Fulfill the dependency, causing the tail task to finish. # Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote()) ray.get(signal.send.remote())
try: if not failure:
ray.get(tail_oid)
else:
# There is only 1 core, so the same worker will execute all `recursive`
# tasks. Therefore, if we kill the worker during the last task, its
# owner (the worker that executed the second-to-last task) will also
# have died.
with pytest.raises(ray.exceptions.OwnerDiedError):
ray.get(tail_oid) ray.get(tail_oid)
assert not failure
# TODO(edoakes): this should raise WorkerError.
except ray.exceptions.ObjectLostError:
assert failure
# Reference should be gone, check that array gets evicted. # Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False) _fill_object_store_and_get(array_oid_bytes, succeed=False)
@ -233,8 +236,11 @@ def test_recursively_pass_returned_object_ref(one_worker_100MiB, use_ray_put,
ray.get(outer_oid) ray.get(outer_oid)
_fill_object_store_and_get(inner_oid) _fill_object_store_and_get(inner_oid)
assert not failure assert not failure
# TODO(edoakes): this should raise WorkerError. except ray.exceptions.OwnerDiedError:
except ray.exceptions.ObjectLostError: # There is only 1 core, so the same worker will execute all `recursive`
# tasks. Therefore, if we kill the worker during the last task, its
# owner (the worker that executed the second-to-last task) will also
# have died.
assert failure assert failure
inner_oid_bytes = inner_oid.binary() inner_oid_bytes = inner_oid.binary()
@ -286,6 +292,10 @@ def test_recursively_return_borrowed_object_ref(one_worker_100MiB, use_ray_put,
# Reference should be gone, check that returned ID gets evicted. # Reference should be gone, check that returned ID gets evicted.
_fill_object_store_and_get(final_oid_bytes, succeed=False) _fill_object_store_and_get(final_oid_bytes, succeed=False)
if failure:
with pytest.raises(ray.exceptions.OwnerDiedError):
ray.get(final_oid)
@pytest.mark.parametrize("failure", [False, True]) @pytest.mark.parametrize("failure", [False, True])
def test_borrowed_id_failure(one_worker_100MiB, failure): def test_borrowed_id_failure(one_worker_100MiB, failure):
@ -314,7 +324,8 @@ def test_borrowed_id_failure(one_worker_100MiB, failure):
def resolve_ref(self): def resolve_ref(self):
assert self.ref is not None assert self.ref is not None
if failure: if failure:
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(
ray.exceptions.ReferenceCountingAssertionError):
ray.get(self.ref) ray.get(self.ref)
else: else:
ray.get(self.ref) ray.get(self.ref)

View file

@ -174,9 +174,9 @@ ZeroDivisionError: division by zero"""
def test_dep_failure(ray_start_regular): def test_dep_failure(ray_start_regular):
"""Test the stacktrace genereated due to dependency failures.""" """Test the stacktrace genereated due to dependency failures."""
expected_output = """ray::f() (pid=XXX, ip=YYY) # noqa expected_output = """ray::f() (pid=XXX, ip=YYY) # noqa
Some of the input arguments for this task could not be computed: At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::a() (pid=XXX, ip=YYY) ray.exceptions.RayTaskError: ray::a() (pid=XXX, ip=YYY)
Some of the input arguments for this task could not be computed: At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::b() (pid=XXX, ip=YYY) ray.exceptions.RayTaskError: ray::b() (pid=XXX, ip=YYY)
File "FILE", line ZZ, in b File "FILE", line ZZ, in b
raise ValueError("FILE") raise ValueError("FILE")

View file

@ -706,8 +706,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
object_recovery_manager_ = std::make_unique<ObjectRecoveryManager>( object_recovery_manager_ = std::make_unique<ObjectRecoveryManager>(
rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn, rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn,
task_manager_, reference_counter_, memory_store_, task_manager_, reference_counter_, memory_store_,
[this](const ObjectID &object_id, bool pin_object) { [this](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) {
RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), RAY_LOG(DEBUG) << "Failed to recover object " << object_id << " due to "
<< rpc::ErrorType_Name(reason);
RAY_CHECK_OK(Put(RayObject(reason),
/*contained_object_ids=*/{}, object_id, /*contained_object_ids=*/{}, object_id,
/*pin_object=*/pin_object)); /*pin_object=*/pin_object));
}, },
@ -1458,8 +1460,7 @@ Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_on
// no longer reachable. // no longer reachable.
memory_store_->Delete(object_ids); memory_store_->Delete(object_ids);
for (const auto &object_id : object_ids) { for (const auto &object_id : object_ids) {
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED), object_id));
object_id));
} }
// We only delete from plasma, which avoids hangs (issue #7105). In-memory // We only delete from plasma, which avoids hangs (issue #7105). In-memory

View file

@ -45,14 +45,18 @@ void FutureResolver::ProcessResolvedObject(const ObjectID &object_id,
<< " that was deserialized: " << status.ToString(); << " that was deserialized: " << status.ToString();
} }
if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { if (!status.ok()) {
// The owner is gone or the owner replied that the object has gone // The owner is unreachable. Store an error so that an exception will be
// out of scope (this is an edge case in the distributed ref counting
// protocol where a borrower dies before it can notify the owner of
// another borrower). Store an error so that an exception will be
// thrown immediately when the worker tries to get the value. // thrown immediately when the worker tries to get the value.
RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OWNER_DIED), object_id));
object_id)); } else if (reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) {
// The owner replied that the object has gone out of scope (this is an edge
// case in the distributed ref counting protocol where a borrower dies
// before it can notify the owner of another borrower). Store an error so
// that an exception will be thrown immediately when the worker tries to
// get the value.
RAY_UNUSED(
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED), object_id));
} else if (reply.status() == rpc::GetObjectStatusReply::CREATED) { } else if (reply.status() == rpc::GetObjectStatusReply::CREATED) {
// The object is either an indicator that the object is in Plasma, or // The object is either an indicator that the object is in Plasma, or
// the object has been returned directly in the reply. In either // the object has been returned directly in the reply. In either

View file

@ -34,8 +34,7 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
if (!owned_by_us) { if (!owned_by_us) {
RAY_LOG(DEBUG) << "Reconstruction for borrowed objects (" << object_id RAY_LOG(DEBUG) << "Reconstruction for borrowed objects (" << object_id
<< ") is not supported"; << ") is not supported";
reconstruction_failure_callback_(object_id, /*pin_object=*/false); return false;
return true;
} }
bool already_pending_recovery = true; bool already_pending_recovery = true;
@ -81,7 +80,9 @@ void ObjectRecoveryManager::PinOrReconstructObject(
// There are no more copies to pin, try to reconstruct the object. // There are no more copies to pin, try to reconstruct the object.
ReconstructObject(object_id); ReconstructObject(object_id);
} else { } else {
reconstruction_failure_callback_(object_id, /*pin_object=*/true); // All copies lost, and lineage reconstruction is disabled.
recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_LOST,
/*pin_object=*/true);
} }
} }
@ -143,12 +144,14 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": " RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": "
<< status.message(); << status.message();
// We do not pin the dependency because we may not be the owner. // We do not pin the dependency because we may not be the owner.
reconstruction_failure_callback_(dep, /*pin_object=*/false); recovery_failure_callback_(dep, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE,
/*pin_object=*/false);
} }
} }
} else { } else {
RAY_LOG(INFO) << "Failed to reconstruct object " << object_id; RAY_LOG(INFO) << "Failed to reconstruct object " << object_id;
reconstruction_failure_callback_(object_id, /*pin_object=*/true); recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE,
/*pin_object=*/true);
} }
} }

View file

@ -33,6 +33,11 @@ typedef std::function<void(const ObjectID &object_id,
const std::vector<rpc::Address> &raylet_locations)> const std::vector<rpc::Address> &raylet_locations)>
ObjectLookupCallback; ObjectLookupCallback;
// A callback for if we fail to recover an object.
typedef std::function<void(const ObjectID &object_id, rpc::ErrorType reason,
bool pin_object)>
ObjectRecoveryFailureCallback;
class ObjectRecoveryManager { class ObjectRecoveryManager {
public: public:
ObjectRecoveryManager(const rpc::Address &rpc_address, ObjectRecoveryManager(const rpc::Address &rpc_address,
@ -44,8 +49,7 @@ class ObjectRecoveryManager {
std::shared_ptr<TaskResubmissionInterface> task_resubmitter, std::shared_ptr<TaskResubmissionInterface> task_resubmitter,
std::shared_ptr<ReferenceCounter> reference_counter, std::shared_ptr<ReferenceCounter> reference_counter,
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store, std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::function<void(const ObjectID &object_id, bool pin_object)> const ObjectRecoveryFailureCallback &recovery_failure_callback,
reconstruction_failure_callback,
bool lineage_reconstruction_enabled) bool lineage_reconstruction_enabled)
: task_resubmitter_(task_resubmitter), : task_resubmitter_(task_resubmitter),
reference_counter_(reference_counter), reference_counter_(reference_counter),
@ -54,7 +58,7 @@ class ObjectRecoveryManager {
local_object_pinning_client_(local_object_pinning_client), local_object_pinning_client_(local_object_pinning_client),
object_lookup_(object_lookup), object_lookup_(object_lookup),
in_memory_store_(in_memory_store), in_memory_store_(in_memory_store),
reconstruction_failure_callback_(reconstruction_failure_callback), recovery_failure_callback_(recovery_failure_callback),
lineage_reconstruction_enabled_(lineage_reconstruction_enabled) {} lineage_reconstruction_enabled_(lineage_reconstruction_enabled) {}
/// Recover an object that was stored in plasma. This will only succeed for /// Recover an object that was stored in plasma. This will only succeed for
@ -127,8 +131,7 @@ class ObjectRecoveryManager {
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_; std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;
/// Callback to call if recovery fails. /// Callback to call if recovery fails.
const std::function<void(const ObjectID &object_id, bool pin_object)> const ObjectRecoveryFailureCallback recovery_failure_callback_;
reconstruction_failure_callback_;
/// Whether lineage reconstruction is enabled. If disabled, then we will try /// Whether lineage reconstruction is enabled. If disabled, then we will try
/// to pin new copies for a lost object, but we will never reconstruct it /// to pin new copies for a lost object, but we will never reconstruct it

View file

@ -999,7 +999,7 @@ absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to get the object locations for an object " << object_id RAY_LOG(DEBUG) << "Tried to get the object locations for an object " << object_id
<< " that doesn't exist in the reference table"; << " that doesn't exist in the reference table";
return absl::nullopt; return absl::nullopt;
} }
@ -1151,9 +1151,13 @@ Status ReferenceCounter::FillObjectInformation(
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
return Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); RAY_LOG(WARNING) << "Object locations requested for " << object_id
} << ", but ref already removed. This may be a bug in the distributed "
"reference counting protocol.";
object_info->set_ref_removed(true);
} else {
FillObjectInformationInternal(it, object_info); FillObjectInformationInternal(it, object_info);
}
return Status::OK(); return Status::OK();
} }
@ -1173,10 +1177,17 @@ void ReferenceCounter::PublishObjectLocationSnapshot(const ObjectID &object_id)
absl::MutexLock lock(&mutex_); absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id); auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
RAY_LOG(DEBUG) << "Tried to register a location subscriber for an object " RAY_LOG(WARNING) << "Object locations requested for " << object_id
<< object_id << " that doesn't exist in the reference table." << ", but ref already removed. This may be a bug in the distributed "
<< " The object has probably already been freed."; "reference counting protocol.";
// Consider the object is already freed, and not subscribeable. // First let subscribers handle this error.
rpc::PubMessage pub_message;
pub_message.set_key_id(object_id.Binary());
pub_message.set_channel_type(rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL);
pub_message.mutable_worker_object_locations_message()->set_ref_removed(true);
object_info_publisher_->Publish(rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL,
pub_message, object_id.Binary());
// Then, publish a failure to subscribers since this object is unreachable.
object_info_publisher_->PublishFailure( object_info_publisher_->PublishFailure(
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, object_id.Binary()); rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, object_id.Binary());
return; return;

View file

@ -105,9 +105,9 @@ class MockObjectDirectory {
std::unordered_map<ObjectID, std::vector<rpc::Address>> locations; std::unordered_map<ObjectID, std::vector<rpc::Address>> locations;
}; };
class ObjectRecoveryManagerTest : public ::testing::Test { class ObjectRecoveryManagerTestBase : public ::testing::Test {
public: public:
ObjectRecoveryManagerTest() ObjectRecoveryManagerTestBase(bool lineage_enabled)
: local_raylet_id_(NodeID::FromRandom()), : local_raylet_id_(NodeID::FromRandom()),
publisher_(std::make_shared<mock_pubsub::MockPublisher>()), publisher_(std::make_shared<mock_pubsub::MockPublisher>()),
subscriber_(std::make_shared<mock_pubsub::MockSubscriber>()), subscriber_(std::make_shared<mock_pubsub::MockSubscriber>()),
@ -117,7 +117,7 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
task_resubmitter_(std::make_shared<MockTaskResubmitter>()), task_resubmitter_(std::make_shared<MockTaskResubmitter>()),
ref_counter_(std::make_shared<ReferenceCounter>( ref_counter_(std::make_shared<ReferenceCounter>(
rpc::Address(), publisher_.get(), subscriber_.get(), rpc::Address(), publisher_.get(), subscriber_.get(),
/*lineage_pinning_enabled=*/true)), /*lineage_pinning_enabled=*/lineage_enabled)),
manager_(rpc::Address(), manager_(rpc::Address(),
[&](const std::string &ip, int port) { return raylet_client_; }, [&](const std::string &ip, int port) { return raylet_client_; },
raylet_client_, raylet_client_,
@ -126,9 +126,9 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
return Status::OK(); return Status::OK();
}, },
task_resubmitter_, ref_counter_, memory_store_, task_resubmitter_, ref_counter_, memory_store_,
[&](const ObjectID &object_id, bool pin_object) { [&](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) {
RAY_CHECK(failed_reconstructions_.count(object_id) == 0); RAY_CHECK(failed_reconstructions_.count(object_id) == 0);
failed_reconstructions_[object_id] = pin_object; failed_reconstructions_[object_id] = reason;
std::string meta = std::string meta =
std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA)); std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
@ -140,10 +140,10 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
std::vector<rpc::ObjectReference>()); std::vector<rpc::ObjectReference>());
RAY_CHECK(memory_store_->Put(data, object_id)); RAY_CHECK(memory_store_->Put(data, object_id));
}, },
/*lineage_reconstruction_enabled=*/true) {} /*lineage_reconstruction_enabled=*/lineage_enabled) {}
NodeID local_raylet_id_; NodeID local_raylet_id_;
std::unordered_map<ObjectID, bool> failed_reconstructions_; std::unordered_map<ObjectID, rpc::ErrorType> failed_reconstructions_;
std::shared_ptr<mock_pubsub::MockPublisher> publisher_; std::shared_ptr<mock_pubsub::MockPublisher> publisher_;
std::shared_ptr<mock_pubsub::MockSubscriber> subscriber_; std::shared_ptr<mock_pubsub::MockSubscriber> subscriber_;
@ -155,21 +155,30 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
ObjectRecoveryManager manager_; ObjectRecoveryManager manager_;
}; };
TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { class ObjectRecoveryLineageDisabledTest : public ObjectRecoveryManagerTestBase {
public:
ObjectRecoveryLineageDisabledTest() : ObjectRecoveryManagerTestBase(false) {}
};
class ObjectRecoveryManagerTest : public ObjectRecoveryManagerTestBase {
public:
ObjectRecoveryManagerTest() : ObjectRecoveryManagerTestBase(true) {}
};
TEST_F(ObjectRecoveryLineageDisabledTest, TestNoReconstruction) {
// Lineage recording disabled. // Lineage recording disabled.
ObjectID object_id = ObjectID::FromRandom(); ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_TRUE(object_directory_->Flush() == 1); ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); ASSERT_EQ(failed_reconstructions_[object_id], rpc::ErrorType::OBJECT_LOST);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
// Borrowed object. // Borrowed object.
object_id = ObjectID::FromRandom(); object_id = ObjectID::FromRandom();
ref_counter_->AddLocalReference(object_id, ""); ref_counter_->AddLocalReference(object_id, "");
ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_FALSE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
// Ref went out of scope. // Ref went out of scope.
@ -179,6 +188,19 @@ TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) {
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
} }
TEST_F(ObjectRecoveryLineageDisabledTest, TestPinNewCopy) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
std::vector<rpc::Address> addresses({rpc::Address()});
object_directory_->SetLocations(object_id, addresses);
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(raylet_client_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
}
TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
ObjectID object_id = ObjectID::FromRandom(); ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
@ -256,6 +278,36 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
} }
} }
TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_[object_id] ==
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
}
TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
ObjectID dep_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(dep_id, {}, rpc::Address(), "", 0, true);
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
task_resubmitter_->AddTask(object_id.TaskId(), {dep_id});
RAY_LOG(INFO) << object_id;
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_EQ(object_directory_->Flush(), 1);
// Trigger callback for dep ID.
ASSERT_EQ(object_directory_->Flush(), 1);
ASSERT_EQ(failed_reconstructions_[dep_id], rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE);
ASSERT_EQ(failed_reconstructions_.count(object_id), 0);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1);
}
} // namespace core } // namespace core
} // namespace ray } // namespace ray

View file

@ -269,14 +269,21 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(
auto msg_published_callback = [this, object_id](const rpc::PubMessage &pub_message) { auto msg_published_callback = [this, object_id](const rpc::PubMessage &pub_message) {
RAY_CHECK(pub_message.has_worker_object_locations_message()); RAY_CHECK(pub_message.has_worker_object_locations_message());
const auto &location_info = pub_message.worker_object_locations_message(); const auto &location_info = pub_message.worker_object_locations_message();
ObjectLocationSubscriptionCallback(location_info, object_id, ObjectLocationSubscriptionCallback(
/*location_lookup_failed*/ false); location_info, object_id,
/*location_lookup_failed*/ !location_info.ref_removed());
if (location_info.ref_removed()) {
mark_as_failed_(object_id, rpc::ErrorType::OBJECT_DELETED);
}
}; };
auto failure_callback = [this, owner_address](const std::string &object_id_binary) { auto failure_callback = [this, owner_address](const std::string &object_id_binary) {
const auto object_id = ObjectID::FromBinary(object_id_binary); const auto object_id = ObjectID::FromBinary(object_id_binary);
mark_as_failed_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); mark_as_failed_(object_id, rpc::ErrorType::OWNER_DIED);
rpc::WorkerObjectLocationsPubMessage location_info; rpc::WorkerObjectLocationsPubMessage location_info;
// Location lookup can fail if the owner is reachable but no longer has a
// record of this ObjectRef, most likely due to an issue with the
// distributed reference counting protocol.
ObjectLocationSubscriptionCallback(location_info, object_id, ObjectLocationSubscriptionCallback(location_info, object_id,
/*location_lookup_failed*/ true); /*location_lookup_failed*/ true);
}; };
@ -399,7 +406,13 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
if (!status.ok()) { if (!status.ok()) {
RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for " RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for "
<< object_id << status.ToString(); << object_id << status.ToString();
mark_as_failed_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); mark_as_failed_(object_id, rpc::ErrorType::OWNER_DIED);
} else if (reply.object_location_info().ref_removed()) {
RAY_LOG(ERROR)
<< "Worker " << worker_id << " failed to get the location for "
<< object_id
<< ", object already released by distributed reference counting protocol";
mark_as_failed_(object_id, rpc::ErrorType::OBJECT_DELETED);
} else { } else {
UpdateObjectLocations(reply.object_location_info(), gcs_client_, &node_ids, UpdateObjectLocations(reply.object_location_info(), gcs_client_, &node_ids,
&spilled_url, &spilled_node_id, &object_size); &spilled_url, &spilled_node_id, &object_size);

View file

@ -108,14 +108,12 @@ enum ErrorType {
// Indicates that a task failed because the actor died unexpectedly before // Indicates that a task failed because the actor died unexpectedly before
// finishing it. // finishing it.
ACTOR_DIED = 1; ACTOR_DIED = 1;
// Indicates that an object is lost and cannot be restarted. // This object was lost from distributed memory due to a node failure or system
// Note, this currently only happens to actor objects. When the actor's // error. We use this error when lineage reconstruction is enabled, but the
// state is already after the object's creating task, the actor cannot // object cannot be reconstructed, e.g., because its lineage is not available.
// re-run the task. // TODO(swang): We may want to break down this error type further, e.g.,
// TODO(hchen): we may want to reuse this error type for more cases. E.g., // object's lineage was evicted or object depended on an actor task that
// 1) A object that was put by the driver. // can't be reconstructed.
// 2) The object's creating task is already cleaned up from GCS (this
// currently crashes raylet).
OBJECT_UNRECONSTRUCTABLE = 2; OBJECT_UNRECONSTRUCTABLE = 2;
// Indicates that a task failed due to user code failure. // Indicates that a task failed due to user code failure.
TASK_EXECUTION_EXCEPTION = 3; TASK_EXECUTION_EXCEPTION = 3;
@ -129,6 +127,16 @@ enum ErrorType {
ACTOR_CREATION_FAILED = 6; ACTOR_CREATION_FAILED = 6;
// Indicates that the runtime_env failed to be created. // Indicates that the runtime_env failed to be created.
RUNTIME_ENV_SETUP_FAILED = 7; RUNTIME_ENV_SETUP_FAILED = 7;
// This object was lost from distributed memory due to a node failure or system
// error. We use this error when lineage reconstruction is disabled.
OBJECT_LOST = 8;
// This object is unreachable because its owner has died.
OWNER_DIED = 9;
// This object is unreachable because the owner is alive but no longer has a
// record of this object, meaning that the physical object has likely already
// been deleted from distributed memory. This can happen in distributed
// reference counting, due to a bug or corner case.
OBJECT_DELETED = 10;
} }
/// The task exception encapsulates all information about task /// The task exception encapsulates all information about task

View file

@ -75,6 +75,11 @@ message WorkerObjectLocationsPubMessage {
// The ID of the node that stores the primary copy in plasma. // The ID of the node that stores the primary copy in plasma.
// This could be Nil if the object has been evicted or inlined. // This could be Nil if the object has been evicted or inlined.
bytes primary_node_id = 6; bytes primary_node_id = 6;
// If this is set, then the owner is alive but no longer has an entry for
// this reference. This can happen if there is a bug in the distributed ref
// counting protocol that causes the object to be released while there are
// still references.
bool ref_removed = 7;
} }
/// Indicating the subscriber needs to handle failure callback. /// Indicating the subscriber needs to handle failure callback.

View file

@ -1710,6 +1710,10 @@ void NodeManager::HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &r
void NodeManager::MarkObjectsAsFailed( void NodeManager::MarkObjectsAsFailed(
const ErrorType &error_type, const std::vector<rpc::ObjectReference> objects_to_fail, const ErrorType &error_type, const std::vector<rpc::ObjectReference> objects_to_fail,
const JobID &job_id) { const JobID &job_id) {
// TODO(swang): Ideally we should return the error directly to the client
// that needs this object instead of storing the object in plasma, which is
// not guaranteed to succeed. This avoids hanging the client if plasma is not
// reachable.
const std::string meta = std::to_string(static_cast<int>(error_type)); const std::string meta = std::to_string(static_cast<int>(error_type));
for (const auto &ref : objects_to_fail) { for (const auto &ref : objects_to_fail) {
ObjectID object_id = ObjectID::FromBinary(ref.object_id()); ObjectID object_id = ObjectID::FromBinary(ref.object_id());