From 1055eadde0b92921fdb42a5e39c7c1ba6c2faadb Mon Sep 17 00:00:00 2001
From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com>
Date: Sun, 26 Jun 2022 14:01:26 -0700
Subject: [PATCH] [Dashboard] Fix dashboard RAM and CPU with cgroups2 (#25710)

Closes #25283.

The dashboard shows inaccurate memory and CPU data when run inside a Docker
container, in particular when using cgroups v2. This PR fixes that by using
Ray's cgroup-aware CPU count in the dashboard agent whenever a container
environment is detected, and by stripping the trailing newline when reading
cgroup memory limits.
---
 dashboard/modules/reporter/reporter_agent.py  | 27 ++++++--
 python/ray/_private/utils.py                  | 28 ++++++--
 .../kuberay/scripts/check_cpu_and_memory.py   | 17 +++++
 .../ray/tests/kuberay/test_autoscaling_e2e.py | 19 +++++-
 python/ray/tests/test_advanced_8.py           | 67 ++++++++++++++++++-
 5 files changed, 143 insertions(+), 15 deletions(-)
 create mode 100644 python/ray/tests/kuberay/scripts/check_cpu_and_memory.py

diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py
index 8ecc752de..32eb9c4d3 100644
--- a/dashboard/modules/reporter/reporter_agent.py
+++ b/dashboard/modules/reporter/reporter_agent.py
@@ -33,6 +33,10 @@ IN_KUBERNETES_POD = "KUBERNETES_SERVICE_HOST" in os.environ
 # disk usage defined as the result of running psutil.disk_usage("/")
 # in the Ray container.
 ENABLE_K8S_DISK_USAGE = os.environ.get("RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE") == "1"
+# Try to determine if we're in a container.
+IN_CONTAINER = os.path.exists("/sys/fs/cgroup")
+# Using the existence of /sys/fs/cgroup as the criterion is consistent with
+# Ray's existing resource logic; see e.g. ray._private.utils.get_num_cpus().
 
 try:
     import gpustat.core as gpustat
@@ -205,13 +209,24 @@ class ReporterAgent(
     def __init__(self, dashboard_agent):
         """Initialize the reporter object."""
         super().__init__(dashboard_agent)
-        if IN_KUBERNETES_POD:
-            # psutil does not compute this correctly when in a K8s pod.
-            # Use ray._private.utils instead.
-            cpu_count = ray._private.utils.get_num_cpus()
-            self._cpu_counts = (cpu_count, cpu_count)
+
+        if IN_KUBERNETES_POD or IN_CONTAINER:
+            # psutil does not give a meaningful logical CPU count when in a
+            # K8s pod, or in a container in general.
+            # Use ray._private.utils for this instead.
+            logical_cpu_count = ray._private.utils.get_num_cpus(
+                override_docker_cpu_warning=True
+            )
+            # (Override the Docker warning to avoid dashboard log spam.)
+
+            # The dashboard expects a physical CPU count as well.
+            # This is not always meaningful in a container, but we will go ahead
+            # and give the dashboard what it wants using psutil.
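+            # Note: psutil.cpu_count(logical=False) can return None when the
+            # physical core count cannot be determined.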
+            physical_cpu_count = psutil.cpu_count(logical=False)
         else:
-            self._cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
+            logical_cpu_count = psutil.cpu_count()
+            physical_cpu_count = psutil.cpu_count(logical=False)
+        self._cpu_counts = (logical_cpu_count, physical_cpu_count)
         self._ip = dashboard_agent.ip
         self._is_head_node = self._ip == dashboard_agent.gcs_address.split(":")[0]
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index c4378d39d..05a9e1904 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -59,6 +59,9 @@ win32_job = None
 win32_AssignProcessToJobObject = None
 
+ENV_DISABLE_DOCKER_CPU_WARNING = "RAY_DISABLE_DOCKER_CPU_WARNING" in os.environ
+
+
 def get_user_temp_dir():
     if "RAY_TMPDIR" in os.environ:
         return os.environ["RAY_TMPDIR"]
@@ -421,10 +424,11 @@ def get_system_memory(
     docker_limit = None
     if os.path.exists(memory_limit_filename):
         with open(memory_limit_filename, "r") as f:
-            docker_limit = int(f.read())
+            docker_limit = int(f.read().strip())
     elif os.path.exists(memory_limit_filename_v2):
         with open(memory_limit_filename_v2, "r") as f:
-            max_file = f.read()
+            # Strip the trailing newline before parsing.
+            max_file = f.read().strip()
             if max_file.isnumeric():
                 docker_limit = int(max_file)
             else:
@@ -508,7 +512,20 @@ def _get_docker_cpus(
     return cpu_quota or cpuset_num
 
 
-def get_num_cpus() -> int:
+def get_num_cpus(
+    override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING,
+) -> int:
+    """
+    Get the number of CPUs available on this node.
+    Depending on the situation, uses multiprocessing.cpu_count() or cgroups.
+
+    Args:
+        override_docker_cpu_warning: An extra flag to explicitly turn off the
+            Docker warning. Setting this flag to True has the same effect as
+            setting the env variable RAY_DISABLE_DOCKER_CPU_WARNING. By
+            default, whether or not to log the warning is determined by the
+            env variable RAY_DISABLE_DOCKER_CPU_WARNING.
+    """
     cpu_count = multiprocessing.cpu_count()
     if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"):
         logger.info(
@@ -527,8 +544,9 @@ def get_num_cpus() -> int:
         # Don't log this warning if we're on K8s or if the warning is
         # explicitly disabled.
         if (
-            "RAY_DISABLE_DOCKER_CPU_WARNING" not in os.environ
-            and "KUBERNETES_SERVICE_HOST" not in os.environ
+            "KUBERNETES_SERVICE_HOST" not in os.environ
+            and not ENV_DISABLE_DOCKER_CPU_WARNING
+            and not override_docker_cpu_warning
         ):
             logger.warning(
                 "Detecting docker specified CPUs. In "
diff --git a/python/ray/tests/kuberay/scripts/check_cpu_and_memory.py b/python/ray/tests/kuberay/scripts/check_cpu_and_memory.py
new file mode 100644
index 000000000..37ebcfd3c
--- /dev/null
+++ b/python/ray/tests/kuberay/scripts/check_cpu_and_memory.py
@@ -0,0 +1,17 @@
+import ray
+
+
+def main():
+    """This script runs in a container with a 1 CPU limit and a 1G memory
+    limit. Validate that Ray reads the correct limits.
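+
+    The limits are read via ray._private.utils.get_num_cpus() and
+    ray._private.utils.get_system_memory(), which consult the container's
+    cgroup filesystem rather than the host's hardware.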
+    """
+    cpu_limit = ray._private.utils.get_num_cpus()
+    mem_limit_gb = round(ray._private.utils.get_system_memory() / 10 ** 9, 2)
+    assert cpu_limit == 1, cpu_limit
+    assert mem_limit_gb == 1.00, mem_limit_gb
+    print(f"Confirmed CPU limit {cpu_limit}.")
+    print(f"Confirmed memory limit {mem_limit_gb} gigabyte.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/ray/tests/kuberay/test_autoscaling_e2e.py b/python/ray/tests/kuberay/test_autoscaling_e2e.py
index 1eaa5acbb..e0b19f64f 100644
--- a/python/ray/tests/kuberay/test_autoscaling_e2e.py
+++ b/python/ray/tests/kuberay/test_autoscaling_e2e.py
@@ -215,15 +215,16 @@ class KubeRayAutoscalingTest(unittest.TestCase):
         5. Autoscaler recognizes GPU annotations and Ray custom resources.
         6. Autoscaler and operator ignore pods marked for deletion.
         7. Autoscaler logs work. Autoscaler events are piped to the driver.
-
-        Items 1. and 2. protect the example in the documentation.
-        Items 3. and 4. protect the autoscaler's ability to respond to Ray CR update.
+        8. Ray utils show correct resource limits in the head container.
 
         Tests the following modes of interaction with a Ray cluster on K8s:
         1. kubectl exec
         2. Ray Client
         3. Ray Job Submission
 
+        TODO (Dmitri): Split up the test logic.
+        Too much is stuffed into this one test case.
+
         Resources requested by this test are safely within the bounds
         of an m5.xlarge instance.
@@ -261,6 +262,18 @@ class KubeRayAutoscalingTest(unittest.TestCase):
             pod_name_filter=HEAD_POD_PREFIX, namespace=RAY_CLUSTER_NAMESPACE
         )
         assert head_pod, "Could not find the Ray head pod."
+
+        # Confirm head pod resource allocation.
+        # (This is a misplaced test of Ray's resource detection in containers.
+        # See the TODO in the docstring.)
+        logger.info("Confirming head pod resource allocation.")
+        out = kubectl_exec_python_script(  # Interaction mode #1: `kubectl exec`
+            script_name="check_cpu_and_memory.py",
+            pod=head_pod,
+            container="ray-head",
+            namespace="default",
+        )
+
         # Scale-up
         logger.info("Scaling up to one worker via Ray resource request.")
         # The request for 2 cpus should give us a 1-cpu head (already present) and a
diff --git a/python/ray/tests/test_advanced_8.py b/python/ray/tests/test_advanced_8.py
index 7f1d8aead..eb7f6151d 100644
--- a/python/ray/tests/test_advanced_8.py
+++ b/python/ray/tests/test_advanced_8.py
@@ -1,6 +1,7 @@
 # coding: utf-8
 import glob
 import logging
+import multiprocessing
 import os
 import sys
 import tempfile
@@ -299,7 +300,7 @@ def test_get_system_memory():
     )
     # cgroups v2, set
     with tempfile.NamedTemporaryFile("w") as memory_max_file:
-        memory_max_file.write("100")
+        memory_max_file.write("100\n")
         memory_max_file.flush()
         assert (
             ray._private.utils.get_system_memory(
@@ -323,6 +324,70 @@ def test_get_system_memory():
     )
 
 
+@pytest.mark.parametrize("in_k8s", [True, False])
+@pytest.mark.parametrize("env_disable", [True, False])
+@pytest.mark.parametrize("override_disable", [True, False])
+@pytest.mark.parametrize("got_docker_cpus", [True, False])
+def test_get_num_cpus(
+    in_k8s: bool,
+    env_disable: bool,
+    override_disable: bool,
+    got_docker_cpus: bool,
+    monkeypatch,
+):
+    """Tests:
+    - Conditions under which ray._private.utils.get_num_cpus logs a warning
+      about Docker.
+    - Fallback to multiprocessing.cpu_count if there's no Docker count
+      available.
+    """
+    # Shouldn't get the log warning if we're in K8s, the env variable is set,
+    # the flag arg to get_num_cpus is set, or getting Docker CPUs fails.
+    # Otherwise, should get the log message.
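+    # (The warning can only fire after _get_docker_cpus() succeeds, which is
+    # why `not got_docker_cpus` appears in the list below.)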
+    should_not_log = any([in_k8s, env_disable, override_disable, not got_docker_cpus])
+    expected_warning = (
+        "Detecting docker specified CPUs. In "
+        "previous versions of Ray, CPU detection in containers "
+        "was incorrect. Please ensure that Ray has enough CPUs "
+        "allocated. As a temporary workaround to revert to the "
+        "prior behavior, set "
+        "`RAY_USE_MULTIPROCESSING_CPU_COUNT=1` as an env var "
+        "before starting Ray. Set the env var: "
+        "`RAY_DISABLE_DOCKER_CPU_WARNING=1` to mute this warning."
+    )
+    if got_docker_cpus:
+        mock_get_docker_cpus = mock.Mock(return_value=128)
+    else:
+        mock_get_docker_cpus = mock.Mock(side_effect=Exception())
+
+    if in_k8s:
+        monkeypatch.setenv("KUBERNETES_SERVICE_HOST", "1")
+    else:
+        try:
+            monkeypatch.delenv("KUBERNETES_SERVICE_HOST")
+        except KeyError:
+            pass
+
+    with mock.patch.multiple(
+        "ray._private.utils",
+        _get_docker_cpus=mock_get_docker_cpus,
+        ENV_DISABLE_DOCKER_CPU_WARNING=env_disable,
+        logger=mock.DEFAULT,
+    ) as mocks:
+        num_cpus = ray._private.utils.get_num_cpus(override_disable)
+
+        if got_docker_cpus:
+            # Got the Docker count of 128 CPUs in the giant mock container.
+            assert num_cpus == 128
+        else:
+            # Failed to get Docker count and fell back to multiprocessing count.
+            assert num_cpus == multiprocessing.cpu_count()
+
+        if should_not_log:
+            mocks["logger"].warning.assert_not_called()
+        else:
+            mocks["logger"].warning.assert_called_with(expected_warning)
+
+
 @pytest.mark.skipif(sys.platform == "win32", reason="not relevant for windows")
 def test_detect_docker_cpus():
     # No limits set
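
For reference: a minimal sketch of the cgroup v2 reads this change relies on,
not part of the diff above, e.g. for manually checking limits inside a
container. It assumes cgroup v2 is mounted at /sys/fs/cgroup; the helper name
is illustrative.

    import os


    def read_cgroup_v2_limits():
        """Best-effort read of cgroup v2 CPU and memory limits."""
        limits = {"memory_bytes": None, "cpus": None}
        # memory.max holds a byte count, or the string "max" when unlimited.
        mem_path = "/sys/fs/cgroup/memory.max"
        if os.path.exists(mem_path):
            with open(mem_path) as f:
                raw = f.read().strip()  # strip the trailing newline
            if raw.isnumeric():
                limits["memory_bytes"] = int(raw)
        # cpu.max holds "<quota> <period>"; <quota> may also be "max".
        cpu_path = "/sys/fs/cgroup/cpu.max"
        if os.path.exists(cpu_path):
            with open(cpu_path) as f:
                quota, period = f.read().strip().split()
            if quota != "max":
                limits["cpus"] = int(quota) / int(period)
        return limits


    if __name__ == "__main__":
        print(read_cgroup_v2_limits())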