[core] Simplify options handling [Part 2] (#23882)

* cleanup

* keep the original semantics
Siyuan (Ryans) Zhuang 2022-04-13 22:09:14 -07:00 committed by GitHub
parent f400c20246
commit 9c81a97fd9
4 changed files with 39 additions and 134 deletions

python/ray/_private/utils.py

@@ -12,7 +12,7 @@ import sys
import tempfile
import threading
import time
from typing import Optional, Sequence, Tuple, Any, Union
from typing import Optional, Sequence, Tuple, Any, Union, Dict
import uuid
import grpc
import warnings
@@ -283,52 +283,16 @@ def set_cuda_visible_devices(gpu_ids):
last_set_gpu_ids = gpu_ids
def resources_from_resource_arguments(
default_num_cpus,
default_num_gpus,
default_memory,
default_object_store_memory,
default_resources,
default_accelerator_type,
runtime_num_cpus,
runtime_num_gpus,
runtime_memory,
runtime_object_store_memory,
runtime_resources,
runtime_accelerator_type,
):
def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Determine a task's resource requirements.
Args:
default_num_cpus: The default number of CPUs required by this function
or actor method.
default_num_gpus: The default number of GPUs required by this function
or actor method.
default_memory: The default heap memory required by this function
or actor method.
default_object_store_memory: The default object store memory required
by this function or actor method.
default_resources: The default custom resources required by this
function or actor method.
runtime_num_cpus: The number of CPUs requested when the task was
invoked.
runtime_num_gpus: The number of GPUs requested when the task was
invoked.
runtime_memory: The heap memory requested when the task was invoked.
runtime_object_store_memory: The object store memory requested when
the task was invoked.
runtime_resources: The custom resources requested when the task was
invoked.
options_dict: The dictionary that contains resource requirements.
Returns:
A dictionary of the resource requirements for the task.
"""
if runtime_resources is not None:
resources = runtime_resources.copy()
elif default_resources is not None:
resources = default_resources.copy()
else:
resources = {}
resources = (options_dict.get("resources") or {}).copy()
if "CPU" in resources or "GPU" in resources:
raise ValueError(
@@ -340,33 +304,25 @@ def resources_from_resource_arguments(
"contain the key 'memory' or 'object_store_memory'"
)
assert default_num_cpus is not None
resources["CPU"] = (
default_num_cpus if runtime_num_cpus is None else runtime_num_cpus
)
num_cpus = options_dict.get("num_cpus")
num_gpus = options_dict.get("num_gpus")
memory = options_dict.get("memory")
object_store_memory = options_dict.get("object_store_memory")
accelerator_type = options_dict.get("accelerator_type")
if runtime_num_gpus is not None:
resources["GPU"] = runtime_num_gpus
elif default_num_gpus is not None:
resources["GPU"] = default_num_gpus
# Order of arguments matters for short-circuiting.
memory = runtime_memory or default_memory
object_store_memory = runtime_object_store_memory or default_object_store_memory
if num_cpus is not None:
resources["CPU"] = num_cpus
if num_gpus is not None:
resources["GPU"] = num_gpus
if memory is not None:
resources["memory"] = ray_constants.to_memory_units(memory, round_up=True)
if object_store_memory is not None:
resources["object_store_memory"] = ray_constants.to_memory_units(
object_store_memory, round_up=True
)
if runtime_accelerator_type is not None:
if accelerator_type is not None:
resources[
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" f"{runtime_accelerator_type}"
] = 0.001
elif default_accelerator_type is not None:
resources[
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" f"{default_accelerator_type}"
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"
] = 0.001
return resources
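
To see what the consolidation buys, here is a small self-contained sketch of the merge the new resources_from_ray_options performs: one options dict in, one resource dict out, instead of six defaults plus six runtime overrides threaded through every call site. The option keys follow the diff above; the prefix constant and the skipped memory-unit rounding are simplified stand-ins, so read this as an illustration rather than the Ray-internal implementation.

from typing import Any, Dict

ACCELERATOR_PREFIX = "accelerator_type:"  # stand-in for ray_constants.RESOURCE_CONSTRAINT_PREFIX

def sketch_resources_from_options(options: Dict[str, Any]) -> Dict[str, Any]:
    # Custom resources come first; CPU and GPU must go through num_cpus/num_gpus.
    resources = (options.get("resources") or {}).copy()
    if "CPU" in resources or "GPU" in resources:
        raise ValueError("Use num_cpus/num_gpus instead of 'CPU'/'GPU' entries.")
    if "memory" in resources or "object_store_memory" in resources:
        raise ValueError("Use the memory/object_store_memory options instead.")

    if options.get("num_cpus") is not None:
        resources["CPU"] = options["num_cpus"]
    if options.get("num_gpus") is not None:
        resources["GPU"] = options["num_gpus"]
    if options.get("memory") is not None:
        # The real helper rounds this up to Ray's internal memory units.
        resources["memory"] = options["memory"]
    if options.get("object_store_memory") is not None:
        resources["object_store_memory"] = options["object_store_memory"]
    if options.get("accelerator_type") is not None:
        # Accelerator constraints become a tiny custom resource requirement.
        resources[ACCELERATOR_PREFIX + options["accelerator_type"]] = 0.001
    return resources

print(sketch_resources_from_options({"num_cpus": 2, "resources": {"custom": 1}}))
# {'custom': 1, 'CPU': 2}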

python/ray/actor.py

@@ -521,7 +521,7 @@ class ActorClass:
"""
return self._remote(args=args, kwargs=kwargs, **self._default_options)
def options(self, args=None, kwargs=None, **actor_options):
def options(self, **actor_options):
"""Configures and overrides the actor instantiation parameters.
The arguments are the same as those that can be passed to :obj:`ray.remote`.
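
For orientation, this is the public call pattern the new keyword-only signature serves; the Counter actor below is hypothetical, and only the keyword options shown are part of the API.

import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Overrides are keyword-only; the removed args/kwargs placeholders are gone.
counter = Counter.options(num_cpus=1, name="my_counter").remote()
print(ray.get(counter.increment.remote()))  # 1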
@@ -704,12 +704,6 @@ class ActorClass:
name = actor_options["name"]
namespace = actor_options["namespace"]
lifetime = actor_options["lifetime"]
num_cpus = actor_options["num_cpus"]
num_gpus = actor_options["num_gpus"]
accelerator_type = actor_options["accelerator_type"]
resources = actor_options["resources"]
memory = actor_options["memory"]
object_store_memory = actor_options["object_store_memory"]
runtime_env = actor_options["runtime_env"]
placement_group = actor_options["placement_group"]
placement_group_bundle_index = actor_options["placement_group_bundle_index"]
@@ -762,31 +756,6 @@ class ActorClass:
"'non_detached' and 'None'."
)
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
# specified when _remote() was called.
if (
num_cpus is None
and num_gpus is None
and resources is None
and accelerator_type is None
):
# In the default case, actors acquire no resources for
# their lifetime, and actor methods will require 1 CPU.
cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
else:
# If any resources are specified (here or in decorator), then
# all resources are acquired for the actor's lifetime and no
# resources are associated with methods.
cpus_to_use = (
ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
if num_cpus is None
else num_cpus
)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
# LOCAL_MODE cannot handle cross_language
if worker.mode == ray.LOCAL_MODE:
assert (
@@ -811,21 +780,27 @@ class ActorClass:
meta.method_meta.methods.keys(),
)
# TODO(suquark): cleanup "resources_from_resource_arguments" later.
resources = ray._private.utils.resources_from_resource_arguments(
cpus_to_use,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
)
resources = ray._private.utils.resources_from_ray_options(actor_options)
# Set the actor's default resources if not already set. Resources count as
# "specified" whether they were given in the decorator or when _remote()
# was called; both end up in the merged actor_options.
# TODO(suquark): In the original code, memory is not considered a resource
# when deciding the default CPUs. This is strange, but we keep the original
# semantics in case changing it breaks user applications & tests.
if not set(resources.keys()).difference({"memory", "object_store_memory"}):
# In the default case, actors acquire no resources for
# their lifetime, and actor methods will require 1 CPU.
resources.setdefault("CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE
else:
# If any resources are specified (here or in decorator), then
# all resources are acquired for the actor's lifetime and no
# resources are associated with methods.
resources.setdefault(
"CPU", ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
)
actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
# If the actor methods require CPU resources, then set the required
# placement resources. If actor_placement_resources is empty, then
# the required placement resources will be the same as resources.

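Condensed, the default-CPU decision the rewritten block makes looks like the sketch below. The numeric values follow the comments in the diff (an actor with nothing specified holds no CPU for its lifetime while each method call needs 1 CPU; once any non-memory resource is specified, the actor holds its resources and methods need none), and the function name is illustrative.

from typing import Dict, Tuple

def sketch_actor_cpu_defaults(resources: Dict[str, float]) -> Tuple[Dict[str, float], float]:
    # Memory-only requests still count as "nothing specified", matching the
    # original semantics called out in the TODO above.
    if not set(resources) - {"memory", "object_store_memory"}:
        resources.setdefault("CPU", 0)  # DEFAULT_ACTOR_CREATION_CPU_SIMPLE
        actor_method_cpu = 1            # DEFAULT_ACTOR_METHOD_CPU_SIMPLE
    else:
        resources.setdefault("CPU", 1)  # DEFAULT_ACTOR_CREATION_CPU_SPECIFIED
        actor_method_cpu = 0            # DEFAULT_ACTOR_METHOD_CPU_SPECIFIED
    return resources, actor_method_cpu

print(sketch_actor_cpu_defaults({}))            # ({'CPU': 0}, 1)
print(sketch_actor_cpu_defaults({"GPU": 1.0}))  # ({'GPU': 1.0, 'CPU': 1}, 0)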
python/ray/remote_function.py

@@ -119,12 +119,7 @@ class RemoteFunction:
f"try '{self._function_name}.remote()'."
)
def options(
self,
args=None,
kwargs=None,
**task_options,
):
def options(self, **task_options):
"""Configures and overrides the task invocation parameters.
The arguments are the same as those that can be passed to :obj:`ray.remote`.
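
For reference, the task-side counterpart of the same public API; the double task below is hypothetical.

import ray

ray.init()

@ray.remote
def double(x):
    return x * 2

# Per-invocation overrides are keyword-only here as well.
ref = double.options(num_cpus=2, max_retries=0).remote(21)
print(ray.get(ref))  # 42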
@@ -229,12 +224,6 @@ class RemoteFunction:
# TODO(suquark): cleanup these fields
name = task_options["name"]
num_cpus = task_options["num_cpus"]
num_gpus = task_options["num_gpus"]
accelerator_type = task_options["accelerator_type"]
resources = task_options["resources"]
memory = task_options["memory"]
object_store_memory = task_options["object_store_memory"]
runtime_env = parse_runtime_env(task_options["runtime_env"])
placement_group = task_options["placement_group"]
placement_group_bundle_index = task_options["placement_group_bundle_index"]
@@ -246,21 +235,7 @@ class RemoteFunction:
max_retries = task_options["max_retries"]
retry_exceptions = task_options["retry_exceptions"]
# TODO(suquark): cleanup "resources_from_resource_arguments" later.
resources = ray._private.utils.resources_from_resource_arguments(
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
num_cpus,
num_gpus,
memory,
object_store_memory,
resources,
accelerator_type,
)
resources = ray._private.utils.resources_from_ray_options(task_options)
if scheduling_strategy is None or isinstance(
scheduling_strategy, PlacementGroupSchedulingStrategy

python/ray/worker.py

@@ -2112,7 +2112,6 @@ def _mode(worker=global_worker):
def _make_remote(function_or_class, options):
# filter out placeholders in options
if inspect.isfunction(function_or_class) or is_cython(function_or_class):
ray_option_utils.validate_task_options(options, in_options=False)
return ray.remote_function.RemoteFunction(
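
The last hunk touches the entry point that turns a plain function or class into a remote one, where validate_task_options checks the merged options dict on the task path. A simplified sketch of that dispatch-and-validate shape follows; the allowed-key set and all helper names besides validate_task_options are stand-ins, not Ray's actual internals.

import inspect

# Illustrative stand-in for ray_option_utils.validate_task_options: reject
# option keys that do not apply to tasks. The real validator knows the full
# option table; this key set is only an example.
TASK_OPTION_KEYS = {"num_cpus", "num_gpus", "memory", "object_store_memory",
                    "resources", "accelerator_type", "max_retries", "name",
                    "runtime_env"}

def validate_task_options_sketch(options: dict, in_options: bool) -> None:
    # in_options mirrors the real signature; it is unused in this sketch.
    unknown = set(options) - TASK_OPTION_KEYS
    if unknown:
        raise ValueError(f"Invalid option keyword(s) for tasks: {sorted(unknown)}")

def make_remote_sketch(function_or_class, options: dict):
    if inspect.isfunction(function_or_class):
        # Task path: validate, then wrap as a remote function.
        validate_task_options_sketch(options, in_options=False)
        return ("remote_function", function_or_class, options)
    if inspect.isclass(function_or_class):
        # Actor path: the real code runs an actor-specific validator instead.
        return ("actor_class", function_or_class, options)
    raise TypeError("The @ray.remote decorator must be applied to a function or class.")

print(make_remote_sketch(lambda x: x, {"num_cpus": 1})[0])  # remote_function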