diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index 0deb18c29..c89e3131d 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -220,6 +220,21 @@ load balancing, gang scheduling, and priority-based scheduling. :noindex: +Accelerator Types +----------------- + +Ray supports resource-specific accelerator types. The ``accelerator_type`` option can be used to force a task to run on a node with a specific type of accelerator. Under the hood, the accelerator type option is implemented as a custom resource demand of ``"accelerator_type:<type>": 0.001``. This forces the task to be placed on a node with that particular accelerator type available. This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator. + +.. code-block:: python + + from ray.util.accelerators import NVIDIA_TESLA_V100 + + @ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100) + def train(data): + return "This function was run on a node with a Tesla V100 GPU" + +See ``ray.util.accelerators`` for available accelerator types. Currently, automatically detected accelerator types include NVIDIA GPUs. + Nested Remote Functions ----------------------- diff --git a/python/ray/actor.py b/python/ray/actor.py index 1e009aaf8..caeb71275 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -264,7 +264,7 @@ class ActorClassMetadata: def __init__(self, language, modified_class, actor_creation_function_descriptor, class_id, max_restarts, max_task_retries, num_cpus, num_gpus, memory, - object_store_memory, resources): + object_store_memory, resources, accelerator_type): self.language = language self.modified_class = modified_class self.actor_creation_function_descriptor = \ @@ -279,6 +279,7 @@ class ActorClassMetadata: self.memory = memory self.object_store_memory = object_store_memory self.resources = resources + self.accelerator_type = accelerator_type self.last_export_session_and_job = None self.method_meta = ActorClassMethodMetadata.create( modified_class, actor_creation_function_descriptor) @@ -336,7 +337,8 @@ class ActorClass: @classmethod def _ray_from_modified_class(cls, modified_class, class_id, max_restarts, max_task_retries, num_cpus, num_gpus, memory, - object_store_memory, resources): + object_store_memory, resources, + accelerator_type): for attribute in [ "remote", "_remote", @@ -368,7 +370,7 @@ class ActorClass: Language.PYTHON, modified_class, actor_creation_function_descriptor, class_id, max_restarts, max_task_retries, num_cpus, num_gpus, memory, object_store_memory, - resources) + resources, accelerator_type) return self @@ -376,13 +378,13 @@ class ActorClass: def _ray_from_function_descriptor( cls, language, actor_creation_function_descriptor, max_restarts, max_task_retries, num_cpus, num_gpus, memory, object_store_memory, - resources): + resources, accelerator_type): self = ActorClass.__new__(ActorClass) self.__ray_metadata__ = ActorClassMetadata( language, None, actor_creation_function_descriptor, None, max_restarts, max_task_retries, num_cpus, num_gpus, memory, - object_store_memory, resources) + object_store_memory, resources, accelerator_type) return self @@ -428,6 +430,7 @@ class ActorClass: memory=None, object_store_memory=None, resources=None, + accelerator_type=None, max_concurrency=None, max_restarts=None, max_task_retries=None, @@ -536,8 +539,9 @@ class ActorClass: # decorator. Last four conditions are to check that no resources were # specified when _remote() was called.
if (meta.num_cpus is None and meta.num_gpus is None - and meta.resources is None and num_cpus is None - and num_gpus is None and resources is None): + and meta.resources is None and meta.accelerator_type is None + and num_cpus is None and num_gpus is None and resources is None + and accelerator_type is None): # In the default case, actors acquire no resources for # their lifetime, and actor methods will require 1 CPU. cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE @@ -572,8 +576,8 @@ class ActorClass: resources = ray.utils.resources_from_resource_arguments( cpus_to_use, meta.num_gpus, meta.memory, meta.object_store_memory, - meta.resources, num_cpus, num_gpus, memory, object_store_memory, - resources) + meta.resources, meta.accelerator_type, num_cpus, num_gpus, memory, + object_store_memory, resources, accelerator_type) # If the actor methods require CPU resources, then set the required # placement resources. If actor_placement_resources is empty, then @@ -904,7 +908,7 @@ def modify_class(cls): def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, - max_restarts, max_task_retries): + accelerator_type, max_restarts, max_task_retries): Class = modify_class(cls) if max_restarts is None: @@ -928,7 +932,8 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, return ActorClass._ray_from_modified_class( Class, ActorClassID.from_random(), max_restarts, max_task_retries, - num_cpus, num_gpus, memory, object_store_memory, resources) + num_cpus, num_gpus, memory, object_store_memory, resources, + accelerator_type) def exit_actor(): diff --git a/python/ray/cross_language.py b/python/ray/cross_language.py index 8b9dc77da..36b570f18 100644 --- a/python/ray/cross_language.py +++ b/python/ray/cross_language.py @@ -79,10 +79,12 @@ def java_actor_class(class_name): return ActorClass._ray_from_function_descriptor( Language.JAVA, JavaFunctionDescriptor(class_name, "", ""), - 0, # max_restarts, - 0, # max_task_retries, - None, # num_cpus, - None, # num_gpus, - None, # memory, - None, # object_store_memory, - None) # resources, + max_restarts=0, + max_task_retries=0, + num_cpus=None, + num_gpus=None, + memory=None, + object_store_memory=None, + resources=None, + accelerator_type=None, + ) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 3fdf50b8e..c89019c59 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -130,7 +130,7 @@ DASHBOARD_DIED_ERROR = "dashboard_died" RAYLET_CONNECTION_ERROR = "raylet_connection_error" # Used in gpu detection -RESOURCE_CONSTRAINT_PREFIX = "gpu_type:" +RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:" RESOURCES_ENVIRONMENT_VARIABLE = "RAY_OVERRIDE_RESOURCES" diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index cadcfa622..579d2828b 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -61,9 +61,9 @@ class RemoteFunction: """ def __init__(self, language, function, function_descriptor, num_cpus, - num_gpus, memory, object_store_memory, resources, num_returns, - max_calls, max_retries, placement_group, - placement_group_bundle_index): + num_gpus, memory, object_store_memory, resources, + accelerator_type, num_returns, max_calls, max_retries, + placement_group, placement_group_bundle_index): self._language = language self._function = function self._function_name = ( @@ -79,6 +79,7 @@ class RemoteFunction: "setting object_store_memory is not implemented for tasks") self._object_store_memory = None self._resources 
= resources + self._accelerator_type = accelerator_type self._num_returns = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS if num_returns is None else num_returns) self._max_calls = (DEFAULT_REMOTE_FUNCTION_MAX_CALLS @@ -149,6 +150,7 @@ class RemoteFunction: num_gpus=None, memory=None, object_store_memory=None, + accelerator_type=None, resources=None, max_retries=None, placement_group=None, @@ -196,8 +198,9 @@ class RemoteFunction: resources = ray.utils.resources_from_resource_arguments( self._num_cpus, self._num_gpus, self._memory, - self._object_store_memory, self._resources, num_cpus, num_gpus, - memory, object_store_memory, resources) + self._object_store_memory, self._resources, self._accelerator_type, + num_cpus, num_gpus, memory, object_store_memory, resources, + accelerator_type) def invocation(args, kwargs): if self._is_cross_language: diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 14ef5b634..6b996f9a1 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -12,6 +12,7 @@ import pytest import ray import ray.ray_constants as ray_constants +import ray.util.accelerators import ray.cluster_utils import ray.test_utils from ray import resource_spec @@ -687,6 +688,54 @@ Blacklisted: No assert resource_spec._constraints_from_gpu_info(None) == {} +def test_accelerator_type_api(shutdown_only): + v100 = ray.util.accelerators.NVIDIA_TESLA_V100 + resource_name = f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{v100}" + ray.init(num_cpus=4, resources={resource_name: 1}) + + quantity = 1 + + @ray.remote(accelerator_type=v100) + def decorated_func(quantity): + return ray.available_resources()[resource_name] < quantity + + assert ray.get(decorated_func.remote(quantity)) + + def via_options_func(quantity): + return ray.available_resources()[resource_name] < quantity + + assert ray.get( + ray.remote(via_options_func).options( + accelerator_type=v100).remote(quantity)) + + @ray.remote(accelerator_type=v100) + class DecoratedActor: + def __init__(self): + pass + + def initialized(self): + pass + + class ActorWithOptions: + def __init__(self): + pass + + def initialized(self): + pass + + decorated_actor = DecoratedActor.remote() + # Avoid a race condition where the actor hasn't been initialized and + # claimed the resources yet. 
+ ray.get(decorated_actor.initialized.remote()) + assert ray.available_resources()[resource_name] < quantity + + quantity = ray.available_resources()[resource_name] + with_options = ray.remote(ActorWithOptions).options( + accelerator_type=v100).remote() + ray.get(with_options.initialized.remote()) + assert ray.available_resources()[resource_name] < quantity + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/accelerators/__init__.py b/python/ray/util/accelerators/__init__.py new file mode 100644 index 000000000..b4dcc406a --- /dev/null +++ b/python/ray/util/accelerators/__init__.py @@ -0,0 +1,8 @@ +from ray.util.accelerators.accelerators import ( + NVIDIA_TESLA_V100, NVIDIA_TESLA_P100, NVIDIA_TESLA_T4, NVIDIA_TESLA_P4, + NVIDIA_TESLA_K80) + +__all__ = [ + "NVIDIA_TESLA_V100", "NVIDIA_TESLA_P100", "NVIDIA_TESLA_T4", + "NVIDIA_TESLA_P4", "NVIDIA_TESLA_K80" +] diff --git a/python/ray/util/accelerators/accelerators.py b/python/ray/util/accelerators/accelerators.py new file mode 100644 index 000000000..c9c565d66 --- /dev/null +++ b/python/ray/util/accelerators/accelerators.py @@ -0,0 +1,5 @@ +NVIDIA_TESLA_V100 = "V100" +NVIDIA_TESLA_P100 = "P100" +NVIDIA_TESLA_T4 = "T4" +NVIDIA_TESLA_P4 = "P4" +NVIDIA_TESLA_K80 = "K80" diff --git a/python/ray/utils.py b/python/ray/utils.py index e7323fc9a..3dd4379fd 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -313,9 +313,10 @@ def set_cuda_visible_devices(gpu_ids): def resources_from_resource_arguments( default_num_cpus, default_num_gpus, default_memory, - default_object_store_memory, default_resources, runtime_num_cpus, - runtime_num_gpus, runtime_memory, runtime_object_store_memory, - runtime_resources): + default_object_store_memory, default_resources, + default_accelerator_type, runtime_num_cpus, runtime_num_gpus, + runtime_memory, runtime_object_store_memory, runtime_resources, + runtime_accelerator_type): """Determine a task's resource requirements.
Args: @@ -376,6 +377,13 @@ def resources_from_resource_arguments( resources["object_store_memory"] = ray_constants.to_memory_units( object_store_memory, round_up=True) + if runtime_accelerator_type is not None: + resources[f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" + f"{runtime_accelerator_type}"] = 0.001 + elif default_accelerator_type is not None: + resources[f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" + f"{default_accelerator_type}"] = 0.001 + return resources diff --git a/python/ray/worker.py b/python/ray/worker.py index 42ed3655b..d83d42260 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1653,6 +1653,7 @@ def make_decorator(num_returns=None, memory=None, object_store_memory=None, resources=None, + accelerator_type=None, max_calls=None, max_retries=None, max_restarts=None, @@ -1687,8 +1688,9 @@ " integer") return ray.remote_function.RemoteFunction( Language.PYTHON, function_or_class, None, num_cpus, num_gpus, - memory, object_store_memory, resources, num_returns, max_calls, - max_retries, placement_group, placement_group_bundle_index) + memory, object_store_memory, resources, accelerator_type, + num_returns, max_calls, max_retries, placement_group, + placement_group_bundle_index) if inspect.isclass(function_or_class): if num_returns is not None: @@ -1709,7 +1711,8 @@ " positive integer") return ray.actor.make_actor(function_or_class, num_cpus, num_gpus, memory, object_store_memory, resources, - max_restarts, max_task_retries) + accelerator_type, max_restarts, + max_task_retries) raise TypeError("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -1745,6 +1748,9 @@ def remote(*args, **kwargs): * **resources:** The quantity of various custom resources to reserve for this task or for the lifetime of the actor. This is a dictionary mapping strings (resource names) to numbers. + * **accelerator_type:** If specified, requires that the task or actor run + on a node with the specified type of accelerator. See `ray.util.accelerators` + for accelerator types. * **max_calls:** Only for *remote functions*. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit (this can be used to address memory leaks in @@ -1833,6 +1839,7 @@ def remote(*args, **kwargs): "memory", "object_store_memory", "resources", + "accelerator_type", "max_calls", "max_restarts", "max_task_retries", @@ -1851,6 +1858,8 @@ assert "CPU" not in resources, "Use the 'num_cpus' argument." assert "GPU" not in resources, "Use the 'num_gpus' argument." + accelerator_type = kwargs.get("accelerator_type") + # Handle other arguments. num_returns = kwargs.get("num_returns") max_calls = kwargs.get("max_calls") @@ -1867,6 +1876,7 @@ memory=memory, object_store_memory=object_store_memory, resources=resources, + accelerator_type=accelerator_type, max_calls=max_calls, max_restarts=max_restarts, max_task_retries=max_task_retries,
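
A minimal end-to-end sketch of the scheduling behavior this change introduces, assuming the accelerator resource is declared manually at ``ray.init()`` (standing in for the automatic detection that would populate it on real V100 hardware); the function name ``where_am_i`` is illustrative only and not part of the diff.

.. code-block:: python

    import ray
    from ray.util.accelerators import NVIDIA_TESLA_V100

    # Simulate a node that advertises a V100 accelerator. On real hardware
    # this custom resource would be auto-detected rather than passed here.
    ray.init(
        num_cpus=2,
        resources={f"accelerator_type:{NVIDIA_TESLA_V100}": 1})

    @ray.remote(accelerator_type=NVIDIA_TESLA_V100)
    def where_am_i():
        # The decorator expands into a custom resource demand of
        # {"accelerator_type:V100": 0.001}, so this task is only
        # schedulable on nodes advertising the matching resource.
        return "scheduled on a node with accelerator_type:V100"

    print(ray.get(where_am_i.remote()))
    ray.shutdown()

Because the demand is only 0.001 against a node supply of 1, the constraint acts as a placement filter rather than an exclusive reservation: many such tasks can share one matching node, while the nonzero value still registers as demand the multi-node-type autoscaler can act on.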