[Placement group] Raise an exception when invalid resources are specified with the placement group. (#19680)

* done

* Make it work

* Fix issues

* done

* try

* done

* Fix remaining bugs.
This commit is contained in:
SangBin Cho 2021-11-05 06:41:00 +09:00 committed by GitHub
parent 585d472fdf
commit 56bab61fba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 244 additions and 52 deletions

View file

@ -9,8 +9,7 @@ from ray._private.runtime_env.validation import (
override_task_or_actor_runtime_env, ParsedRuntimeEnv)
import ray.worker
from ray.util.annotations import PublicAPI
from ray.util.placement_group import (
PlacementGroup, check_placement_group_index, get_current_placement_group)
from ray.util.placement_group import configure_placement_group_based_on_context
from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor
@ -676,22 +675,6 @@ class ActorClass:
"actor `lifetime` argument must be either `None` or 'detached'"
)
if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
worker.should_capture_child_tasks_in_placement_group)
if placement_group == "default":
if placement_group_capture_child_tasks:
placement_group = get_current_placement_group()
else:
placement_group = PlacementGroup.empty()
if not placement_group:
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,
placement_group_bundle_index)
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@ -752,6 +735,17 @@ class ActorClass:
creation_args = signature.flatten_args(function_signature, args,
kwargs)
if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
worker.should_capture_child_tasks_in_placement_group)
placement_group = configure_placement_group_based_on_context(
placement_group_capture_child_tasks,
placement_group_bundle_index,
resources,
actor_placement_resources,
meta.class_name,
placement_group=placement_group)
if runtime_env and not isinstance(runtime_env, ParsedRuntimeEnv):
runtime_env = ParsedRuntimeEnv(runtime_env)
elif isinstance(runtime_env, ParsedRuntimeEnv):

View file

@ -8,11 +8,7 @@ from ray._raylet import PythonFunctionDescriptor
from ray import cross_language, Language
from ray._private.client_mode_hook import client_mode_convert_function
from ray._private.client_mode_hook import client_mode_should_convert
from ray.util.placement_group import (
PlacementGroup,
check_placement_group_index,
get_current_placement_group,
)
from ray.util.placement_group import configure_placement_group_based_on_context
import ray._private.signature
from ray._private.runtime_env.validation import (
override_task_or_actor_runtime_env, ParsedRuntimeEnv)
@ -285,33 +281,25 @@ class RemoteFunction:
if retry_exceptions is None:
retry_exceptions = self._retry_exceptions
if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
worker.should_capture_child_tasks_in_placement_group)
if self._placement_group != "default":
if self._placement_group:
placement_group = self._placement_group
else:
placement_group = PlacementGroup.empty()
elif placement_group == "default":
if placement_group_capture_child_tasks:
placement_group = get_current_placement_group()
else:
placement_group = PlacementGroup.empty()
if not placement_group:
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,
placement_group_bundle_index)
resources = ray._private.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._memory,
self._object_store_memory, self._resources, self._accelerator_type,
num_cpus, num_gpus, memory, object_store_memory, resources,
accelerator_type)
if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
worker.should_capture_child_tasks_in_placement_group)
if placement_group == "default":
placement_group = self._placement_group
placement_group = configure_placement_group_based_on_context(
placement_group_capture_child_tasks,
placement_group_bundle_index,
resources,
{}, # no placement_resources for tasks
self._function_descriptor.function_name,
placement_group=placement_group)
if runtime_env and not isinstance(runtime_env, ParsedRuntimeEnv):
runtime_env = ParsedRuntimeEnv(runtime_env)
elif isinstance(runtime_env, ParsedRuntimeEnv):

View file

@ -45,9 +45,22 @@ def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub):
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
# Placement group cannot be made, but no warnings should occur.
pg = placement_group([{"GPU": 1}], strategy="STRICT_PACK")
pg.ready()
f.options(placement_group=pg).remote()
total_cpus = ray.cluster_resources()["CPU"]
# Occupy one cpu by an actor
@ray.remote(num_cpus=1)
class A:
pass
a = A.remote()
print(a)
@ray.remote(num_cpus=total_cpus)
def g():
pass
pg = placement_group([{"CPU": total_cpus}], strategy="STRICT_PACK")
g.options(placement_group=pg).remote()
errors = get_error_message(
p, 1, ray_constants.INFEASIBLE_TASK_ERROR, timeout=5)

View file

@ -44,6 +44,82 @@ def test_placement_ready(ray_start_regular, connect_to_client):
placement_group_assert_no_leak([pg])
def test_placement_group_invalid_resource_request(shutdown_only):
"""
Make sure exceptions are raised if
requested resources don't fit any bundles.
"""
ray.init(resources={"a": 1})
pg = ray.util.placement_group(bundles=[{"a": 1}])
#
# Test an actor with 0 cpu.
#
@ray.remote
class A:
def ready(self):
pass
# The actor cannot be scheduled with the default because
# it requires 1 cpu for the placement, but the pg doesn't have it.
with pytest.raises(ValueError):
a = A.options(placement_group=pg).remote()
# Shouldn't work with 1 CPU because pg doesn't contain CPUs.
with pytest.raises(ValueError):
a = A.options(num_cpus=1, placement_group=pg).remote()
# 0 CPU should work.
a = A.options(num_cpus=0, placement_group=pg).remote()
ray.get(a.ready.remote())
del a
#
# Test an actor with non-0 resources.
#
@ray.remote(resources={"a": 1})
class B:
def ready(self):
pass
# When resources are given to the placement group,
# it automatically adds 1 CPU to resources, so it should fail.
with pytest.raises(ValueError):
b = B.options(placement_group=pg).remote()
# If 0 cpu is given, it should work.
b = B.options(num_cpus=0, placement_group=pg).remote()
ray.get(b.ready.remote())
del b
# If resources are requested too much, it shouldn't work.
with pytest.raises(ValueError):
# The actor cannot be scheduled with no resource specified.
# Note that the default actor has 0 cpu.
B.options(num_cpus=0, resources={"a": 2}, placement_group=pg).remote()
#
# Test a function with 1 CPU.
#
@ray.remote
def f():
pass
# 1 CPU shouldn't work because the pg doesn't have CPU bundles.
with pytest.raises(ValueError):
f.options(placement_group=pg).remote()
# 0 CPU should work.
ray.get(f.options(placement_group=pg, num_cpus=0).remote())
#
# Test a function with 0 CPU.
#
@ray.remote(num_cpus=0)
def g():
pass
# 0 CPU should work.
ray.get(g.options(placement_group=pg).remote())
placement_group_assert_no_leak([pg])
@pytest.mark.parametrize("connect_to_client", [False, True])
def test_placement_group_pack(ray_start_cluster, connect_to_client):
@ray.remote(num_cpus=2)

View file

@ -1,7 +1,4 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from typing import Dict, Union, List, Optional
import ray
from ray._raylet import ObjectRef
@ -13,6 +10,7 @@ from ray._private.client_mode_hook import client_mode_should_convert
from ray._private.client_mode_hook import client_mode_wrap
bundle_reservation_check = None
BUNDLE_RESOURCE_LABEL = "bundle"
# We need to import this method to use for ready API.
@ -46,6 +44,10 @@ class PlacementGroup:
self.id = id
self.bundle_cache = bundle_cache
@property
def is_empty(self):
return self.id.is_nil()
def ready(self) -> ObjectRef:
"""Returns an ObjectRef to check ready status.
@ -71,7 +73,7 @@ class PlacementGroup:
return bundle_reservation_check.options(
placement_group=self, resources={
"bundle": 0.001
BUNDLE_RESOURCE_LABEL: 0.001
}).remote(self)
def wait(self, timeout_seconds: Union[float, int]) -> bool:
@ -185,6 +187,11 @@ def placement_group(bundles: List[Dict[str, float]],
creator is dead, or "detached", which means the placement group
will live as a global object independent of the creator.
Raises:
ValueError if bundle type is not a list.
ValueError if empty bundle or empty resource bundles are given.
ValueError if the wrong lifetime arguments are given.
Return:
PlacementGroup: Placement group object.
"""
@ -330,3 +337,117 @@ def check_placement_group_index(placement_group: PlacementGroup,
raise ValueError(f"placement group bundle index {bundle_index} "
f"is invalid. Valid placement group indexes: "
f"0-{placement_group.bundle_count}")
def _validate_resource_shape(placement_group, resources, placement_resources,
task_or_actor_repr):
def valid_resource_shape(resources, bundle_specs):
"""
If the resource shape cannot fit into every
bundle spec, return False
"""
for bundle in bundle_specs:
fit_in_bundle = True
for resource, requested_val in resources.items():
# Skip "bundle" resource as it is automatically added
# to all nodes with bundles by the placement group.
if resource == BUNDLE_RESOURCE_LABEL:
continue
if bundle.get(resource, 0) < requested_val:
fit_in_bundle = False
break
if fit_in_bundle:
# If resource request fits in any bundle, it is valid.
return True
return False
bundles = placement_group.bundle_specs
resources_valid = valid_resource_shape(resources, bundles)
placement_resources_valid = valid_resource_shape(placement_resources,
bundles)
if not resources_valid:
raise ValueError(f"Cannot schedule {task_or_actor_repr} with "
"the placement group because the resource request "
f"{resources} cannot fit into any bundles for "
f"the placement group, {bundles}.")
if not placement_resources_valid:
# Happens for the default actor case.
# placement_resources is not an exposed concept to users,
# so we should write more specialized error messages.
raise ValueError(f"Cannot schedule {task_or_actor_repr} with "
"the placement group because the actor requires "
f"{placement_resources.get('CPU', 0)} CPU for "
"creation, but it cannot "
f"fit into any bundles for the placement group, "
f"{bundles}. Consider "
"creating a placement group with CPU resources.")
def configure_placement_group_based_on_context(
placement_group_capture_child_tasks: bool,
bundle_index: int,
resources: Dict,
placement_resources: Dict,
task_or_actor_repr: str,
placement_group: Union[PlacementGroup, str, None] = "default")\
-> PlacementGroup:
"""Configure the placement group based on the given context.
Based on the given context, this API returns the placement group instance
for task/actor scheduling.
Params:
placement_group_capture_child_tasks: Whether or not the
placement group needs to be captured from the global
context.
bundle_index: The bundle index for tasks/actor scheduling.
resources: The scheduling resources.
placement_resources: The scheduling placement resources for
actors.
task_or_actor_repr: The repr of task or actor
function/class descriptor.
placement_group: The placement group instance.
- "default": Default placement group argument. Currently,
the default behavior is to capture the parent task'
placement group if placement_group_capture_child_tasks
is set.
- None: means placement group is explicitly not configured.
- Placement group instance: In this case, do nothing.
Returns:
Placement group instance based on the given context.
Raises:
ValueError: If the bundle index is invalid for the placement group
or the requested resources shape doesn't fit to any
bundles.
"""
# Validate inputs.
assert placement_group_capture_child_tasks is not None
assert resources is not None
# Validate and get the PlacementGroup instance.
# Placement group could be None, default, or placement group.
# Default behavior is "do not capture child tasks".
if placement_group != "default":
if not placement_group:
placement_group = PlacementGroup.empty()
elif placement_group == "default":
if placement_group_capture_child_tasks:
placement_group = get_current_placement_group()
else:
placement_group = PlacementGroup.empty()
if not placement_group:
placement_group = PlacementGroup.empty()
assert isinstance(placement_group, PlacementGroup)
# Validate the index.
check_placement_group_index(placement_group, bundle_index)
# Validate the shape.
if not placement_group.is_empty:
_validate_resource_shape(placement_group, resources,
placement_resources, task_or_actor_repr)
return placement_group