mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Autoscaler] Fix resource passing bug fix (#10397)
This commit is contained in:
parent
2afb54c99c
commit
b1f3c9e10e
5 changed files with 32 additions and 26 deletions
|
@ -8,6 +8,7 @@ import os
|
|||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
|
||||
from ray.autoscaler.docker import check_docker_running_cmd, \
|
||||
check_docker_image, \
|
||||
|
@ -81,18 +82,9 @@ def _with_environment_variables(cmd: str,
|
|||
automatically be converted to a one line yaml string.
|
||||
"""
|
||||
|
||||
def dict_as_one_line_yaml(d):
|
||||
items = []
|
||||
for key, val in d.items():
|
||||
item_str = "{}: {}".format(quote(str(key)), quote(str(val)))
|
||||
items.append(item_str)
|
||||
|
||||
return "{" + ",".join(items) + "}"
|
||||
|
||||
as_strings = []
|
||||
for key, val in environment_variables.items():
|
||||
if isinstance(val, dict):
|
||||
val = dict_as_one_line_yaml(val)
|
||||
val = json.dumps(val, separators=(",", ":"))
|
||||
s = "export {}={};".format(key, quote(val))
|
||||
as_strings.append(s)
|
||||
all_vars = "".join(as_strings)
|
||||
|
@ -102,7 +94,6 @@ def _with_environment_variables(cmd: str,
|
|||
def _with_interactive(cmd):
|
||||
force_interactive = ("true && source ~/.bashrc && "
|
||||
"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ")
|
||||
|
||||
return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)]
|
||||
|
||||
|
||||
|
|
|
@ -257,9 +257,15 @@ class Node:
|
|||
"""Resolve and return the current resource spec for the node."""
|
||||
|
||||
def merge_resources(env_dict, params_dict):
|
||||
"""Merge two dictionaries, picking from the second in the event of a conflict.
|
||||
Also emit a warning on every conflict.
|
||||
"""Separates special case params and merges two dictionaries, picking from the
|
||||
first in the event of a conflict. Also emit a warning on every
|
||||
conflict.
|
||||
"""
|
||||
num_cpus = env_dict.pop("CPU", None)
|
||||
num_gpus = env_dict.pop("GPU", None)
|
||||
memory = env_dict.pop("memory", None)
|
||||
object_store_memory = env_dict.pop("object_store_memory", None)
|
||||
|
||||
result = params_dict.copy()
|
||||
result.update(env_dict)
|
||||
|
||||
|
@ -268,19 +274,24 @@ class Node:
|
|||
logger.warning("Autoscaler is overriding your resource:"
|
||||
"{}: {} with {}.".format(
|
||||
key, params_dict[key], env_dict[key]))
|
||||
return result
|
||||
return num_cpus, num_gpus, memory, object_store_memory, result
|
||||
|
||||
env_resources = {}
|
||||
env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
|
||||
if env_string:
|
||||
env_resources = json.loads(env_string)
|
||||
logger.info(f"Autosaler overriding resources: {env_resources}.")
|
||||
|
||||
if not self._resource_spec:
|
||||
resources = merge_resources(env_resources,
|
||||
self._ray_params.resources)
|
||||
num_cpus, num_gpus, memory, object_store_memory, resources = \
|
||||
merge_resources(env_resources, self._ray_params.resources)
|
||||
self._resource_spec = ResourceSpec(
|
||||
self._ray_params.num_cpus, self._ray_params.num_gpus,
|
||||
self._ray_params.memory, self._ray_params.object_store_memory,
|
||||
self._ray_params.num_cpus
|
||||
if num_cpus is None else num_cpus, self._ray_params.num_gpus
|
||||
if num_gpus is None else num_gpus, self._ray_params.memory
|
||||
if memory is None else memory,
|
||||
self._ray_params.object_store_memory
|
||||
if object_store_memory is None else object_store_memory,
|
||||
resources, self._ray_params.redis_max_memory).resolve(
|
||||
is_head=self.head, node_ip_address=self.node_ip_address)
|
||||
return self._resource_spec
|
||||
|
|
|
@ -615,7 +615,8 @@ def test_ray_address_environment_variable(ray_start_cluster):
|
|||
def test_ray_resources_environment_variable(ray_start_cluster):
|
||||
address = ray_start_cluster.address
|
||||
|
||||
os.environ["RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2}"
|
||||
os.environ[
|
||||
"RAY_OVERRIDE_RESOURCES"] = "{\"custom1\":1, \"custom2\":2, \"CPU\":3}"
|
||||
ray.init(address=address, resources={"custom1": 3, "custom3": 3})
|
||||
|
||||
cluster_resources = ray.cluster_resources()
|
||||
|
@ -623,6 +624,7 @@ def test_ray_resources_environment_variable(ray_start_cluster):
|
|||
assert cluster_resources["custom1"] == 1
|
||||
assert cluster_resources["custom2"] == 2
|
||||
assert cluster_resources["custom3"] == 3
|
||||
assert cluster_resources["CPU"] == 3
|
||||
|
||||
|
||||
def test_gpu_info_parsing():
|
||||
|
|
|
@ -14,7 +14,7 @@ auth_config = {
|
|||
def test_environment_variable_encoder_strings():
|
||||
env_vars = {"var1": "quote between this \" and this", "var2": "123"}
|
||||
res = _with_environment_variables("echo hello", env_vars)
|
||||
expected = """export var1='quote between this " and this';export var2=123;echo hello""" # noqa: E501
|
||||
expected = """export var1='"quote between this \\" and this"';export var2='"123"';echo hello""" # noqa: E501
|
||||
assert res == expected
|
||||
|
||||
|
||||
|
@ -22,7 +22,7 @@ def test_environment_variable_encoder_dict():
|
|||
env_vars = {"value1": "string1", "value2": {"a": "b", "c": 2}}
|
||||
res = _with_environment_variables("echo hello", env_vars)
|
||||
|
||||
expected = """export value1=string1;export value2='{a: b,c: 2}';echo hello""" # noqa: E501
|
||||
expected = """export value1='"string1"';export value2='{"a":"b","c":2}';echo hello""" # noqa: E501
|
||||
assert res == expected
|
||||
|
||||
|
||||
|
@ -84,7 +84,7 @@ def test_ssh_command_runner():
|
|||
"--login",
|
||||
"-c",
|
||||
"-i",
|
||||
"""'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'quote between this " and this'"'"';export var2=123;echo helloo'""" # noqa: E501
|
||||
"""'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"quote between this \\" and this"'"'"';export var2='"'"'"123"'"'"';echo helloo'""" # noqa: E501
|
||||
]
|
||||
|
||||
# Much easier to debug this loop than the function call.
|
||||
|
@ -122,7 +122,7 @@ def test_docker_command_runner():
|
|||
# This string is insane because there are an absurd number of embedded
|
||||
# quotes. While this is a ridiculous string, the escape behavior is
|
||||
# important and somewhat difficult to get right for environment variables.
|
||||
cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'quote between this " and this'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2=123;echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501
|
||||
cmd = """'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && docker exec -it container /bin/bash -c '"'"'bash --login -c -i '"'"'"'"'"'"'"'"'true && source ~/.bashrc && export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && export var1='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"quote between this \\" and this"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';export var2='"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"123"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"'"';echo hello'"'"'"'"'"'"'"'"''"'"' '""" # noqa: E501
|
||||
|
||||
expected = [
|
||||
"ssh", "-tt", "-i", "8265.pem", "-o", "StrictHostKeyChecking=no", "-o",
|
||||
|
@ -135,6 +135,8 @@ def test_docker_command_runner():
|
|||
]
|
||||
# Much easier to debug this loop than the function call.
|
||||
for x, y in zip(process_runner.calls[0], expected):
|
||||
print(f"expeted:\t{y}")
|
||||
print(f"actual: \t{x}")
|
||||
assert x == y
|
||||
process_runner.assert_has_call("1.2.3.4", exact=expected)
|
||||
|
||||
|
|
|
@ -366,10 +366,10 @@ class AutoscalingTest(unittest.TestCase):
|
|||
# These checks are done separately because we have no guarantees on the
|
||||
# order the dict is serialized in.
|
||||
runner.assert_has_call("172.0.0.0", "RAY_OVERRIDE_RESOURCES=")
|
||||
runner.assert_has_call("172.0.0.0", "CPU: 2")
|
||||
runner.assert_has_call("172.0.0.0", "\"CPU\":2")
|
||||
runner.assert_has_call("172.0.0.1", "RAY_OVERRIDE_RESOURCES=")
|
||||
runner.assert_has_call("172.0.0.1", "CPU: 32")
|
||||
runner.assert_has_call("172.0.0.1", "GPU: 8")
|
||||
runner.assert_has_call("172.0.0.1", "\"CPU\":32")
|
||||
runner.assert_has_call("172.0.0.1", "\"GPU\":8")
|
||||
|
||||
def testScaleUpLoadMetrics(self):
|
||||
config = MULTI_WORKER_CLUSTER.copy()
|
||||
|
|
Loading…
Add table
Reference in a new issue