From 293452dcba08b4aa1c437d60b5ae2c9bb6ed1269 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Fri, 5 Aug 2022 17:07:13 -0600 Subject: [PATCH] [Core] Unrevert "Add retry exception allowlist for user-defined filtering of retryable application-level errors." (#26449) This reverts commit cf7305a, and unreverts #25896. This was reverted due to a failing Windows test: #26287 We can merge once the failing Windows test (and all other relevant tests) pass. --- .gitignore | 2 + cpp/src/ray/runtime/task/task_executor.cc | 7 +- cpp/src/ray/runtime/task/task_executor.h | 3 +- .../doc_code/tasks_fault_tolerance.py | 91 ++++++++++++++++++ doc/source/ray-core/tasks/fault-tolerance.rst | 55 ++++++----- python/ray/_private/ray_option_utils.py | 20 +++- python/ray/_private/worker.py | 6 +- python/ray/_raylet.pyx | 92 +++++++++++++++++-- python/ray/includes/libcoreworker.pxd | 6 +- python/ray/remote_function.py | 7 ++ python/ray/tests/BUILD | 2 +- python/ray/tests/test_failure_4.py | 46 ++++++++++ python/ray/tests/test_output.py | 1 + src/ray/common/task/task_spec.cc | 4 + src/ray/common/task/task_spec.h | 3 + src/ray/common/task/task_util.h | 10 +- src/ray/core_worker/core_worker.cc | 22 +++-- src/ray/core_worker/core_worker.h | 9 +- src/ray/core_worker/core_worker_options.h | 3 +- .../java/io_ray_runtime_RayNativeRuntime.cc | 7 +- .../test/direct_task_transport_test.cc | 6 +- src/ray/core_worker/test/mock_worker.cc | 5 +- .../transport/direct_actor_transport.cc | 6 +- .../transport/direct_actor_transport.h | 2 +- .../transport/direct_task_transport.cc | 3 +- src/ray/protobuf/common.proto | 9 +- src/ray/protobuf/core_worker.proto | 4 +- .../scheduling/cluster_task_manager_test.cc | 2 +- 28 files changed, 360 insertions(+), 73 deletions(-) create mode 100644 doc/source/ray-core/doc_code/tasks_fault_tolerance.py diff --git a/.gitignore b/.gitignore index d5c0273ca..6c44b4308 100644 --- a/.gitignore +++ b/.gitignore @@ -185,6 +185,8 @@ venv # Vim .*.swp *.swp +.*.swo +*.swo tags tags.lock tags.temp diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index 3e79b1fdf..426920b0c 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -127,9 +127,10 @@ Status TaskExecutor::ExecuteTask( const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, - bool *is_application_level_error, + bool *is_retryable_error, const std::vector &defined_concurrency_groups, const std::string name_of_concurrency_group_to_execute) { RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type) @@ -141,6 +142,10 @@ Status TaskExecutor::ExecuteTask( auto typed_descriptor = function_descriptor->As(); std::string func_name = typed_descriptor->FunctionName(); bool cross_lang = !typed_descriptor->Caller().empty(); + // TODO(Clark): Support retrying application-level errors for C++. + // TODO(Clark): Support exception allowlist for retrying application-level + // errors for C++. 
+ *is_retryable_error = false; Status status{}; std::shared_ptr data = nullptr; diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index 63e0c5c67..2e48351e8 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -83,9 +83,10 @@ class TaskExecutor { const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, - bool *is_application_level_error, + bool *is_retryable_error, const std::vector &defined_concurrency_groups, const std::string name_of_concurrency_group_to_execute); diff --git a/doc/source/ray-core/doc_code/tasks_fault_tolerance.py b/doc/source/ray-core/doc_code/tasks_fault_tolerance.py new file mode 100644 index 000000000..e80ff4f52 --- /dev/null +++ b/doc/source/ray-core/doc_code/tasks_fault_tolerance.py @@ -0,0 +1,91 @@ +# flake8: noqa + +# fmt: off +# __tasks_fault_tolerance_retries_begin__ +import numpy as np +import os +import ray +import time + +ray.init(ignore_reinit_error=True) + +@ray.remote(max_retries=1) +def potentially_fail(failure_probability): + time.sleep(0.2) + if np.random.random() < failure_probability: + os._exit(0) + return 0 + +for _ in range(3): + try: + # If this task crashes, Ray will retry it up to one additional + # time. If either of the attempts succeeds, the call to ray.get + # below will return normally. Otherwise, it will raise an + # exception. + ray.get(potentially_fail.remote(0.5)) + print('SUCCESS') + except ray.exceptions.WorkerCrashedError: + print('FAILURE') +# __tasks_fault_tolerance_retries_end__ +# fmt: on + +# fmt: off +# __tasks_fault_tolerance_retries_exception_begin__ +import numpy as np +import os +import ray +import time + +ray.init(ignore_reinit_error=True) + +class RandomError(Exception): + pass + +@ray.remote(max_retries=1, retry_exceptions=True) +def potentially_fail(failure_probability): + if failure_probability < 0 or failure_probability > 1: + raise ValueError( + "failure_probability must be between 0 and 1, but got: " + f"{failure_probability}" + ) + time.sleep(0.2) + if np.random.random() < failure_probability: + raise RandomError("Failed!") + return 0 + +for _ in range(3): + try: + # If this task crashes, Ray will retry it up to one additional + # time. If either of the attempts succeeds, the call to ray.get + # below will return normally. Otherwise, it will raise an + # exception. + ray.get(potentially_fail.remote(0.5)) + print('SUCCESS') + except RandomError: + print('FAILURE') + +# Provide the exceptions that we want to retry as an allowlist. +retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError]) +try: + # This will fail since we're passing in -1 for the failure_probability, + # which will raise a ValueError in the task and does not match the RandomError + # exception that we provided. + ray.get(retry_on_exception.remote(-1)) +except ValueError: + print("FAILED AS EXPECTED") +else: + raise RuntimeError("An exception should be raised so this shouldn't be reached.") + +# These will retry on the RandomError exception. +for _ in range(3): + try: + # If this task crashes, Ray will retry it up to one additional + # time. If either of the attempts succeeds, the call to ray.get + # below will return normally. Otherwise, it will raise an + # exception. 
+ ray.get(retry_on_exception.remote(0.5)) + print('SUCCESS') + except RandomError: + print('FAILURE AFTER RETRIES') +# __tasks_fault_tolerance_retries_exception_end__ +# fmt: on diff --git a/doc/source/ray-core/tasks/fault-tolerance.rst b/doc/source/ray-core/tasks/fault-tolerance.rst index 42d6213b1..309eaa265 100644 --- a/doc/source/ray-core/tasks/fault-tolerance.rst +++ b/doc/source/ray-core/tasks/fault-tolerance.rst @@ -1,8 +1,14 @@ .. _task-fault-tolerance: +=============== Fault Tolerance =============== +.. _task-retries: + +Retries +======= + When a worker is executing a task, if the worker dies unexpectedly, either because the process crashed or because the machine failed, Ray will rerun the task until either the task succeeds or the maximum number of retries is @@ -15,35 +21,40 @@ using :ref:`runtime environments`. You can experiment with this behavior by running the following code. -.. code-block:: python +.. literalinclude:: ../doc_code/tasks_fault_tolerance.py + :language: python + :start-after: __tasks_fault_tolerance_retries_begin__ + :end-before: __tasks_fault_tolerance_retries_end__ - import numpy as np - import os - import ray - import time +You can also control whether application-level errors are retried, and even **which** +application-level errors are retried, via the ``retry_exceptions`` argument. This is +``False`` by default, so if your application code within the Ray task raises an +exception, this failure will **not** be retried. This is to ensure that Ray is not +retrying non-idempotent tasks when they have partially executed. +However, if your tasks are idempotent, then you can enable application-level error +retries with ``retry_exceptions=True``, or even retry a specific set of +application-level errors (such as a class of exception types that you know to be +transient) by providing an allowlist of exceptions: - ray.init(ignore_reinit_error=True) +.. literalinclude:: ../doc_code/tasks_fault_tolerance.py + :language: python + :start-after: __tasks_fault_tolerance_retries_exception_begin__ + :end-before: __tasks_fault_tolerance_retries_exception_end__ - @ray.remote(max_retries=1) - def potentially_fail(failure_probability): - time.sleep(0.2) - if np.random.random() < failure_probability: - os._exit(0) - return 0 +The semantics for each of the potential ``retry_exceptions`` values are as follows: - for _ in range(3): - try: - # If this task crashes, Ray will retry it up to one additional - # time. If either of the attempts succeeds, the call to ray.get - # below will return normally. Otherwise, it will raise an - # exception. - ray.get(potentially_fail.remote(0.5)) - print('SUCCESS') - except ray.exceptions.WorkerCrashedError: - print('FAILURE') +* ``retry_exceptions=False`` (default): Application-level errors are not retried. + +* ``retry_exceptions=True``: All application-level errors are retried. + +* ``retry_exceptions=[Exc1, Exc2]``: Application-level errors that are instances of + either ``Exc1`` or ``Exc2`` are retried. .. _object-reconstruction: +Lineage-based Object Reconstruction +=================================== + Ray also implements *lineage reconstruction* to recover task outputs that are lost from the distributed object store. This can occur during node failures. 
Ray will first automatically attempt to recover the value by looking for copies diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py index f22ed711e..b24e35a0d 100644 --- a/python/ray/_private/ray_option_utils.py +++ b/python/ray/_private/ray_option_utils.py @@ -104,6 +104,13 @@ _common_options = { } +def issubclass_safe(obj: Any, cls_: type) -> bool: + try: + return issubclass(obj, cls_) + except TypeError: + return False + + _task_only_options = { "max_calls": _counting_option("max_calls", False, default_value=0), # Normal tasks may be retried on failure this many times. @@ -119,7 +126,18 @@ _task_only_options = { lambda x: x is None, "Setting 'object_store_memory' is not implemented for tasks", ), - "retry_exceptions": Option(bool, default_value=False), + "retry_exceptions": Option( + (bool, list, tuple), + lambda x: ( + isinstance(x, bool) + or ( + isinstance(x, (list, tuple)) + and all(issubclass_safe(x_, Exception) for x_ in x) + ) + ), + "retry_exceptions must be either a boolean or a list of exceptions", + default_value=False, + ), } _actor_only_options = { diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 13f1a8006..6268aae35 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -2843,9 +2843,9 @@ def remote(*args, **kwargs): this actor or task and its children. See :ref:`runtime-environments` for detailed documentation. This API is in beta and may change before becoming stable. - retry_exceptions: Only for *remote functions*. This specifies - whether application-level errors should be retried - up to max_retries times. + retry_exceptions: Only for *remote functions*. This specifies whether + application-level errors should be retried up to max_retries times. + This can be a boolean or a list of exceptions that should be retried. scheduling_strategy: Strategy about how to schedule a remote function or actor. Possible values are None: ray will figure out the scheduling strategy to use, it diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index eacb87757..9558807b7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -132,6 +132,7 @@ from ray.util.scheduling_strategies import ( NodeAffinitySchedulingStrategy, ) import ray._private.ray_constants as ray_constants +import ray.cloudpickle as ray_pickle from ray._private.async_compat import sync_to_async, get_new_event_loop from ray._private.client_mode_hook import disable_client_hook import ray._private.gcs_utils as gcs_utils @@ -484,6 +485,51 @@ cdef raise_if_dependency_failed(arg): raise arg +cdef c_bool determine_if_retryable( + Exception e, + const c_string serialized_retry_exception_allowlist, + FunctionDescriptor function_descriptor, +): + """Determine if the provided exception is retryable, according to the + (possibly null) serialized exception allowlist. + + If the serialized exception allowlist is an empty string or is None once + deserialized, the exception is considered retryable and we return True. + + This method can raise an exception if: + - Deserialization of exception allowlist fails (TypeError) + - Exception allowlist is not None and not a tuple (AssertionError) + """ + if len(serialized_retry_exception_allowlist) == 0: + # No exception allowlist specified, default to all retryable. + return True + + # Deserialize exception allowlist and check that the exception is in the allowlist. 
+ try: + exception_allowlist = ray_pickle.loads( + serialized_retry_exception_allowlist, + ) + except TypeError as inner_e: + # Exception allowlist deserialization failed. + msg = ( + "Could not deserialize the retry exception allowlist " + f"for task {function_descriptor.repr}. " + "Check " + "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa + "for more information.") + raise TypeError(msg) from inner_e + + if exception_allowlist is None: + # No exception allowlist specified, default to all retryable. + return True + + # Python API should have converted the list of exceptions to a tuple. + assert isinstance(exception_allowlist, tuple) + + # Check that e is in allowlist. + return isinstance(e, exception_allowlist) + + cdef execute_task( CTaskType task_type, const c_string name, @@ -493,14 +539,15 @@ cdef execute_task( const c_vector[CObjectReference] &c_arg_refs, const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, + const c_string serialized_retry_exception_allowlist, c_vector[shared_ptr[CRayObject]] *returns, - c_bool *is_application_level_error, + c_bool *is_retryable_error, # This parameter is only used for actor creation task to define # the concurrency groups of this actor. const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups, const c_string c_name_of_concurrency_group_to_execute): - is_application_level_error[0] = False + is_retryable_error[0] = False worker = ray._private.worker.global_worker manager = worker.function_actor_manager @@ -687,12 +734,24 @@ cdef execute_task( raise TaskCancelledError( core_worker.get_current_task_id()) except Exception as e: - is_application_level_error[0] = True - if core_worker.get_current_task_retry_exceptions(): + is_retryable_error[0] = determine_if_retryable( + e, + serialized_retry_exception_allowlist, + function_descriptor, + ) + if ( + is_retryable_error[0] + and core_worker.get_current_task_retry_exceptions() + ): logger.info("Task failed with retryable exception:" " {}.".format( core_worker.get_current_task_id()), exc_info=True) + else: + logger.info("Task failed with unretryable exception:" + " {}.".format( + core_worker.get_current_task_id()), + exc_info=True) raise e if c_return_ids.size() == 1: # If there is only one return specified, we should return @@ -791,9 +850,10 @@ cdef CRayStatus task_execution_handler( const c_vector[CObjectReference] &c_arg_refs, const c_vector[CObjectID] &c_return_ids, const c_string debugger_breakpoint, + const c_string serialized_retry_exception_allowlist, c_vector[shared_ptr[CRayObject]] *returns, shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes, - c_bool *is_application_level_error, + c_bool *is_retryable_error, const c_vector[CConcurrencyGroup] &defined_concurrency_groups, const c_string name_of_concurrency_group_to_execute) nogil: with gil, disable_client_hook(): @@ -803,8 +863,10 @@ cdef CRayStatus task_execution_handler( # it does, that indicates that there was an internal error. 
execute_task(task_type, task_name, ray_function, c_resources, c_args, c_arg_refs, c_return_ids, - debugger_breakpoint, returns, - is_application_level_error, + debugger_breakpoint, + serialized_retry_exception_allowlist, + returns, + is_retryable_error, defined_concurrency_groups, name_of_concurrency_group_to_execute) except Exception as e: @@ -1517,6 +1579,7 @@ cdef class CoreWorker: resources, int max_retries, c_bool retry_exceptions, + retry_exception_allowlist, scheduling_strategy, c_string debugger_breakpoint, c_string serialized_runtime_env_info, @@ -1529,10 +1592,24 @@ cdef class CoreWorker: c_vector[CObjectReference] return_refs CSchedulingStrategy c_scheduling_strategy c_vector[CObjectID] incremented_put_arg_ids + c_string serialized_retry_exception_allowlist self.python_scheduling_strategy_to_c( scheduling_strategy, &c_scheduling_strategy) + try: + serialized_retry_exception_allowlist = ray_pickle.dumps( + retry_exception_allowlist, + ) + except TypeError as e: + msg = ( + "Could not serialize the retry exception allowlist" + f"{retry_exception_allowlist} for task {function_descriptor.repr}. " + "Check " + "https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa + "for more information.") + raise TypeError(msg) from e + with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) ray_function = CRayFunction( @@ -1551,6 +1628,7 @@ cdef class CoreWorker: max_retries, retry_exceptions, c_scheduling_strategy, debugger_breakpoint, + serialized_retry_exception_allowlist, ) # These arguments were serialized and put into the local object diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9760c3f83..0a32daf6b 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -111,7 +111,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int max_retries, c_bool retry_exceptions, const CSchedulingStrategy &scheduling_strategy, - c_string debugger_breakpoint) + c_string debugger_breakpoint, + c_string serialized_retry_exception_allowlist) CRayStatus CreateActor( const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, @@ -287,10 +288,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectReference] &arg_refs, const c_vector[CObjectID] &return_ids, const c_string debugger_breakpoint, + const c_string serialized_retry_exception_allowlist, c_vector[shared_ptr[CRayObject]] *returns, shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes, - c_bool *is_application_level_error, + c_bool *is_retryable_error, const c_vector[CConcurrencyGroup] &defined_concurrency_groups, const c_string name_of_concurrency_group_to_execute) nogil ) task_execution_callback diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 02e79319b..bc9558c73 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -59,6 +59,7 @@ class RemoteFunction: _max_retries: The number of times this task may be retried on worker failure. _retry_exceptions: Whether application-level errors should be retried. + This can be a boolean or a list/tuple of exceptions that should be retried. _runtime_env: The runtime environment for this task. 
_decorator: An optional decorator that should be applied to the remote function invocation (as opposed to the function execution) before @@ -300,6 +301,11 @@ class RemoteFunction: num_returns = task_options["num_returns"] max_retries = task_options["max_retries"] retry_exceptions = task_options["retry_exceptions"] + if isinstance(retry_exceptions, (list, tuple)): + retry_exception_allowlist = tuple(retry_exceptions) + retry_exceptions = True + else: + retry_exception_allowlist = None if scheduling_strategy is None or not isinstance( scheduling_strategy, PlacementGroupSchedulingStrategy @@ -375,6 +381,7 @@ class RemoteFunction: resources, max_retries, retry_exceptions, + retry_exception_allowlist, scheduling_strategy, worker.debugger_breakpoint, serialized_runtime_env_info or "{}", diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 409dc0240..f2328b62e 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -61,7 +61,6 @@ py_test_module_list( "test_gcs_pubsub.py", "test_global_gc.py", "test_grpc_client_credentials.py", - "test_iter.py", "test_job.py", "test_get_locations.py", "test_global_state.py", @@ -285,6 +284,7 @@ py_test_module_list( "test_output.py", "test_out_of_disk_space.py", "test_failure_4.py", + "test_iter.py", "test_object_spilling.py", "test_object_spilling_no_asan.py", "test_object_spilling_2.py", diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py index 98b39005b..c3fb1ac20 100644 --- a/python/ray/tests/test_failure_4.py +++ b/python/ray/tests/test_failure_4.py @@ -106,6 +106,52 @@ def test_retry_application_level_error(ray_start_regular): ray.get(r3) +class CountError(Exception): + pass + + +def test_retry_application_level_error_exception_filter(ray_start_regular): + @ray.remote + class Counter: + def __init__(self): + self.value = 0 + + def increment(self): + self.value += 1 + return self.value + + @ray.remote(max_retries=1, retry_exceptions=[CountError]) + def func(counter): + if counter is None: + raise ValueError() + count = counter.increment.remote() + if ray.get(count) == 1: + raise CountError() + else: + return 2 + + # Exception that doesn't satisfy the predicate should cause the task to immediately + # fail. + r0 = func.remote(None) + with pytest.raises(ValueError): + ray.get(r0) + + # Test against exceptions (CountError) that do satisfy the predicate. 
+ counter1 = Counter.remote() + r1 = func.remote(counter1) + assert ray.get(r1) == 2 + + counter2 = Counter.remote() + r2 = func.options(max_retries=0).remote(counter2) + with pytest.raises(CountError): + ray.get(r2) + + counter3 = Counter.remote() + r3 = func.options(retry_exceptions=False).remote(counter3) + with pytest.raises(CountError): + ray.get(r3) + + @pytest.mark.xfail(cluster_not_supported, reason="cluster not supported") def test_connect_with_disconnected_node(shutdown_only): config = { diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index d827bbbe7..12263f4d3 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -177,6 +177,7 @@ ray.get([f.remote() for _ in range(15)]) assert "Tip:" not in err_str +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_fail_importing_actor(ray_start_regular, error_pubsub): script = """ import os diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 4b608d9bf..5938d4af1 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -324,6 +324,10 @@ bool TaskSpecification::IsSpreadSchedulingStrategy() const { rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy; } +const std::string TaskSpecification::GetSerializedRetryExceptionAllowlist() const { + return message_->serialized_retry_exception_allowlist(); +} + // === Below are getter methods specific to actor creation tasks. ActorID TaskSpecification::ActorCreationId() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 2932819e2..2f4ee06af 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -306,6 +306,9 @@ class TaskSpecification : public MessageWrapper { /// Whether this task is an actor task. bool IsActorTask() const; + // Returns the serialized exception allowlist for this task. + const std::string GetSerializedRetryExceptionAllowlist() const; + // Methods specific to actor creation tasks. 
ActorID ActorCreationId() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 4f5305f9b..a5e471302 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -138,11 +138,15 @@ class TaskSpecBuilder { return *this; } - TaskSpecBuilder &SetNormalTaskSpec(int max_retries, - bool retry_exceptions, - const rpc::SchedulingStrategy &scheduling_strategy) { + TaskSpecBuilder &SetNormalTaskSpec( + int max_retries, + bool retry_exceptions, + const std::string &serialized_retry_exception_allowlist, + const rpc::SchedulingStrategy &scheduling_strategy) { message_->set_max_retries(max_retries); message_->set_retry_exceptions(retry_exceptions); + message_->set_serialized_retry_exception_allowlist( + serialized_retry_exception_allowlist); message_->mutable_scheduling_strategy()->CopyFrom(scheduling_strategy); return *this; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index bd5aa9ecf..84ee3eadf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1567,7 +1567,8 @@ std::vector CoreWorker::SubmitTask( int max_retries, bool retry_exceptions, const rpc::SchedulingStrategy &scheduling_strategy, - const std::string &debugger_breakpoint) { + const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist) { RAY_CHECK(scheduling_strategy.scheduling_strategy_case() != rpc::SchedulingStrategy::SchedulingStrategyCase::SCHEDULING_STRATEGY_NOT_SET); @@ -1601,7 +1602,10 @@ std::vector CoreWorker::SubmitTask( debugger_breakpoint, depth, task_options.serialized_runtime_env_info); - builder.SetNormalTaskSpec(max_retries, retry_exceptions, scheduling_strategy); + builder.SetNormalTaskSpec(max_retries, + retry_exceptions, + serialized_retry_exception_allowlist, + scheduling_strategy); TaskSpecification task_spec = builder.Build(); RAY_LOG(DEBUG) << "Submitting normal task " << task_spec.DebugString(); std::vector returned_refs; @@ -2174,7 +2178,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, const std::shared_ptr &resource_ids, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs, - bool *is_application_level_error) { + bool *is_retryable_error) { RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; @@ -2258,9 +2262,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, arg_refs, return_ids, task_spec.GetDebuggerBreakpoint(), + task_spec.GetSerializedRetryExceptionAllowlist(), return_objects, creation_task_exception_pb_bytes, - is_application_level_error, + is_retryable_error, defined_concurrency_groups, name_of_concurrency_group_to_execute); @@ -2431,12 +2436,9 @@ std::vector CoreWorker::ExecuteTaskLocalMode( } auto old_id = GetActorId(); SetActorId(actor_id); - bool is_application_level_error; - RAY_UNUSED(ExecuteTask(task_spec, - resource_ids, - &return_objects, - &borrowed_refs, - &is_application_level_error)); + bool is_retryable_error; + RAY_UNUSED(ExecuteTask( + task_spec, resource_ids, &return_objects, &borrowed_refs, &is_retryable_error)); SetActorId(old_id); return returned_refs; } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index bfd43b6dd..3bf232cd8 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -458,6 +458,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] debugger_breakpoint 
breakpoint to drop into for the debugger after this /// task starts executing, or "" if we do not want to drop into the debugger. /// should capture parent's placement group implicilty. + /// \param[in] serialized_retry_exception_allowlist A serialized exception list + /// that serves as an allowlist of frontend-language exceptions/errors that should be + /// retried. Default is an empty string, which will be treated as an allow-all in the + /// language worker. /// \return ObjectRefs returned by this task. std::vector SubmitTask( const RayFunction &function, @@ -466,7 +470,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { int max_retries, bool retry_exceptions, const rpc::SchedulingStrategy &scheduling_strategy, - const std::string &debugger_breakpoint); + const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist = ""); /// Create an actor. /// @@ -933,7 +938,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const std::shared_ptr &resource_ids, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs, - bool *is_application_level_error); + bool *is_retryable_error); /// Put an object in the local plasma store. Status PutInLocalPlasmaStore(const RayObject &object, diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index 5703c6544..efb50202f 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -41,9 +41,10 @@ struct CoreWorkerOptions { const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, std::vector> *results, std::shared_ptr &creation_task_exception_pb_bytes, - bool *is_application_level_error, + bool *is_retryable_error, // The following 2 parameters `defined_concurrency_groups` and // `name_of_concurrency_group_to_execute` are used for Python // asyncio actor only. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index be47629e0..b496842f9 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -116,9 +116,10 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, std::vector> *results, std::shared_ptr &creation_task_exception_pb, - bool *is_application_level_error, + bool *is_retryable_error, const std::vector &defined_concurrency_groups, const std::string name_of_concurrency_group_to_execute) { // These 2 parameters are used for Python only, and Java worker @@ -126,7 +127,9 @@ Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(JNIEnv *env, RAY_UNUSED(defined_concurrency_groups); RAY_UNUSED(name_of_concurrency_group_to_execute); // TODO(jjyao): Support retrying application-level errors for Java - *is_application_level_error = false; + // TODO(Clark): Support exception allowlist for retrying application-level + // errors for Java. 
+ *is_retryable_error = false; JNIEnv *env = GetJNIEnv(); RAY_CHECK(java_task_executor); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 551dfe2cb..753192da9 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -64,7 +64,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { bool ReplyPushTask(Status status = Status::OK(), bool exit = false, - bool is_application_level_error = false) { + bool is_retryable_error = false) { if (callbacks.size() == 0) { return false; } @@ -73,8 +73,8 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { if (exit) { reply.set_worker_exiting(true); } - if (is_application_level_error) { - reply.set_is_application_level_error(true); + if (is_retryable_error) { + reply.set_is_retryable_error(true); } callback(status, reply); callbacks.pop_front(); diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 9c9ec6ec3..ab3350400 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -49,8 +49,8 @@ class MockWorker { options.node_ip_address = "127.0.0.1"; options.node_manager_port = node_manager_port; options.raylet_ip_address = "127.0.0.1"; - options.task_execution_callback = - std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9); + options.task_execution_callback = std::bind( + &MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9, _10); options.metrics_agent_port = -1; options.startup_token = startup_token; CoreWorkerProcess::Initialize(options); @@ -67,6 +67,7 @@ class MockWorker { const std::vector &arg_refs, const std::vector &return_ids, const std::string &debugger_breakpoint, + const std::string &serialized_retry_exception_allowlist, std::vector> *results) { // Note that this doesn't include dummy object id. 
const FunctionDescriptor function_descriptor = ray_function.GetFunctionDescriptor(); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 31a911c1a..89fdd94df 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -92,13 +92,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask( RAY_CHECK(num_returns >= 0); std::vector> return_objects; - bool is_application_level_error = false; + bool is_retryable_error = false; auto status = task_handler_(task_spec, resource_ids, &return_objects, reply->mutable_borrowed_refs(), - &is_application_level_error); - reply->set_is_application_level_error(is_application_level_error); + &is_retryable_error); + reply->set_is_retryable_error(is_retryable_error); bool objects_valid = return_objects.size() == num_returns; if (objects_valid) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index ab68f550f..1f8760f97 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -54,7 +54,7 @@ class CoreWorkerDirectTaskReceiver { const std::shared_ptr resource_ids, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrower_refs, - bool *is_application_level_error)>; + bool *is_retryable_error)>; using OnTaskDone = std::function; diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 75af2827f..673f64541 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -585,8 +585,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED, &status)); } else { - if (!task_spec.GetMessage().retry_exceptions() || - !reply.is_application_level_error() || + if (!task_spec.GetMessage().retry_exceptions() || !reply.is_retryable_error() || !task_finisher_->RetryTaskIfPossible(task_id)) { task_finisher_->CompletePendingTask(task_id, reply, addr.ToProto()); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 85a8afc75..13782bdb1 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -312,14 +312,17 @@ message TaskSpec { string concurrency_group_name = 24; // Whether application-level errors (exceptions) should be retried. bool retry_exceptions = 25; + // A serialized exception list that serves as an allowlist of frontend-language + // exceptions/errors that should be retried. + bytes serialized_retry_exception_allowlist = 26; // The depth of the task. The driver has depth 0, anything it calls has depth // 1, etc. - int64 depth = 26; + int64 depth = 27; // Strategy about how to schedule this task. - SchedulingStrategy scheduling_strategy = 27; + SchedulingStrategy scheduling_strategy = 28; // A count of the number of times this task has been attempted so far. 0 // means this is the first execution. - uint64 attempt_number = 28; + uint64 attempt_number = 29; } message TaskInfoEntry { diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 78171ec8f..46d3acc6b 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -125,8 +125,8 @@ message PushTaskReply { // may now be borrowing. 
The reference counts also include any new borrowers // that the worker created by passing a borrowed ID into a nested task. repeated ObjectReferenceCount borrowed_refs = 4; - // Whether the result contains an application-level error (exception). - bool is_application_level_error = 5; + // Whether the result contains a retryable application-level error. + bool is_retryable_error = 5; } message DirectActorCallArgWaitCompleteRequest { diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index cc0ad1efa..71ae5a6e0 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -179,7 +179,7 @@ RayTask CreateTask( } } - spec_builder.SetNormalTaskSpec(0, false, scheduling_strategy); + spec_builder.SetNormalTaskSpec(0, false, "", scheduling_strategy); return RayTask(spec_builder.Build()); }
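
For quick reference, below is a minimal usage sketch of the API surface this patch adds: `retry_exceptions` may now be a list/tuple of exception classes (an allowlist) in addition to a boolean, both in the `@ray.remote` decorator and via `.options()`. It condenses the behavior covered by the doc_code example and tests in this patch; `TransientError`, the argument names, and the printed messages are illustrative placeholders, not part of the patch.

import ray

ray.init(ignore_reinit_error=True)

class TransientError(Exception):
    pass

@ray.remote(max_retries=2, retry_exceptions=[TransientError])
def flaky(raise_transient):
    if raise_transient:
        # On the allowlist, so Ray retries the task up to max_retries times
        # before surfacing the error to the caller.
        raise TransientError("transient failure")
    # Not on the allowlist, so the task fails immediately without retries.
    raise ValueError("non-retryable failure")

try:
    ray.get(flaky.remote(True))
except TransientError:
    print("failed after retries")

try:
    ray.get(flaky.remote(False))
except ValueError:
    print("failed immediately")

# Retry behavior can also be overridden per invocation via .options():
try:
    ray.get(flaky.options(retry_exceptions=False).remote(True))
except TransientError:
    print("failed immediately, retries disabled")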