[Core] Unrevert "Add retry exception allowlist for user-defined filtering of retryable application-level errors." (#26449)

This reverts commit cf7305a, and unreverts #25896.

This was reverted due to a failing Windows test: #26287

We can merge once the failing Windows test (and all other relevant tests) pass.
Clark Zinzow 2022-08-05 17:07:13 -06:00 committed by GitHub
parent f6d19ac7c0
commit 293452dcba
28 changed files with 360 additions and 73 deletions

.gitignore (vendored): 2 changes
View file

@ -185,6 +185,8 @@ venv
# Vim
.*.swp
*.swp
.*.swo
*.swo
tags
tags.lock
tags.temp

View file

@ -127,9 +127,10 @@ Status TaskExecutor::ExecuteTask(
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_application_level_error,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute) {
RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type)
@ -141,6 +142,10 @@ Status TaskExecutor::ExecuteTask(
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
std::string func_name = typed_descriptor->FunctionName();
bool cross_lang = !typed_descriptor->Caller().empty();
// TODO(Clark): Support retrying application-level errors for C++.
// TODO(Clark): Support exception allowlist for retrying application-level
// errors for C++.
*is_retryable_error = false;
Status status{};
std::shared_ptr<msgpack::sbuffer> data = nullptr;

View file

@ -83,9 +83,10 @@ class TaskExecutor {
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_application_level_error,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute);

View file

@ -0,0 +1,91 @@
# flake8: noqa
# fmt: off
# __tasks_fault_tolerance_retries_begin__
import numpy as np
import os
import ray
import time
ray.init(ignore_reinit_error=True)


@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return 0


for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except ray.exceptions.WorkerCrashedError:
        print('FAILURE')
# __tasks_fault_tolerance_retries_end__
# fmt: on
# fmt: off
# __tasks_fault_tolerance_retries_exception_begin__
import numpy as np
import os
import ray
import time
ray.init(ignore_reinit_error=True)


class RandomError(Exception):
    pass


@ray.remote(max_retries=1, retry_exceptions=True)
def potentially_fail(failure_probability):
    if failure_probability < 0 or failure_probability > 1:
        raise ValueError(
            "failure_probability must be between 0 and 1, but got: "
            f"{failure_probability}"
        )
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        raise RandomError("Failed!")
    return 0


for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE')

# Provide the exceptions that we want to retry as an allowlist.
retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError])
try:
    # This will fail since we're passing in -1 for the failure_probability,
    # which will raise a ValueError in the task and does not match the RandomError
    # exception that we provided.
    ray.get(retry_on_exception.remote(-1))
except ValueError:
    print("FAILED AS EXPECTED")
else:
    raise RuntimeError("An exception should be raised so this shouldn't be reached.")

# These will retry on the RandomError exception.
for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(retry_on_exception.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE AFTER RETRIES')
# __tasks_fault_tolerance_retries_exception_end__
# fmt: on

View file

@ -1,8 +1,14 @@
.. _task-fault-tolerance:
===============
Fault Tolerance
===============
.. _task-retries:
Retries
=======
When a worker is executing a task, if the worker dies unexpectedly, either
because the process crashed or because the machine failed, Ray will rerun
the task until either the task succeeds or the maximum number of retries is
@ -15,35 +21,40 @@ using :ref:`runtime environments<runtime-environments>`.
You can experiment with this behavior by running the following code.
.. code-block:: python
.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
  :language: python
  :start-after: __tasks_fault_tolerance_retries_begin__
  :end-before: __tasks_fault_tolerance_retries_end__
import numpy as np
import os
import ray
import time
You can also control whether application-level errors are retried, and even **which**
application-level errors are retried, via the ``retry_exceptions`` argument. This is
``False`` by default, so if your application code within the Ray task raises an
exception, that failure will **not** be retried. This ensures that Ray does not
retry non-idempotent tasks that have already partially executed.
However, if your tasks are idempotent, then you can enable application-level error
retries with ``retry_exceptions=True``, or even retry a specific set of
application-level errors (such as a class of exception types that you know to be
transient) by providing an allowlist of exceptions:
ray.init(ignore_reinit_error=True)
.. literalinclude:: ../doc_code/tasks_fault_tolerance.py
  :language: python
  :start-after: __tasks_fault_tolerance_retries_exception_begin__
  :end-before: __tasks_fault_tolerance_retries_exception_end__
@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
time.sleep(0.2)
if np.random.random() < failure_probability:
os._exit(0)
return 0
The semantics for each of the potential ``retry_exceptions`` values are as follows:
for _ in range(3):
try:
# If this task crashes, Ray will retry it up to one additional
# time. If either of the attempts succeeds, the call to ray.get
# below will return normally. Otherwise, it will raise an
# exception.
ray.get(potentially_fail.remote(0.5))
print('SUCCESS')
except ray.exceptions.WorkerCrashedError:
print('FAILURE')
* ``retry_exceptions=False`` (default): Application-level errors are not retried.
* ``retry_exceptions=True``: All application-level errors are retried.
* ``retry_exceptions=[Exc1, Exc2]``: Application-level errors that are instances of
  either ``Exc1`` or ``Exc2`` are retried.
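For example, a minimal sketch of the allowlist form in application code
(``MyTransientError`` and ``flaky`` are illustrative names, not part of Ray):

.. code-block:: python

  import ray

  class MyTransientError(Exception):
      pass

  # Only MyTransientError (and its subclasses) raised inside the task body
  # will be retried, up to max_retries times; any other exception fails the
  # task immediately.
  @ray.remote(max_retries=3, retry_exceptions=[MyTransientError])
  def flaky():
      ...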
.. _object-reconstruction:
Lineage-based Object Reconstruction
===================================
Ray also implements *lineage reconstruction* to recover task outputs that are
lost from the distributed object store. This can occur during node failures.
Ray will first automatically attempt to recover the value by looking for copies

View file

@ -104,6 +104,13 @@ _common_options = {
}
def issubclass_safe(obj: Any, cls_: type) -> bool:
    try:
        return issubclass(obj, cls_)
    except TypeError:
        return False


_task_only_options = {
    "max_calls": _counting_option("max_calls", False, default_value=0),
    # Normal tasks may be retried on failure this many times.
@ -119,7 +126,18 @@ _task_only_options = {
        lambda x: x is None,
        "Setting 'object_store_memory' is not implemented for tasks",
    ),
    "retry_exceptions": Option(bool, default_value=False),
    "retry_exceptions": Option(
        (bool, list, tuple),
        lambda x: (
            isinstance(x, bool)
            or (
                isinstance(x, (list, tuple))
                and all(issubclass_safe(x_, Exception) for x_ in x)
            )
        ),
        "retry_exceptions must be either a boolean or a list of exceptions",
        default_value=False,
    ),
}
_actor_only_options = {
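As a sanity check, here is a standalone sketch of what this new ``retry_exceptions`` validator accepts and rejects (a reimplementation for illustration, not the Ray code itself):

def is_valid_retry_exceptions(x):
    # Mirrors the Option predicate above: booleans are allowed, and so are
    # lists/tuples whose elements are all Exception subclasses.
    def issubclass_safe(obj, cls_):
        try:
            return issubclass(obj, cls_)
        except TypeError:
            return False

    return isinstance(x, bool) or (
        isinstance(x, (list, tuple))
        and all(issubclass_safe(x_, Exception) for x_ in x)
    )

assert is_valid_retry_exceptions(True)
assert is_valid_retry_exceptions([ValueError, KeyError])
assert not is_valid_retry_exceptions([ValueError, "not-an-exception"])
assert not is_valid_retry_exceptions("yes")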

View file

@ -2843,9 +2843,9 @@ def remote(*args, **kwargs):
this actor or task and its children. See
:ref:`runtime-environments` for detailed documentation. This API is
in beta and may change before becoming stable.
retry_exceptions: Only for *remote functions*. This specifies
whether application-level errors should be retried
up to max_retries times.
retry_exceptions: Only for *remote functions*. This specifies whether
application-level errors should be retried up to max_retries times.
This can be a boolean or a list of exceptions that should be retried.
scheduling_strategy: Strategy about how to
schedule a remote function or actor. Possible values are
None: ray will figure out the scheduling strategy to use, it

View file

@ -132,6 +132,7 @@ from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
)
import ray._private.ray_constants as ray_constants
import ray.cloudpickle as ray_pickle
from ray._private.async_compat import sync_to_async, get_new_event_loop
from ray._private.client_mode_hook import disable_client_hook
import ray._private.gcs_utils as gcs_utils
@ -484,6 +485,51 @@ cdef raise_if_dependency_failed(arg):
raise arg
cdef c_bool determine_if_retryable(
    Exception e,
    const c_string serialized_retry_exception_allowlist,
    FunctionDescriptor function_descriptor,
):
    """Determine if the provided exception is retryable, according to the
    (possibly null) serialized exception allowlist.

    If the serialized exception allowlist is an empty string or is None once
    deserialized, the exception is considered retryable and we return True.

    This method can raise an exception if:
        - Deserialization of exception allowlist fails (TypeError)
        - Exception allowlist is not None and not a tuple (AssertionError)
    """
    if len(serialized_retry_exception_allowlist) == 0:
        # No exception allowlist specified, default to all retryable.
        return True
    # Deserialize exception allowlist and check that the exception is in the allowlist.
    try:
        exception_allowlist = ray_pickle.loads(
            serialized_retry_exception_allowlist,
        )
    except TypeError as inner_e:
        # Exception allowlist deserialization failed.
        msg = (
            "Could not deserialize the retry exception allowlist "
            f"for task {function_descriptor.repr}. "
            "Check "
            "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa
            "for more information.")
        raise TypeError(msg) from inner_e
    if exception_allowlist is None:
        # No exception allowlist specified, default to all retryable.
        return True
    # Python API should have converted the list of exceptions to a tuple.
    assert isinstance(exception_allowlist, tuple)
    # Check that e is in allowlist.
    return isinstance(e, exception_allowlist)


cdef execute_task(
        CTaskType task_type,
        const c_string name,
@ -493,14 +539,15 @@ cdef execute_task(
        const c_vector[CObjectReference] &c_arg_refs,
        const c_vector[CObjectID] &c_return_ids,
        const c_string debugger_breakpoint,
        const c_string serialized_retry_exception_allowlist,
        c_vector[shared_ptr[CRayObject]] *returns,
        c_bool *is_application_level_error,
        c_bool *is_retryable_error,
        # This parameter is only used for actor creation task to define
        # the concurrency groups of this actor.
        const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
        const c_string c_name_of_concurrency_group_to_execute):
    is_application_level_error[0] = False
    is_retryable_error[0] = False

    worker = ray._private.worker.global_worker
    manager = worker.function_actor_manager
@ -687,12 +734,24 @@ cdef execute_task(
raise TaskCancelledError(
core_worker.get_current_task_id())
except Exception as e:
is_application_level_error[0] = True
if core_worker.get_current_task_retry_exceptions():
is_retryable_error[0] = determine_if_retryable(
e,
serialized_retry_exception_allowlist,
function_descriptor,
)
if (
is_retryable_error[0]
and core_worker.get_current_task_retry_exceptions()
):
logger.info("Task failed with retryable exception:"
" {}.".format(
core_worker.get_current_task_id()),
exc_info=True)
else:
logger.info("Task failed with unretryable exception:"
" {}.".format(
core_worker.get_current_task_id()),
exc_info=True)
raise e
if c_return_ids.size() == 1:
# If there is only one return specified, we should return
@ -791,9 +850,10 @@ cdef CRayStatus task_execution_handler(
const c_vector[CObjectReference] &c_arg_refs,
const c_vector[CObjectID] &c_return_ids,
const c_string debugger_breakpoint,
const c_string serialized_retry_exception_allowlist,
c_vector[shared_ptr[CRayObject]] *returns,
shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes,
c_bool *is_application_level_error,
c_bool *is_retryable_error,
const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
const c_string name_of_concurrency_group_to_execute) nogil:
with gil, disable_client_hook():
@ -803,8 +863,10 @@ cdef CRayStatus task_execution_handler(
# it does, that indicates that there was an internal error.
execute_task(task_type, task_name, ray_function, c_resources,
c_args, c_arg_refs, c_return_ids,
debugger_breakpoint, returns,
is_application_level_error,
debugger_breakpoint,
serialized_retry_exception_allowlist,
returns,
is_retryable_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute)
except Exception as e:
@ -1517,6 +1579,7 @@ cdef class CoreWorker:
resources,
int max_retries,
c_bool retry_exceptions,
retry_exception_allowlist,
scheduling_strategy,
c_string debugger_breakpoint,
c_string serialized_runtime_env_info,
@ -1529,10 +1592,24 @@ cdef class CoreWorker:
            c_vector[CObjectReference] return_refs
            CSchedulingStrategy c_scheduling_strategy
            c_vector[CObjectID] incremented_put_arg_ids
            c_string serialized_retry_exception_allowlist

        self.python_scheduling_strategy_to_c(
            scheduling_strategy, &c_scheduling_strategy)

        try:
            serialized_retry_exception_allowlist = ray_pickle.dumps(
                retry_exception_allowlist,
            )
        except TypeError as e:
            msg = (
                "Could not serialize the retry exception allowlist "
                f"{retry_exception_allowlist} for task {function_descriptor.repr}. "
                "Check "
                "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa
                "for more information.")
            raise TypeError(msg) from e

        with self.profile_event(b"submit_task"):
            prepare_resources(resources, &c_resources)
            ray_function = CRayFunction(
@ -1551,6 +1628,7 @@ cdef class CoreWorker:
max_retries, retry_exceptions,
c_scheduling_strategy,
debugger_breakpoint,
serialized_retry_exception_allowlist,
)
# These arguments were serialized and put into the local object
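Putting the two sides together, the allowlist round trip amounts to roughly the following sketch (using ``ray.cloudpickle`` directly, with built-in exception types for illustration):

import ray.cloudpickle as ray_pickle

# Driver side: the user-supplied exception classes are converted to a tuple
# and pickled into the task spec.
serialized = ray_pickle.dumps((ConnectionError, TimeoutError))

# Worker side: an empty/None allowlist means "retry any application-level
# error"; otherwise the raised exception is matched against the tuple.
allowlist = ray_pickle.loads(serialized)
assert isinstance(ConnectionError("flaky network"), allowlist)
assert not isinstance(ValueError("bad input"), allowlist)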

View file

@ -111,7 +111,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int max_retries,
c_bool retry_exceptions,
const CSchedulingStrategy &scheduling_strategy,
c_string debugger_breakpoint)
c_string debugger_breakpoint,
c_string serialized_retry_exception_allowlist)
CRayStatus CreateActor(
const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
@ -287,10 +288,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectReference] &arg_refs,
const c_vector[CObjectID] &return_ids,
const c_string debugger_breakpoint,
const c_string serialized_retry_exception_allowlist,
c_vector[shared_ptr[CRayObject]] *returns,
shared_ptr[LocalMemoryBuffer]
&creation_task_exception_pb_bytes,
c_bool *is_application_level_error,
c_bool *is_retryable_error,
const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
const c_string name_of_concurrency_group_to_execute) nogil
) task_execution_callback

View file

@ -59,6 +59,7 @@ class RemoteFunction:
_max_retries: The number of times this task may be retried
on worker failure.
_retry_exceptions: Whether application-level errors should be retried.
This can be a boolean or a list/tuple of exceptions that should be retried.
_runtime_env: The runtime environment for this task.
_decorator: An optional decorator that should be applied to the remote
function invocation (as opposed to the function execution) before
@ -300,6 +301,11 @@ class RemoteFunction:
        num_returns = task_options["num_returns"]
        max_retries = task_options["max_retries"]
        retry_exceptions = task_options["retry_exceptions"]
        if isinstance(retry_exceptions, (list, tuple)):
            retry_exception_allowlist = tuple(retry_exceptions)
            retry_exceptions = True
        else:
            retry_exception_allowlist = None

        if scheduling_strategy is None or not isinstance(
            scheduling_strategy, PlacementGroupSchedulingStrategy
@ -375,6 +381,7 @@ class RemoteFunction:
resources,
max_retries,
retry_exceptions,
retry_exception_allowlist,
scheduling_strategy,
worker.debugger_breakpoint,
serialized_runtime_env_info or "{}",
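A minimal sketch of the normalization performed above, for illustration (``normalize_retry_exceptions`` is a stand-in helper, not a Ray API):

def normalize_retry_exceptions(retry_exceptions):
    # list/tuple -> (True, allowlist tuple); booleans pass through with no allowlist.
    if isinstance(retry_exceptions, (list, tuple)):
        return True, tuple(retry_exceptions)
    return retry_exceptions, None

assert normalize_retry_exceptions([ValueError]) == (True, (ValueError,))
assert normalize_retry_exceptions(True) == (True, None)
assert normalize_retry_exceptions(False) == (False, None)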

View file

@ -61,7 +61,6 @@ py_test_module_list(
"test_gcs_pubsub.py",
"test_global_gc.py",
"test_grpc_client_credentials.py",
"test_iter.py",
"test_job.py",
"test_get_locations.py",
"test_global_state.py",
@ -285,6 +284,7 @@ py_test_module_list(
"test_output.py",
"test_out_of_disk_space.py",
"test_failure_4.py",
"test_iter.py",
"test_object_spilling.py",
"test_object_spilling_no_asan.py",
"test_object_spilling_2.py",

View file

@ -106,6 +106,52 @@ def test_retry_application_level_error(ray_start_regular):
ray.get(r3)
class CountError(Exception):
    pass


def test_retry_application_level_error_exception_filter(ray_start_regular):
    @ray.remote
    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    @ray.remote(max_retries=1, retry_exceptions=[CountError])
    def func(counter):
        if counter is None:
            raise ValueError()
        count = counter.increment.remote()
        if ray.get(count) == 1:
            raise CountError()
        else:
            return 2

    # Exception that doesn't satisfy the predicate should cause the task to immediately
    # fail.
    r0 = func.remote(None)
    with pytest.raises(ValueError):
        ray.get(r0)

    # Test against exceptions (CountError) that do satisfy the predicate.
    counter1 = Counter.remote()
    r1 = func.remote(counter1)
    assert ray.get(r1) == 2

    counter2 = Counter.remote()
    r2 = func.options(max_retries=0).remote(counter2)
    with pytest.raises(CountError):
        ray.get(r2)

    counter3 = Counter.remote()
    r3 = func.options(retry_exceptions=False).remote(counter3)
    with pytest.raises(CountError):
        ray.get(r3)
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_connect_with_disconnected_node(shutdown_only):
config = {

View file

@ -177,6 +177,7 @@ ray.get([f.remote() for _ in range(15)])
assert "Tip:" not in err_str
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_fail_importing_actor(ray_start_regular, error_pubsub):
script = """
import os

View file

@ -324,6 +324,10 @@ bool TaskSpecification::IsSpreadSchedulingStrategy() const {
rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy;
}
const std::string TaskSpecification::GetSerializedRetryExceptionAllowlist() const {
return message_->serialized_retry_exception_allowlist();
}
// === Below are getter methods specific to actor creation tasks.
ActorID TaskSpecification::ActorCreationId() const {

View file

@ -306,6 +306,9 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
/// Whether this task is an actor task.
bool IsActorTask() const;
// Returns the serialized exception allowlist for this task.
const std::string GetSerializedRetryExceptionAllowlist() const;
// Methods specific to actor creation tasks.
ActorID ActorCreationId() const;

View file

@ -138,11 +138,15 @@ class TaskSpecBuilder {
return *this;
}
TaskSpecBuilder &SetNormalTaskSpec(int max_retries,
bool retry_exceptions,
const rpc::SchedulingStrategy &scheduling_strategy) {
TaskSpecBuilder &SetNormalTaskSpec(
int max_retries,
bool retry_exceptions,
const std::string &serialized_retry_exception_allowlist,
const rpc::SchedulingStrategy &scheduling_strategy) {
message_->set_max_retries(max_retries);
message_->set_retry_exceptions(retry_exceptions);
message_->set_serialized_retry_exception_allowlist(
serialized_retry_exception_allowlist);
message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy);
return *this;
}

View file

@ -1567,7 +1567,8 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
int max_retries,
bool retry_exceptions,
const rpc::SchedulingStrategy &scheduling_strategy,
const std::string &debugger_breakpoint) {
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist) {
RAY_CHECK(scheduling_strategy.scheduling_strategy_case() !=
rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET);
@ -1601,7 +1602,10 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
debugger_breakpoint,
depth,
task_options.serialized_runtime_env_info);
builder.SetNormalTaskSpec(max_retries, retry_exceptions, scheduling_strategy);
builder.SetNormalTaskSpec(max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
scheduling_strategy);
TaskSpecification task_spec = builder.Build();
RAY_LOG(DEBUG) << "Submitting normal task " << task_spec.DebugString();
std::vector<rpc::ObjectReference> returned_refs;
@ -2174,7 +2178,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
ReferenceCounter::ReferenceTableProto *borrowed_refs,
bool *is_application_level_error) {
bool *is_retryable_error) {
RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString();
task_queue_length_ -= 1;
num_executed_tasks_ += 1;
@ -2258,9 +2262,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
arg_refs,
return_ids,
task_spec.GetDebuggerBreakpoint(),
task_spec.GetSerializedRetryExceptionAllowlist(),
return_objects,
creation_task_exception_pb_bytes,
is_application_level_error,
is_retryable_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute);
@ -2431,12 +2436,9 @@ std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
}
auto old_id = GetActorId();
SetActorId(actor_id);
bool is_application_level_error;
RAY_UNUSED(ExecuteTask(task_spec,
resource_ids,
&return_objects,
&borrowed_refs,
&is_application_level_error));
bool is_retryable_error;
RAY_UNUSED(ExecuteTask(
task_spec, resource_ids, &return_objects, &borrowed_refs, &is_retryable_error));
SetActorId(old_id);
return returned_refs;
}

View file

@ -458,6 +458,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] debugger_breakpoint breakpoint to drop into for the debugger after this
/// task starts executing, or "" if we do not want to drop into the debugger.
/// should capture parent's placement group implicitly.
/// \param[in] serialized_retry_exception_allowlist A serialized exception list
/// that serves as an allowlist of frontend-language exceptions/errors that should be
/// retried. Default is an empty string, which will be treated as an allow-all in the
/// language worker.
/// \return ObjectRefs returned by this task.
std::vector<rpc::ObjectReference> SubmitTask(
const RayFunction &function,
@ -466,7 +470,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
int max_retries,
bool retry_exceptions,
const rpc::SchedulingStrategy &scheduling_strategy,
const std::string &debugger_breakpoint);
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist = "");
/// Create an actor.
///
@ -933,7 +938,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
ReferenceCounter::ReferenceTableProto *borrowed_refs,
bool *is_application_level_error);
bool *is_retryable_error);
/// Put an object in the local plasma store.
Status PutInLocalPlasmaStore(const RayObject &object,

View file

@ -41,9 +41,10 @@ struct CoreWorkerOptions {
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<RayObject>> *results,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_application_level_error,
bool *is_retryable_error,
// The following 2 parameters `defined_concurrency_groups` and
// `name_of_concurrency_group_to_execute` are used for Python
// asyncio actor only.

View file

@ -116,9 +116,10 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<RayObject>> *results,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb,
bool *is_application_level_error,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute) {
// These 2 parameters are used for Python only, and Java worker
@ -126,7 +127,9 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env,
RAY_UNUSED(defined_concurrency_groups);
RAY_UNUSED(name_of_concurrency_group_to_execute);
// TODO(jjyao): Support retrying application-level errors for Java
*is_application_level_error = false;
// TODO(Clark): Support exception allowlist for retrying application-level
// errors for Java.
*is_retryable_error = false;
JNIEnv *env = GetJNIEnv();
RAY_CHECK(java_task_executor);

View file

@ -64,7 +64,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
bool ReplyPushTask(Status status = Status::OK(),
bool exit = false,
bool is_application_level_error = false) {
bool is_retryable_error = false) {
if (callbacks.size() == 0) {
return false;
}
@ -73,8 +73,8 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
if (exit) {
reply.set_worker_exiting(true);
}
if (is_application_level_error) {
reply.set_is_application_level_error(true);
if (is_retryable_error) {
reply.set_is_retryable_error(true);
}
callback(status, reply);
callbacks.pop_front();

View file

@ -49,8 +49,8 @@ class MockWorker {
options.node_ip_address = "127.0.0.1";
options.node_manager_port = node_manager_port;
options.raylet_ip_address = "127.0.0.1";
options.task_execution_callback =
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9);
options.task_execution_callback = std::bind(
&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9, _10);
options.metrics_agent_port = -1;
options.startup_token = startup_token;
CoreWorkerProcess::Initialize(options);
@ -67,6 +67,7 @@ class MockWorker {
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<RayObject>> *results) {
// Note that this doesn't include dummy object id.
const FunctionDescriptor function_descriptor = ray_function.GetFunctionDescriptor();

View file

@ -92,13 +92,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
RAY_CHECK(num_returns >= 0);
std::vector<std::shared_ptr<RayObject>> return_objects;
bool is_application_level_error = false;
bool is_retryable_error = false;
auto status = task_handler_(task_spec,
resource_ids,
&return_objects,
reply->mutable_borrowed_refs(),
&is_application_level_error);
reply->set_is_application_level_error(is_application_level_error);
&is_retryable_error);
reply->set_is_retryable_error(is_retryable_error);
bool objects_valid = return_objects.size() == num_returns;
if (objects_valid) {

View file

@ -54,7 +54,7 @@ class CoreWorkerDirectTaskReceiver {
const std::shared_ptr<ResourceMappingType> resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
ReferenceCounter::ReferenceTableProto *borrower_refs,
bool *is_application_level_error)>;
bool *is_retryable_error)>;
using OnTaskDone = std::function<Status()>;

View file

@ -585,8 +585,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED,
&status));
} else {
if (!task_spec.GetMessage().retry_exceptions() ||
!reply.is_application_level_error() ||
if (!task_spec.GetMessage().retry_exceptions() || !reply.is_retryable_error() ||
!task_finisher_->RetryTaskIfPossible(task_id)) {
task_finisher_->CompletePendingTask(task_id, reply, addr.ToProto());
}
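Restated in Python-style pseudocode, the submitter finalizes the reply (instead of retrying) unless every retry condition holds; the names below are descriptive stand-ins, not Ray APIs:

def finalize_instead_of_retry(retry_exceptions_enabled, is_retryable_error, retry_scheduled):
    # A retry only happens when all three hold: the task opted in to exception
    # retries, the worker flagged the failure as retryable, and
    # RetryTaskIfPossible still had retry budget to schedule another attempt.
    return (
        not retry_exceptions_enabled
        or not is_retryable_error
        or not retry_scheduled
    )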

View file

@ -312,14 +312,17 @@ message TaskSpec {
string concurrency_group_name = 24;
// Whether application-level errors (exceptions) should be retried.
bool retry_exceptions = 25;
// A serialized exception list that serves as an allowlist of frontend-language
// exceptions/errors that should be retried.
bytes serialized_retry_exception_allowlist = 26;
// The depth of the task. The driver has depth 0, anything it calls has depth
// 1, etc.
int64 depth = 26;
int64 depth = 27;
// Strategy about how to schedule this task.
SchedulingStrategy scheduling_strategy = 27;
SchedulingStrategy scheduling_strategy = 28;
// A count of the number of times this task has been attempted so far. 0
// means this is the first execution.
uint64 attempt_number = 28;
uint64 attempt_number = 29;
}
message TaskInfoEntry {

View file

@ -125,8 +125,8 @@ message PushTaskReply {
// may now be borrowing. The reference counts also include any new borrowers
// that the worker created by passing a borrowed ID into a nested task.
repeated ObjectReferenceCount borrowed_refs = 4;
// Whether the result contains an application-level error (exception).
bool is_application_level_error = 5;
// Whether the result contains a retryable application-level error.
bool is_retryable_error = 5;
}
message DirectActorCallArgWaitCompleteRequest {

View file

@ -179,7 +179,7 @@ RayTask CreateTask(
}
}
spec_builder.SetNormalTaskSpec(0, false, scheduling_strategy);
spec_builder.SetNormalTaskSpec(0, false, "", scheduling_strategy);
return RayTask(spec_builder.Build());
}