[core] Allow user to override global default for max_retries (#25189)

This PR allows the user to override the global default for max_retries for non-actor tasks. It adds an OS environment variable, RAY_TASK_MAX_RETRIES, which can be passed to the driver or set with runtime envs. Any future tasks submitted by that worker will default to this value instead of 3, the hard-coded default.
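
For context, a minimal sketch of how a driver could opt into the new default, assuming a Ray build that includes this change (the retry value of 5 and the task name are arbitrary placeholders):

    import os
    import ray

    # Tasks submitted after this point default to 5 retries instead of 3.
    os.environ["RAY_TASK_MAX_RETRIES"] = "5"

    # Propagating the same variable through a runtime environment also covers
    # worker processes, e.g. tasks submitted from inside other tasks or actors.
    ray.init(runtime_env={"env_vars": {"RAY_TASK_MAX_RETRIES": "5"}})

    @ray.remote
    def flaky_task():
        ...  # retried up to 5 times on worker failure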

It would be nicer if we had a standard way of setting these defaults, but I think this is fine as a one-off for now (there is not yet a clear need for overriding the defaults of other @ray.remote options).
Related issue number

Closes #24854.
Stephanie Wang 2022-06-01 17:42:18 -04:00 committed by GitHub
parent 71717e59c4
commit 961b875ab8
5 changed files with 81 additions and 2 deletions


@@ -8,7 +8,10 @@ because the process crashed or because the machine failed, Ray will rerun
the task until either the task succeeds or the maximum number of retries is
exceeded. The default number of retries is 3 and can be overridden by
specifying ``max_retries`` in the ``@ray.remote`` decorator. Specifying -1
allows infinite retries, and 0 disables retries. To override the default number
of retries for all tasks submitted, set the OS environment variable
``RAY_TASK_MAX_RETRIES``. e.g., by passing this to your driver script or by
using :ref:`runtime environments<runtime-environments>`.

You can experiment with this behavior by running the following code.
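
The snippet the doc refers to is not part of this hunk; as a rough, hypothetical illustration of the options described above (function names are placeholders), per-task settings still take precedence over the global default:

    import ray

    ray.init()

    @ray.remote(max_retries=0)  # retries disabled for this task
    def run_once():
        return "ok"

    @ray.remote(max_retries=-1)  # retried indefinitely on worker failure
    def keep_trying():
        return "ok"

    # A per-call override beats both the decorator value and RAY_TASK_MAX_RETRIES.
    ref = run_once.options(max_retries=5).remote()
    print(ray.get(ref))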


@@ -6,6 +6,7 @@ from ray.util.scheduling_strategies import (
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
)
import ray.ray_constants as ray_constants


@dataclass

@@ -104,7 +105,9 @@ _task_only_options = {
    "max_calls": _counting_option("max_calls", False, default_value=0),
    # Normal tasks may be retried on failure this many times.
    # TODO(swang): Allow this to be set globally for an application.
    "max_retries": _counting_option(
        "max_retries", default_value=ray_constants.DEFAULT_TASK_MAX_RETRIES
    ),
    # override "_common_options"
    "num_cpus": _resource_option("num_cpus", default_value=1),
    "num_returns": _counting_option("num_returns", False, default_value=1),


@@ -366,3 +366,7 @@ KV_NAMESPACE_FUNCTION_TABLE = b"fun"
LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]
NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"

# Default max_retries option in @ray.remote for non-actor
# tasks.
DEFAULT_TASK_MAX_RETRIES = 3


@@ -2,6 +2,7 @@ from functools import wraps
import inspect
import logging
import uuid
import os

from ray import cloudpickle as pickle
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@@ -217,6 +218,14 @@ class RemoteFunction:
        # fill task required options
        for k, v in ray_option_utils.task_options.items():
            if k == "max_retries":
                # TODO(swang): We need to override max_retries here because the default
                # value gets set at Ray import time. Ideally, we should allow setting
                # default values from env vars for other options too.
                v.default_value = os.environ.get(
                    "RAY_TASK_MAX_RETRIES", v.default_value
                )
                v.default_value = int(v.default_value)
            task_options[k] = task_options.get(k, v.default_value)
        # "max_calls" already takes effects and should not apply again.
        # Remove the default value here.


@@ -1,5 +1,6 @@
import sys
import time
import os

import numpy as np
import pytest

@@ -11,6 +12,7 @@ from ray._private.test_utils import (
    Semaphore,
)
from ray.internal.internal_api import memory_summary
import ray.ray_constants as ray_constants

# Task status.
WAITING_FOR_DEPENDENCIES = "WAITING_FOR_DEPENDENCIES"

@@ -420,6 +422,64 @@ def test_memory_util(ray_start_cluster):
    wait_for_condition(lambda: stats() == (0, 0, 2))


@pytest.mark.parametrize("override_max_retries", [False, True])
def test_override_max_retries(ray_start_cluster, override_max_retries):
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1)
    max_retries = ray_constants.DEFAULT_TASK_MAX_RETRIES
    runtime_env = {}
    if override_max_retries:
        max_retries = 1
        runtime_env["env_vars"] = {"RAY_TASK_MAX_RETRIES": str(max_retries)}
        os.environ["RAY_TASK_MAX_RETRIES"] = str(max_retries)
        # Since we're setting the OS environment variable after the driver process
        # is already started, we need to set it a second time for the workers with
        # runtime_env.
    ray.init(cluster.address, runtime_env=runtime_env)
    try:

        @ray.remote
        class ExecutionCounter:
            def __init__(self):
                self.count = 0

            def inc(self):
                self.count += 1

            def pop(self):
                count = self.count
                self.count = 0
                return count

        @ray.remote
        def f(counter):
            ray.get(counter.inc.remote())
            sys.exit(-1)

        counter = ExecutionCounter.remote()
        with pytest.raises(ray.exceptions.WorkerCrashedError):
            ray.get(f.remote(counter))
        assert ray.get(counter.pop.remote()) == max_retries + 1

        # Check max_retries override still works.
        with pytest.raises(ray.exceptions.WorkerCrashedError):
            ray.get(f.options(max_retries=0).remote(counter))
        assert ray.get(counter.pop.remote()) == 1

        @ray.remote
        def nested(counter):
            ray.get(f.remote(counter))

        # Check override works through nested tasks.
        with pytest.raises(ray.exceptions.RayTaskError):
            ray.get(nested.remote(counter))
        assert ray.get(counter.pop.remote()) == max_retries + 1
    finally:
        if override_max_retries:
            del os.environ["RAY_TASK_MAX_RETRIES"]


if __name__ == "__main__":
    import pytest