[kuberay][autoscaler] Improve CPU, GPU, and memory detection. (#26219)

This PR improves the KubeRay autoscaler's CPU, GPU, and memory detection logic.
Dmitri Gekhtman authored on 2022-07-02 11:32:05 -07:00; committed by GitHub
parent 34d1e580cb
commit 7d3ceb222c
4 changed files with 110 additions and 59 deletions


@@ -1,10 +1,10 @@
+import decimal
 import json
 import logging
-import math
 import time
-from contextlib import suppress
 from typing import Any, Dict, Optional

+import kubernetes
 import requests

 from ray.autoscaler._private.constants import (
@@ -29,8 +29,6 @@ RAYCLUSTER_FETCH_RETRY_S = 5
 # Used as the name of the "head node type" by the autoscaler.
 _HEAD_GROUP_NAME = "head-group"

-_GPU_WARNING_LOGGED = False
-

 class AutoscalingConfigProducer:
     """Produces an autoscaling config by reading data from the RayCluster CR.
@@ -259,15 +257,14 @@ def _get_num_cpus(
     k8s_resource_limits: Dict[str, str],
     group_name: str,
 ) -> int:
-    if "num_cpus" in ray_start_params:
-        return int(ray_start_params["num_cpus"])
+    """Get CPU annotation from ray_start_params or k8s_resource_limits,
+    with priority for ray_start_params.
+    """
+    if "num-cpus" in ray_start_params:
+        return int(ray_start_params["num-cpus"])
     elif "cpu" in k8s_resource_limits:
-        cpu_str = str(k8s_resource_limits["cpu"])
-        if cpu_str[-1] == "m":
-            # For example, '500m' rounds up to 1.
-            return math.ceil(int(cpu_str[:-1]) / 1000)
-        else:
-            return int(cpu_str)
+        cpu_quantity: str = k8s_resource_limits["cpu"]
+        return _round_up_k8s_quantity(cpu_quantity)
     else:
         # Getting the number of CPUs is important, so raise an error if we can't do it.
         raise ValueError(
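
For reference, a minimal sketch (not part of this commit) of the behavior the rewritten _get_num_cpus is expected to have; it assumes the function is imported from the autoscaling config module:

    # The num-cpus rayStartParam takes priority over the container "cpu" limit.
    assert _get_num_cpus({"num-cpus": "2"}, {"cpu": "500m"}, "small-group") == 2
    # Without an override, a fractional limit such as "500m" rounds up to 1.
    assert _get_num_cpus({}, {"cpu": "500m"}, "small-group") == 1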
@@ -280,13 +277,14 @@ def _get_num_cpus(

 def _get_memory(
     ray_start_params: Dict[str, str], k8s_resource_limits: Dict[str, Any]
 ) -> Optional[int]:
-    """Get memory resource annotation from ray_start_params, if it is set there.
-
-    TODO, maybe: Consider container resource limits as in
-    https://github.com/ray-project/ray/pull/14567/files
+    """Get memory resource annotation from ray_start_params or k8s_resource_limits,
+    with priority for ray_start_params.
     """
     if "memory" in ray_start_params:
         return int(ray_start_params["memory"])
+    elif "memory" in k8s_resource_limits:
+        memory_quantity: str = k8s_resource_limits["memory"]
+        return _round_up_k8s_quantity(memory_quantity)

     return None
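
Similarly, a small sketch (again not part of the diff) of the extended _get_memory behavior, under the same import assumption:

    # A "memory" rayStartParam still takes priority over the container limit.
    assert _get_memory({"memory": "300000000"}, {"memory": "512Mi"}) == 300000000
    # Otherwise the container limit is parsed as a K8s quantity: 512Mi = 512 * 1024**2.
    assert _get_memory({}, {"memory": "512Mi"}) == 536870912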
@@ -295,34 +293,44 @@ def _get_num_gpus(
     k8s_resource_limits: Dict[str, Any],
     group_name: str,
 ) -> Optional[int]:
-    """Read the number of GPUs from the Ray start params.
-
-    Potential TODO: Read GPU info from the container spec, here and in the
-    Ray Operator.
+    """Get GPU annotation from ray_start_params or k8s_resource_limits,
+    with priority for ray_start_params.
     """
     if "num-gpus" in ray_start_params:
         return int(ray_start_params["num-gpus"])
-    else:
-        # Issue a warning if GPUs are present in the container spec but not in the
-        # ray start params.
-        # TODO: Consider reading GPU info from container spec.
-        for key in k8s_resource_limits:
-            global _GPU_WARNING_LOGGED
-            if "gpu" in key and not _GPU_WARNING_LOGGED:
-                with suppress(Exception):
-                    if int(k8s_resource_limits[key]) > 0:
-                        logger.warning(
-                            f"Detected GPUs in container resources for group {group_name}."
-                            "To ensure Ray and the autoscaler are aware of the GPUs,"
-                            " set the `--num-gpus` rayStartParam."
-                        )
-                        _GPU_WARNING_LOGGED = True
-                        break
+
+    for key in k8s_resource_limits:
+        # e.g. nvidia.com/gpu
+        if key.endswith("gpu"):
+            # Typically, this is a string representing an integer, e.g. "1".
+            gpu_resource_quantity = k8s_resource_limits[key]
+            # Convert to int, making no assumptions on the gpu_resource_quantity,
+            # besides that it's valid as a K8s resource quantity.
+            num_gpus = _round_up_k8s_quantity(gpu_resource_quantity)
+            if num_gpus > 0:
+                # Only one GPU type is supported for now; break out on the first
+                # "gpu" match.
+                return num_gpus

     return None


+def _round_up_k8s_quantity(quantity: str) -> int:
+    """Rounds a Kubernetes resource quantity up to the nearest integer.
+
+    Args:
+        quantity: Resource quantity as a string in the canonical K8s form.
+
+    Returns:
+        The quantity, rounded up, as an integer.
+    """
+    resource_decimal: decimal.Decimal = kubernetes.utils.quantity.parse_quantity(
+        quantity
+    )
+    rounded = resource_decimal.to_integral_value(rounding=decimal.ROUND_UP)
+    return int(rounded)
+
+
 def _get_custom_resources(
     ray_start_params: Dict[str, Any], group_name: str
 ) -> Dict[str, int]:
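
For reference, a rough sketch (not part of this commit) of how the new helpers fit together. It assumes _get_num_gpus and _round_up_k8s_quantity are importable from the autoscaling config module and that the Python kubernetes client is installed:

    import decimal
    from kubernetes.utils.quantity import parse_quantity

    # What _round_up_k8s_quantity does under the hood: parse the canonical K8s
    # quantity string into a Decimal, then round up to an int.
    assert int(parse_quantity("500m").to_integral_value(rounding=decimal.ROUND_UP)) == 1

    # GPU limits are now read from the container resource limits when no num-gpus
    # rayStartParam is present; a rayStartParams entry still takes priority.
    assert _get_num_gpus({}, {"nvidia.com/gpu": "3"}, "gpu-group") == 3
    assert _get_num_gpus({"num-gpus": "100"}, {"nvidia.com/gpu": "3"}, "gpu-group") == 100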


@@ -71,7 +71,7 @@ def _setup_logging() -> None:
         filename=ray_constants.MONITOR_LOG_FILE_NAME,  # monitor.log
         max_bytes=ray_constants.LOGGING_ROTATE_BYTES,
         backup_count=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
-        logger_name="ray",  # Root of the logging hierachy for Ray code.
+        logger_name="ray",  # Root of the logging hierarchy for Ray code.
     )
     # Logs will also be written to the container's stdout.
     # The stdout handler was set up in the cli entry point.


@@ -41,7 +41,6 @@ from ray.autoscaler._private.commands import (
 )
 from ray.autoscaler._private.constants import RAY_PROCESSES
 from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
-from ray.autoscaler._private.kuberay.run_autoscaler import run_kuberay_autoscaler
 from ray.dashboard.modules.job.cli import job_cli_group
 from ray.experimental.state.api import get_log, list_logs
 from ray.experimental.state.common import DEFAULT_RPC_TIMEOUT, DEFAULT_LOG_LIMIT
@@ -2292,6 +2291,10 @@ def kuberay_autoscaler(cluster_name: str, cluster_namespace: str) -> None:
     KubeRay cluster configs.

     `ray kuberay-autoscaler` is NOT a public CLI.
     """
+    # Delay import to avoid introducing Ray core dependency on the Python Kubernetes
+    # client.
+    from ray.autoscaler._private.kuberay.run_autoscaler import run_kuberay_autoscaler
+
     run_kuberay_autoscaler(cluster_name, cluster_namespace)
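
Together, these two hunks move the import of run_kuberay_autoscaler from module scope into the command body, so the CLI module no longer needs the Python Kubernetes client just to be imported. A generic sketch of this deferred-import pattern (the module and function names below are hypothetical, purely for illustration):

    def my_command():
        # Importing inside the function defers the optional dependency until the
        # command is actually invoked, instead of at module import time.
        from some_optional_dependency import heavy_function  # hypothetical
        heavy_function()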


@@ -1,3 +1,4 @@
+import copy
 from pathlib import Path
 import requests
 from typing import Any, Dict, Optional
@@ -10,20 +11,30 @@ import yaml
 from ray.autoscaler._private.kuberay.autoscaling_config import (
     _derive_autoscaling_config_from_ray_cr,
     AutoscalingConfigProducer,
+    _round_up_k8s_quantity,
 )

 AUTOSCALING_CONFIG_MODULE_PATH = "ray.autoscaler._private.kuberay.autoscaling_config"


 def _get_basic_ray_cr() -> dict:
-    """Returns the example Ray CR included in the Ray documentation."""
+    """Returns the example Ray CR included in the Ray documentation,
+    modified to include a GPU worker group.
+    """
     cr_path = str(
         Path(__file__).resolve().parents[2]
         / "autoscaler"
         / "kuberay"
         / "ray-cluster.complete.yaml"
     )
-    return yaml.safe_load(open(cr_path).read())
+    config = yaml.safe_load(open(cr_path).read())
+    gpu_group = copy.deepcopy(config["spec"]["workerGroupSpecs"][0])
+    gpu_group["groupName"] = "gpu-group"
+    gpu_group["template"]["spec"]["containers"][0]["resources"]["limits"].setdefault(
+        "nvidia.com/gpu", 3
+    )
+    config["spec"]["workerGroupSpecs"].append(gpu_group)
+    return config


 def _get_basic_autoscaling_config() -> dict:
@@ -44,6 +55,7 @@ def _get_basic_autoscaling_config() -> dict:
            "node_config": {},
            "resources": {
                "CPU": 1,
+               "memory": 1000000000,
                "Custom1": 1,
                "Custom2": 5,
            },
@@ -54,10 +66,24 @@
            "node_config": {},
            "resources": {
                "CPU": 1,
+               "memory": 536870912,
                "Custom2": 5,
                "Custom3": 1,
            },
        },
+       # Same as "small-group" with a GPU entry added.
+       "gpu-group": {
+           "max_workers": 300,
+           "min_workers": 1,
+           "node_config": {},
+           "resources": {
+               "CPU": 1,
+               "memory": 536870912,
+               "Custom2": 5,
+               "Custom3": 1,
+               "GPU": 3,
+           },
+       },
    },
    "auth": {},
    "cluster_synced_files": [],
@@ -69,7 +95,7 @@ def _get_basic_autoscaling_config() -> dict:
    "head_start_ray_commands": [],
    "idle_timeout_minutes": 5,
    "initialization_commands": [],
-   "max_workers": 300,
+   "max_workers": 600,
    "setup_commands": [],
    "upscaling_speed": 1,
    "worker_nodes": {},
@@ -99,19 +125,25 @@ def _get_no_cpu_error() -> str:
     )


-def _get_ray_cr_memory_and_gpu() -> dict:
-    """CR with memory and gpu rayStartParams."""
+def _get_ray_cr_with_overrides() -> dict:
+    """CR with memory, cpu, and gpu overrides from rayStartParams."""
     cr = _get_basic_ray_cr()
     cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["memory"] = "300000000"
-    cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["num-gpus"] = "1"
+    # num-gpus rayStartParam with no gpus in container limits
+    cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["num-gpus"] = "100"
+    # num-gpus rayStartParam overriding gpus in container limits
+    cr["spec"]["workerGroupSpecs"][1]["rayStartParams"]["num-gpus"] = "100"
+    cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["num-cpus"] = "100"
     return cr


-def _get_autoscaling_config_memory_and_gpu() -> dict:
+def _get_autoscaling_config_with_overrides() -> dict:
     """Autoscaling config with memory and gpu annotations."""
     config = _get_basic_autoscaling_config()
     config["available_node_types"]["small-group"]["resources"]["memory"] = 300000000
-    config["available_node_types"]["small-group"]["resources"]["GPU"] = 1
+    config["available_node_types"]["small-group"]["resources"]["GPU"] = 100
+    config["available_node_types"]["small-group"]["resources"]["CPU"] = 100
+    config["available_node_types"]["gpu-group"]["resources"]["GPU"] = 100
     return config
@@ -151,6 +183,21 @@ def _get_autoscaling_config_with_options() -> dict:
     return config


+@pytest.mark.parametrize(
+    "input,output",
+    [
+        # There's no particular discipline to these test cases.
+        ("100m", 1),
+        ("15001m", 16),
+        ("2", 2),
+        ("100Mi", 104857600),
+        ("1G", 1000000000),
+    ],
+)
+def test_resource_quantity(input: str, output: int):
+    assert _round_up_k8s_quantity(input) == output, output
+
+
 PARAM_ARGS = ",".join(
     [
         "ray_cr_in",
@@ -182,20 +229,12 @@ TEST_DATA = (
            id="no-cpu-error",
        ),
        pytest.param(
-           _get_ray_cr_memory_and_gpu(),
-           _get_autoscaling_config_memory_and_gpu(),
+           _get_ray_cr_with_overrides(),
+           _get_autoscaling_config_with_overrides(),
            None,
            None,
            None,
-           id="memory-and-gpu",
-       ),
-       pytest.param(
-           _get_ray_cr_missing_gpu_arg(),
-           _get_basic_autoscaling_config(),
-           None,
-           None,
-           _get_gpu_complaint(),
-           id="gpu-complaint",
+           id="overrides",
        ),
        pytest.param(
            _get_ray_cr_with_autoscaler_options(),
@@ -239,7 +278,8 @@ def test_cr_image_consistency():

     cr = _get_basic_ray_cr()
     group_specs = [cr["spec"]["headGroupSpec"]] + cr["spec"]["workerGroupSpecs"]
-    assert len(group_specs) == 2
+    # Head, CPU group, GPU group.
+    assert len(group_specs) == 3

     ray_containers = [
         group_spec["template"]["spec"]["containers"][0] for group_spec in group_specs