Mirror of https://github.com/vale981/ray, synced 2025-03-06 10:31:39 -05:00
[Core] Accelerator type API (#10561)
Parent: a699f6a4d8
Commit: d6a9f0e2e4
10 changed files with 133 additions and 30 deletions
@@ -220,6 +220,21 @@ load balancing, gang scheduling, and priority-based scheduling.
    :noindex:
 
+Accelerator Types
+------------------
+
+Ray supports resource-specific accelerator types. The `accelerator_type` field can be used to force a task to run on a node with a specific type of accelerator. Under the hood, the accelerator type option is implemented as a custom resource demand of ``"accelerator_type:<type>": 0.001``. This forces the task to be placed on a node with that particular accelerator type available. It also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.
+
+.. code-block:: python
+
+    from ray.util.accelerators import NVIDIA_TESLA_V100
+
+    @ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
+    def train(data):
+        return "This function was run on a node with a Tesla V100 GPU"
+
+See `ray.util.accelerators` for the available accelerator types. Currently, Nvidia GPUs are the only accelerator types that are detected automatically.
+
 Nested Remote Functions
 -----------------------
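Since `accelerator_type` desugars to a custom resource demand, the two task declarations below request equivalent placement. This is an illustrative sketch, not part of the patch; it assumes only the "accelerator_type:" prefix and the 0.001 demand introduced in this commit.

    import ray
    from ray.util.accelerators import NVIDIA_TESLA_V100  # == "V100"

    # High-level API added by this commit:
    @ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
    def train_api(data):
        return "scheduled via accelerator_type"

    # Roughly equivalent, spelled as the underlying custom resource demand:
    @ray.remote(num_gpus=1, resources={"accelerator_type:V100": 0.001})
    def train_raw(data):
        return "scheduled via the raw custom resource"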
@@ -264,7 +264,7 @@ class ActorClassMetadata:
     def __init__(self, language, modified_class,
                  actor_creation_function_descriptor, class_id, max_restarts,
                  max_task_retries, num_cpus, num_gpus, memory,
-                 object_store_memory, resources):
+                 object_store_memory, resources, accelerator_type):
         self.language = language
         self.modified_class = modified_class
         self.actor_creation_function_descriptor = \
@@ -279,6 +279,7 @@ class ActorClassMetadata:
         self.memory = memory
         self.object_store_memory = object_store_memory
         self.resources = resources
+        self.accelerator_type = accelerator_type
         self.last_export_session_and_job = None
         self.method_meta = ActorClassMethodMetadata.create(
             modified_class, actor_creation_function_descriptor)
@@ -336,7 +337,8 @@ class ActorClass:
     @classmethod
     def _ray_from_modified_class(cls, modified_class, class_id, max_restarts,
                                  max_task_retries, num_cpus, num_gpus, memory,
-                                 object_store_memory, resources):
+                                 object_store_memory, resources,
+                                 accelerator_type):
         for attribute in [
                 "remote",
                 "_remote",
@@ -368,7 +370,7 @@ class ActorClass:
             Language.PYTHON, modified_class,
             actor_creation_function_descriptor, class_id, max_restarts,
             max_task_retries, num_cpus, num_gpus, memory, object_store_memory,
-            resources)
+            resources, accelerator_type)

         return self

@@ -376,13 +378,13 @@ class ActorClass:
     def _ray_from_function_descriptor(
             cls, language, actor_creation_function_descriptor, max_restarts,
             max_task_retries, num_cpus, num_gpus, memory, object_store_memory,
-            resources):
+            resources, accelerator_type):
         self = ActorClass.__new__(ActorClass)

         self.__ray_metadata__ = ActorClassMetadata(
             language, None, actor_creation_function_descriptor, None,
             max_restarts, max_task_retries, num_cpus, num_gpus, memory,
-            object_store_memory, resources)
+            object_store_memory, resources, accelerator_type)

         return self

@@ -428,6 +430,7 @@ class ActorClass:
                 memory=None,
                 object_store_memory=None,
                 resources=None,
+                accelerator_type=None,
                 max_concurrency=None,
                 max_restarts=None,
                 max_task_retries=None,
@@ -536,8 +539,9 @@ class ActorClass:
         # decorator. Last three conditions are to check that no resources were
         # specified when _remote() was called.
         if (meta.num_cpus is None and meta.num_gpus is None
-                and meta.resources is None and num_cpus is None
-                and num_gpus is None and resources is None):
+                and meta.resources is None and meta.accelerator_type is None
+                and num_cpus is None and num_gpus is None and resources is None
+                and accelerator_type is None):
             # In the default case, actors acquire no resources for
             # their lifetime, and actor methods will require 1 CPU.
             cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE
@@ -572,8 +576,8 @@ class ActorClass:

         resources = ray.utils.resources_from_resource_arguments(
             cpus_to_use, meta.num_gpus, meta.memory, meta.object_store_memory,
-            meta.resources, num_cpus, num_gpus, memory, object_store_memory,
-            resources)
+            meta.resources, meta.accelerator_type, num_cpus, num_gpus, memory,
+            object_store_memory, resources, accelerator_type)

         # If the actor methods require CPU resources, then set the required
         # placement resources. If actor_placement_resources is empty, then
@@ -904,7 +908,7 @@ def modify_class(cls):


 def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
-               max_restarts, max_task_retries):
+               accelerator_type, max_restarts, max_task_retries):
     Class = modify_class(cls)

     if max_restarts is None:
@@ -928,7 +932,8 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,

     return ActorClass._ray_from_modified_class(
         Class, ActorClassID.from_random(), max_restarts, max_task_retries,
-        num_cpus, num_gpus, memory, object_store_memory, resources)
+        num_cpus, num_gpus, memory, object_store_memory, resources,
+        accelerator_type)


 def exit_actor():
@@ -79,10 +79,12 @@ def java_actor_class(class_name):
     return ActorClass._ray_from_function_descriptor(
         Language.JAVA,
         JavaFunctionDescriptor(class_name, "<init>", ""),
-        0,  # max_restarts,
-        0,  # max_task_retries,
-        None,  # num_cpus,
-        None,  # num_gpus,
-        None,  # memory,
-        None,  # object_store_memory,
-        None)  # resources,
+        max_restarts=0,
+        max_task_retries=0,
+        num_cpus=None,
+        num_gpus=None,
+        memory=None,
+        object_store_memory=None,
+        resources=None,
+        accelerator_type=None,
+    )
@@ -130,7 +130,7 @@ DASHBOARD_DIED_ERROR = "dashboard_died"
 RAYLET_CONNECTION_ERROR = "raylet_connection_error"

 # Used in gpu detection
-RESOURCE_CONSTRAINT_PREFIX = "gpu_type:"
+RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"

 RESOURCES_ENVIRONMENT_VARIABLE = "RAY_OVERRIDE_RESOURCES"

@@ -61,9 +61,9 @@ class RemoteFunction:
     """

     def __init__(self, language, function, function_descriptor, num_cpus,
-                 num_gpus, memory, object_store_memory, resources, num_returns,
-                 max_calls, max_retries, placement_group,
-                 placement_group_bundle_index):
+                 num_gpus, memory, object_store_memory, resources,
+                 accelerator_type, num_returns, max_calls, max_retries,
+                 placement_group, placement_group_bundle_index):
         self._language = language
         self._function = function
         self._function_name = (
@@ -79,6 +79,7 @@ class RemoteFunction:
                 "setting object_store_memory is not implemented for tasks")
         self._object_store_memory = None
         self._resources = resources
+        self._accelerator_type = accelerator_type
         self._num_returns = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS
                              if num_returns is None else num_returns)
         self._max_calls = (DEFAULT_REMOTE_FUNCTION_MAX_CALLS
@@ -149,6 +150,7 @@ class RemoteFunction:
                 num_gpus=None,
                 memory=None,
                 object_store_memory=None,
+                accelerator_type=None,
                 resources=None,
                 max_retries=None,
                 placement_group=None,
@@ -196,8 +198,9 @@ class RemoteFunction:

         resources = ray.utils.resources_from_resource_arguments(
             self._num_cpus, self._num_gpus, self._memory,
-            self._object_store_memory, self._resources, num_cpus, num_gpus,
-            memory, object_store_memory, resources)
+            self._object_store_memory, self._resources, self._accelerator_type,
+            num_cpus, num_gpus, memory, object_store_memory, resources,
+            accelerator_type)

         def invocation(args, kwargs):
             if self._is_cross_language:
@@ -12,6 +12,7 @@ import pytest

 import ray
 import ray.ray_constants as ray_constants
+import ray.util.accelerators
 import ray.cluster_utils
 import ray.test_utils
 from ray import resource_spec
@@ -687,6 +688,54 @@ Blacklisted: No
     assert resource_spec._constraints_from_gpu_info(None) == {}


+def test_accelerator_type_api(shutdown_only):
+    v100 = ray.util.accelerators.NVIDIA_TESLA_V100
+    resource_name = f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{v100}"
+    ray.init(num_cpus=4, resources={resource_name: 1})
+
+    quantity = 1
+
+    @ray.remote(accelerator_type=v100)
+    def decorated_func(quantity):
+        return ray.available_resources()[resource_name] < quantity
+
+    assert ray.get(decorated_func.remote(quantity))
+
+    def via_options_func(quantity):
+        return ray.available_resources()[resource_name] < quantity
+
+    assert ray.get(
+        ray.remote(via_options_func).options(
+            accelerator_type=v100).remote(quantity))
+
+    @ray.remote(accelerator_type=v100)
+    class DecoratedActor:
+        def __init__(self):
+            pass
+
+        def initialized(self):
+            pass
+
+    class ActorWithOptions:
+        def __init__(self):
+            pass
+
+        def initialized(self):
+            pass
+
+    decorated_actor = DecoratedActor.remote()
+    # Avoid a race condition where the actor hasn't been initialized and
+    # claimed the resources yet.
+    ray.get(decorated_actor.initialized.remote())
+    assert ray.available_resources()[resource_name] < quantity
+
+    quantity = ray.available_resources()[resource_name]
+    with_options = ray.remote(ActorWithOptions).options(
+        accelerator_type=v100).remote()
+    ray.get(with_options.initialized.remote())
+    assert ray.available_resources()[resource_name] < quantity
+
+
 if __name__ == "__main__":
     import pytest
     sys.exit(pytest.main(["-v", __file__]))
python/ray/util/accelerators/__init__.py (new file)
@@ -0,0 +1,6 @@
+from ray.util.accelerators.accelerators import NVIDIA_TESLA_V100, \
+    NVIDIA_TESLA_P100, NVIDIA_TESLA_T4, NVIDIA_TESLA_P4, NVIDIA_TESLA_K80
+
+__all__ = [
+    "NVIDIA_TESLA_V100", "NVIDIA_TESLA_P100", "NVIDIA_TESLA_T4",
+    "NVIDIA_TESLA_P4", "NVIDIA_TESLA_K80"
+]
python/ray/util/accelerators/accelerators.py (new file)
@@ -0,0 +1,5 @@
+NVIDIA_TESLA_V100 = "V100"
+NVIDIA_TESLA_P100 = "P100"
+NVIDIA_TESLA_T4 = "T4"
+NVIDIA_TESLA_P4 = "P4"
+NVIDIA_TESLA_K80 = "K80"
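These constants are plain strings; a node must advertise the prefixed resource for placement to succeed. A minimal local sketch following the pattern of the new test_accelerator_type_api above, where a single-node ray.init resource override stands in for a machine that really has a V100:

    import ray
    import ray.ray_constants as ray_constants
    from ray.util.accelerators import NVIDIA_TESLA_V100

    # Compose the full custom resource name, e.g. "accelerator_type:V100".
    resource_name = f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{NVIDIA_TESLA_V100}"

    # Pretend this node has a V100 by advertising the resource directly.
    ray.init(num_cpus=1, resources={resource_name: 1})

    @ray.remote(accelerator_type=NVIDIA_TESLA_V100)
    def where_am_i():
        return "placed on the node advertising " + resource_name

    print(ray.get(where_am_i.remote()))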
@@ -313,9 +313,10 @@ def set_cuda_visible_devices(gpu_ids):

 def resources_from_resource_arguments(
         default_num_cpus, default_num_gpus, default_memory,
-        default_object_store_memory, default_resources, runtime_num_cpus,
-        runtime_num_gpus, runtime_memory, runtime_object_store_memory,
-        runtime_resources):
+        default_object_store_memory, default_resources,
+        default_accelerator_type, runtime_num_cpus, runtime_num_gpus,
+        runtime_memory, runtime_object_store_memory, runtime_resources,
+        runtime_accelerator_type):
     """Determine a task's resource requirements.

     Args:
@@ -376,6 +377,13 @@ def resources_from_resource_arguments(
         resources["object_store_memory"] = ray_constants.to_memory_units(
             object_store_memory, round_up=True)

+    if runtime_accelerator_type is not None:
+        resources[f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}"
+                  f"{runtime_accelerator_type}"] = 0.001
+    elif default_accelerator_type is not None:
+        resources[f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}"
+                  f"{default_accelerator_type}"] = 0.001
+
     return resources

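The branch added above gives the call-site (runtime_) value precedence over the decorator-time (default_) value. A standalone sketch of just that rule, with the prefix inlined and a hypothetical helper name for illustration:

    # Illustrative only: mirrors the precedence implemented above, where a
    # runtime value (e.g. passed via .options()) wins over the default
    # captured by the @ray.remote decorator.
    RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"  # as in ray_constants

    def accelerator_demand(default_accelerator_type, runtime_accelerator_type):
        resources = {}
        if runtime_accelerator_type is not None:
            resources[f"{RESOURCE_CONSTRAINT_PREFIX}"
                      f"{runtime_accelerator_type}"] = 0.001
        elif default_accelerator_type is not None:
            resources[f"{RESOURCE_CONSTRAINT_PREFIX}"
                      f"{default_accelerator_type}"] = 0.001
        return resources

    assert accelerator_demand("V100", None) == {"accelerator_type:V100": 0.001}
    assert accelerator_demand("V100", "P100") == {
        "accelerator_type:P100": 0.001
    }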
@@ -1653,6 +1653,7 @@ def make_decorator(num_returns=None,
                    memory=None,
                    object_store_memory=None,
                    resources=None,
+                   accelerator_type=None,
                    max_calls=None,
                    max_retries=None,
                    max_restarts=None,
@@ -1687,8 +1688,9 @@ def make_decorator(num_returns=None,
                                " integer")
             return ray.remote_function.RemoteFunction(
                 Language.PYTHON, function_or_class, None, num_cpus, num_gpus,
-                memory, object_store_memory, resources, num_returns, max_calls,
-                max_retries, placement_group, placement_group_bundle_index)
+                memory, object_store_memory, resources, accelerator_type,
+                num_returns, max_calls, max_retries, placement_group,
+                placement_group_bundle_index)

         if inspect.isclass(function_or_class):
             if num_returns is not None:
@@ -1709,7 +1711,8 @@ def make_decorator(num_returns=None,
                                " positive integer")
             return ray.actor.make_actor(function_or_class, num_cpus, num_gpus,
                                         memory, object_store_memory, resources,
-                                        max_restarts, max_task_retries)
+                                        accelerator_type, max_restarts,
+                                        max_task_retries)

         raise TypeError("The @ray.remote decorator must be applied to "
                         "either a function or to a class.")
@@ -1745,6 +1748,9 @@ def remote(*args, **kwargs):
     * **resources:** The quantity of various custom resources to reserve for
       this task or for the lifetime of the actor. This is a dictionary mapping
       strings (resource names) to numbers.
+    * **accelerator_type:** If specified, requires that the task or actor run
+      on a node with the specified type of accelerator. See
+      `ray.util.accelerators` for available accelerator types.
     * **max_calls:** Only for *remote functions*. This specifies the maximum
       number of times that a given worker can execute the given remote function
       before it must exit (this can be used to address memory leaks in
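Both spellings exercised by the new test apply to this docstring entry; a short usage sketch, assuming ray.init() has already run on a cluster that provides (or advertises, as in the test above) the accelerator:

    import ray
    from ray.util.accelerators import NVIDIA_TESLA_V100

    # Statically, via the decorator:
    @ray.remote(accelerator_type=NVIDIA_TESLA_V100)
    def f():
        return "ran on a node with a V100"

    # Dynamically, via .options() at call time:
    def g():
        return "also ran on a node with a V100"

    result = ray.get(ray.remote(g).options(
        accelerator_type=NVIDIA_TESLA_V100).remote())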
@@ -1833,6 +1839,7 @@ def remote(*args, **kwargs):
             "memory",
             "object_store_memory",
             "resources",
+            "accelerator_type",
             "max_calls",
             "max_restarts",
             "max_task_retries",
@@ -1851,6 +1858,8 @@ def remote(*args, **kwargs):
         assert "CPU" not in resources, "Use the 'num_cpus' argument."
         assert "GPU" not in resources, "Use the 'num_gpus' argument."

+    accelerator_type = kwargs.get("accelerator_type")
+
     # Handle other arguments.
     num_returns = kwargs.get("num_returns")
     max_calls = kwargs.get("max_calls")
@@ -1867,6 +1876,7 @@ def remote(*args, **kwargs):
         memory=memory,
         object_store_memory=object_store_memory,
         resources=resources,
+        accelerator_type=accelerator_type,
         max_calls=max_calls,
         max_restarts=max_restarts,
         max_task_retries=max_task_retries,