Mirror of https://github.com/vale981/ray (synced 2025-03-05 10:01:43 -05:00)
[Dashboard] Fix dashboard RAM and CPU with cgroups2 (#25710)
Closes #25283. The dashboard shows inaccurate memory and CPU data when run inside a Docker container, in particular when using cgroups v2. This PR fixes that.
Parent: 0d5131ba37
Commit: 1055eadde0
5 changed files with 143 additions and 15 deletions
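For context on the fix: under cgroups v2, a container's limits are exposed as flat files under /sys/fs/cgroup, with formats that differ from cgroups v1. A minimal sketch of reading them directly (the paths are the standard v2 locations; the helper itself is illustrative and not code from this commit):

# Illustrative sketch, not part of this commit: reading cgroups v2 limits.
def read_cgroup_v2_limits():
    limits = {}
    # memory.max holds a byte count, or the literal string "max" for unlimited.
    with open("/sys/fs/cgroup/memory.max") as f:
        raw = f.read().strip()
        limits["memory_bytes"] = None if raw == "max" else int(raw)
    # cpu.max holds "<quota> <period>" (e.g. "100000 100000" is 1 CPU),
    # or "max <period>" when no CPU limit is set.
    with open("/sys/fs/cgroup/cpu.max") as f:
        quota, period = f.read().strip().split()
        limits["cpus"] = None if quota == "max" else int(quota) / int(period)
    return limits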
@@ -33,6 +33,10 @@ IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
 # disk usage defined as the result of running psutil.disk_usage("/")
 # in the Ray container.
 ENABLE_K8S_DISK_USAGE = os.environ.get("RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE") == "1"
+# Try to determine if we're in a container.
+IN_CONTAINER = os.path.exists("/sys/fs/cgroup")
+# Using existence of /sys/fs/cgroup as the criterion is consistent with
+# Ray's existing resource logic, see e.g. ray._private.utils.get_num_cpus().
 
 try:
     import gpustat.core as gpustat
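The /sys/fs/cgroup check only detects that a cgroup filesystem is mounted; it does not say which version. If the version mattered, one way to tell them apart (a sketch, not part of this change) is the cgroup.controllers file that only the unified v2 hierarchy exposes at its root:

import os

def cgroup_version():
    """Best-effort cgroup version probe (illustrative sketch)."""
    if not os.path.exists("/sys/fs/cgroup"):
        return None  # No cgroup filesystem mounted (e.g. not Linux).
    if os.path.exists("/sys/fs/cgroup/cgroup.controllers"):
        return 2  # Unified (v2) hierarchy.
    return 1  # Legacy (v1) per-controller hierarchies.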
@@ -205,13 +209,24 @@ class ReporterAgent(
     def __init__(self, dashboard_agent):
         """Initialize the reporter object."""
         super().__init__(dashboard_agent)
-        if IN_KUBERNETES_POD:
-            # psutil does not compute this correctly when in a K8s pod.
-            # Use ray._private.utils instead.
-            cpu_count = ray._private.utils.get_num_cpus()
-            self._cpu_counts = (cpu_count, cpu_count)
+        if IN_KUBERNETES_POD or IN_CONTAINER:
+            # psutil does not give a meaningful logical cpu count when in a K8s pod, or
+            # in a container in general.
+            # Use ray._private.utils for this instead.
+            logical_cpu_count = ray._private.utils.get_num_cpus(
+                override_docker_cpu_warning=True
+            )
+            # (Override the docker warning to avoid dashboard log spam.)
+
+            # The dashboard expects a physical CPU count as well.
+            # This is not always meaningful in a container, but we will go ahead
+            # and give the dashboard what it wants using psutil.
+            physical_cpu_count = psutil.cpu_count(logical=False)
         else:
-            self._cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
+            logical_cpu_count = psutil.cpu_count()
+            physical_cpu_count = psutil.cpu_count(logical=False)
+        self._cpu_counts = (logical_cpu_count, physical_cpu_count)
 
         self._ip = dashboard_agent.ip
         self._is_head_node = self._ip == dashboard_agent.gcs_address.split(":")[0]
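One caveat worth noting: psutil.cpu_count(logical=False) is documented to return None when the physical core count cannot be determined, which is common inside containers, so the second element of self._cpu_counts may be None. A quick illustration:

import psutil

logical = psutil.cpu_count()                # e.g. 8
physical = psutil.cpu_count(logical=False)  # may be None inside a container/VM
print((logical, physical))                  # the shape stored in self._cpu_counts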
@@ -59,6 +59,9 @@ win32_job = None
 win32_AssignProcessToJobObject = None
 
 
+ENV_DISABLE_DOCKER_CPU_WARNING = "RAY_DISABLE_DOCKER_CPU_WARNING" in os.environ
+
+
 def get_user_temp_dir():
     if "RAY_TMPDIR" in os.environ:
         return os.environ["RAY_TMPDIR"]
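Note that ENV_DISABLE_DOCKER_CPU_WARNING tests membership, not truthiness: setting the variable to any value, even "0", suppresses the warning.

import os

os.environ["RAY_DISABLE_DOCKER_CPU_WARNING"] = "0"
# Membership check, so even "0" counts as set:
print("RAY_DISABLE_DOCKER_CPU_WARNING" in os.environ)  # True -> warning off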
@@ -421,10 +424,11 @@ def get_system_memory(
     docker_limit = None
     if os.path.exists(memory_limit_filename):
         with open(memory_limit_filename, "r") as f:
-            docker_limit = int(f.read())
+            docker_limit = int(f.read().strip())
     elif os.path.exists(memory_limit_filename_v2):
         with open(memory_limit_filename_v2, "r") as f:
-            max_file = f.read()
+            # Don't forget to strip() the newline:
+            max_file = f.read().strip()
             if max_file.isnumeric():
                 docker_limit = int(max_file)
             else:
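The strip() in the v2 branch is load-bearing: int() itself tolerates surrounding whitespace, but the isnumeric() guard that decides whether to convert does not, so an unstripped "100\n" read from memory.max would be silently treated as no limit:

>>> "100\n".isnumeric()   # guard fails: unstripped read treated as "no limit"
False
>>> "100".isnumeric()
True
>>> int("100\n")          # int() alone copes with the newline
100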
@@ -508,7 +512,20 @@ def _get_docker_cpus(
     return cpu_quota or cpuset_num
 
 
-def get_num_cpus() -> int:
+def get_num_cpus(
+    override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING,
+) -> int:
+    """
+    Get the number of CPUs available on this node.
+    Depending on the situation, use multiprocessing.cpu_count() or cgroups.
+
+    Args:
+        override_docker_cpu_warning: An extra flag to explicitly turn off the Docker
+            warning. Setting this flag True has the same effect as setting the env
+            RAY_DISABLE_DOCKER_CPU_WARNING. By default, whether or not to log
+            the warning is determined by the env variable
+            RAY_DISABLE_DOCKER_CPU_WARNING.
+    """
     cpu_count = multiprocessing.cpu_count()
     if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"):
         logger.info(
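A short usage sketch of the new signature:

import ray._private.utils as utils

n = utils.get_num_cpus()  # may log the docker CPU warning
n_quiet = utils.get_num_cpus(override_docker_cpu_warning=True)  # suppresses it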
@@ -527,8 +544,9 @@ def get_num_cpus() -> int:
         # Don't log this warning if we're on K8s or if the warning is
         # explicitly disabled.
         if (
-            "RAY_DISABLE_DOCKER_CPU_WARNING" not in os.environ
-            and "KUBERNETES_SERVICE_HOST" not in os.environ
+            "KUBERNETES_SERVICE_HOST" not in os.environ
+            and not ENV_DISABLE_DOCKER_CPU_WARNING
+            and not override_docker_cpu_warning
         ):
             logger.warning(
                 "Detecting docker specified CPUs. In "
python/ray/tests/kuberay/scripts/check_cpu_and_memory.py (new file, 17 additions)
@@ -0,0 +1,17 @@
+import ray
+
+
+def main():
+    """This script runs in a container with 1 CPU limit and 1G memory limit.
+    Validate that Ray reads the correct limits.
+    """
+    cpu_limit = ray._private.utils.get_num_cpus()
+    mem_limit_gb = round(ray._private.utils.get_system_memory() / 10 ** 9, 2)
+    assert cpu_limit == 1, cpu_limit
+    assert mem_limit_gb == 1.00, mem_limit_gb
+    print(f"Confirmed cpu limit {cpu_limit}.")
+    print(f"Confirmed memory limit {mem_limit_gb} gigabyte.")
+
+
+if __name__ == "__main__":
+    main()
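The mem_limit_gb == 1.00 assertion presumes a decimal memory limit of exactly 10**9 bytes (K8s "1G"); a binary "1Gi" limit would round to 1.07 and fail:

>>> round(10**9 / 10**9, 2)   # "1G": decimal gigabyte, matches the assert
1.0
>>> round(2**30 / 10**9, 2)   # "1Gi": gibibyte, would fail the assert
1.07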
@@ -215,15 +215,16 @@ class KubeRayAutoscalingTest(unittest.TestCase):
         5. Autoscaler recognizes GPU annotations and Ray custom resources.
         6. Autoscaler and operator ignore pods marked for deletion.
         7. Autoscaler logs work. Autoscaler events are piped to the driver.
 
         Items 1. and 2. protect the example in the documentation.
         Items 3. and 4. protect the autoscaler's ability to respond to Ray CR update.
+        8. Ray utils show correct resource limits in the head container.
 
         Tests the following modes of interaction with a Ray cluster on K8s:
         1. kubectl exec
         2. Ray Client
         3. Ray Job Submission
 
         TODO (Dmitri): Split up the test logic.
         Too much is stuffed into this one test case.
 
         Resources requested by this test are safely within the bounds of an m5.xlarge
         instance.
 
@@ -261,6 +262,18 @@ class KubeRayAutoscalingTest(unittest.TestCase):
             pod_name_filter=HEAD_POD_PREFIX, namespace=RAY_CLUSTER_NAMESPACE
         )
         assert head_pod, "Could not find the Ray head pod."
 
+        # Confirm head pod resource allocation.
+        # (This is a misplaced test of Ray's resource detection in containers.
+        # See the TODO in the docstring.)
+        logger.info("Confirming head pod resource allocation.")
+        out = kubectl_exec_python_script(  # Interaction mode #1: `kubectl exec`
+            script_name="check_cpu_and_memory.py",
+            pod=head_pod,
+            container="ray-head",
+            namespace="default",
+        )
+
         # Scale-up
         logger.info("Scaling up to one worker via Ray resource request.")
         # The request for 2 cpus should give us a 1-cpu head (already present) and a
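The real kubectl_exec_python_script helper lives in Ray's kuberay test utilities and is not shown in this diff; a rough guess at its shape, purely as an assumption, might be:

import subprocess

def kubectl_exec_python_script(script_name, pod, container, namespace):
    # Assumed behavior: feed the local script to `python` inside the container.
    # The actual helper in Ray's kuberay test utils may differ.
    with open(f"scripts/{script_name}") as f:
        return subprocess.check_output(
            ["kubectl", "exec", "-i", pod, "-c", container, "-n", namespace,
             "--", "python", "-"],
            stdin=f,
            text=True,
        )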
@@ -1,6 +1,7 @@
 # coding: utf-8
 import glob
 import logging
+import multiprocessing
 import os
 import sys
 import tempfile
@@ -299,7 +300,7 @@ def test_get_system_memory():
     )
     # cgroups v2, set
     with tempfile.NamedTemporaryFile("w") as memory_max_file:
-        memory_max_file.write("100")
+        memory_max_file.write("100\n")
         memory_max_file.flush()
         assert (
             ray._private.utils.get_system_memory(
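The fixture now writes "100\n" because the kernel's cgroup files end with a trailing newline; the test reproduces what a real memory.max read returns, e.g. on an unlimited cgroups v2 host:

>>> open("/sys/fs/cgroup/memory.max").read()
'max\n'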
@@ -323,6 +324,70 @@ def test_get_system_memory():
     )
 
 
+@pytest.mark.parametrize("in_k8s", [True, False])
+@pytest.mark.parametrize("env_disable", [True, False])
+@pytest.mark.parametrize("override_disable", [True, False])
+@pytest.mark.parametrize("got_docker_cpus", [True, False])
+def test_get_num_cpus(
+    in_k8s: bool,
+    env_disable: bool,
+    override_disable: bool,
+    got_docker_cpus: bool,
+    monkeypatch,
+):
+    """Tests
+    - Conditions under which ray._private.utils.get_num_cpus logs a warning about
+      docker.
+    - Fallback to multiprocessing.cpu_count if there's no docker count available.
+    """
+    # Shouldn't get the log warning if we're in K8s, the env variable is set,
+    # the flag arg to get_num_cpus is set, or getting docker cpus fails.
+    # Otherwise, should get the log message.
+    should_not_log = any([in_k8s, env_disable, override_disable, not got_docker_cpus])
+    expected_warning = (
+        "Detecting docker specified CPUs. In "
+        "previous versions of Ray, CPU detection in containers "
+        "was incorrect. Please ensure that Ray has enough CPUs "
+        "allocated. As a temporary workaround to revert to the "
+        "prior behavior, set "
+        "`RAY_USE_MULTIPROCESSING_CPU_COUNT=1` as an env var "
+        "before starting Ray. Set the env var: "
+        "`RAY_DISABLE_DOCKER_CPU_WARNING=1` to mute this warning."
+    )
+    if got_docker_cpus:
+        mock_get_docker_cpus = mock.Mock(return_value=128)
+    else:
+        mock_get_docker_cpus = mock.Mock(side_effect=Exception())
+
+    if in_k8s:
+        monkeypatch.setenv("KUBERNETES_SERVICE_HOST", 1)
+    else:
+        try:
+            monkeypatch.delenv("KUBERNETES_SERVICE_HOST")
+        except KeyError:
+            pass
+
+    with mock.patch.multiple(
+        "ray._private.utils",
+        _get_docker_cpus=mock_get_docker_cpus,
+        ENV_DISABLE_DOCKER_CPU_WARNING=env_disable,
+        logger=mock.DEFAULT,
+    ) as mocks:
+        num_cpus = ray._private.utils.get_num_cpus(override_disable)
+
+        if got_docker_cpus:
+            # Got the docker count of 128 CPUs in the giant mock container.
+            assert num_cpus == 128
+        else:
+            # Failed to get docker count and fell back to multiprocessing count.
+            assert num_cpus == multiprocessing.cpu_count()
+
+        if should_not_log:
+            mocks["logger"].warning.assert_not_called()
+        else:
+            mocks["logger"].warning.assert_called_with(expected_warning)
+
+
 @pytest.mark.skipif(sys.platform == "win32", reason="not relevant for windows")
 def test_detect_docker_cpus():
     # No limits set
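The test leans on a detail of mock.patch.multiple: attributes patched with mock.DEFAULT are replaced by MagicMocks and returned in the dict the context manager yields (which is how mocks["logger"] is obtained), while plainly valued patches such as ENV_DISABLE_DOCKER_CPU_WARNING=env_disable are applied but not returned. A standalone illustration:

from unittest import mock
import os.path

with mock.patch.multiple("os.path", exists=mock.DEFAULT, sep="#") as mocks:
    os.path.exists("/anything")
    mocks["exists"].assert_called_once_with("/anything")
    assert os.path.sep == "#"          # plain value: patched...
    assert list(mocks) == ["exists"]   # ...but not in the yielded dict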