[rfc][ci] create fake docker-compose cluster environment (#20256)

Following #18987, this PR adds a docker-compose based local multi-node cluster.

The fake multinode docker setup comprises two parts: the docker_monitor.py script, a watcher that calls docker compose up whenever the docker-compose.yaml changes, and the node provider, which creates and updates the docker-compose.yaml according to the autoscaling requirements.

This mode fully supports autoscaling and comes with test utilities to start and connect to docker-compose autoscaling environments. There's also a sample test case showing how this can be used.
This commit is contained in:
Kai Fricke 2022-01-10 20:35:36 -08:00 committed by GitHub
parent 4a8a8b30b0
commit 5a7f6e4fdd
12 changed files with 1351 additions and 15 deletions

View file

@@ -229,6 +229,26 @@
--test_tag_filters=tests_dir_S,tests_dir_T,tests_dir_U,tests_dir_V,tests_dir_W,tests_dir_X,tests_dir_Y,tests_dir_Z,-example,-py37,-soft_imports,-gpu_only,-rllib
python/ray/tune/...
- label: ":octopus: Tune multinode tests"
conditions: [ "RAY_CI_TUNE_AFFECTED" ]
commands:
- LINUX_WHEELS=1 ./ci/travis/ci.sh build
- mkdir -p ~/.docker/cli-plugins/ && curl -SL https://github.com/docker/compose/releases/download/v2.0.1/docker-compose-linux-x86_64 -o ~/.docker/cli-plugins/docker-compose && chmod +x ~/.docker/cli-plugins/docker-compose
- pip install -U docker aws_requests_auth boto3
- python ./ci/travis/build-docker-images.py --py-versions py37 --device-types cpu --build-type LOCAL --build-base
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only
--test_tag_filters=multinode,-example,-flaky,-py37,-soft_imports,-gpu_only,-rllib
python/ray/tune/...
--test_env=RAY_DOCKER_IMAGE="rayproject/ray:nightly-py37-cpu"
--test_env=RAY_TEMPDIR="/ray-mount"
--test_env=RAY_HOSTDIR="/ray"
--test_env=RAY_TESTHOST="dind-daemon"
--test_env=DOCKER_HOST=tcp://docker:2376
--test_env=DOCKER_TLS_VERIFY=1
--test_env=DOCKER_CERT_PATH=/certs/client
--test_env=DOCKER_TLS_CERTDIR=/certs
- label: ":octopus: Tune examples {w/o tf/pytorch; no RLlib}"
conditions: ["RAY_CI_TUNE_AFFECTED"]
commands:

View file

@@ -556,7 +556,8 @@ MERGE = "MERGE"
HUMAN = "HUMAN"
PR = "PR"
BUILDKITE = "BUILDKITE"
BUILD_TYPES = [MERGE, HUMAN, PR, BUILDKITE]
LOCAL = "LOCAL"
BUILD_TYPES = [MERGE, HUMAN, PR, BUILDKITE, LOCAL]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
@@ -627,6 +628,7 @@ if __name__ == "__main__":
build_type = args.build_type
is_buildkite = build_type == BUILDKITE
is_local = build_type == LOCAL
if build_type == BUILDKITE:
if os.environ.get("BUILDKITE_PULL_REQUEST", "") == "false":
@@ -637,13 +639,13 @@ if __name__ == "__main__":
if build_type == HUMAN:
# If manually triggered, request user for branch and SHA value to use.
_configure_human_version()
if (build_type in {HUMAN, MERGE, BUILDKITE}
if (build_type in {HUMAN, MERGE, BUILDKITE, LOCAL}
or _check_if_docker_files_modified()
or args.only_build_worker_container):
DOCKER_CLIENT = docker.from_env()
is_merge = build_type == MERGE
# Buildkite is authenticated in the background.
if is_merge and not is_buildkite:
if is_merge and not is_buildkite and not is_local:
# We do this here because we want to be authenticated for
# Docker pulls as well as pushes (to avoid rate-limits).
username, password = _get_docker_creds()

View file

@@ -1,7 +1,11 @@
.. _fake-multinode:
Testing Autoscaling Locally
===========================
Testing autoscaling behavior is important for autoscaler development and the debugging of applications that depend on autoscaler behavior. You can run the autoscaler locally without needing to launch a real cluster with one of the following methods:
Testing autoscaling behavior is important for autoscaler development and the debugging of applications that depend
on autoscaler behavior. You can run the autoscaler locally without needing to launch a real cluster with one of the
following methods:
Using ``RAY_FAKE_CLUSTER=1 ray start``
--------------------------------------
@@ -67,15 +71,141 @@ Python documentation:
.. autoclass:: ray.cluster_utils.AutoscalingCluster
:members:
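For orientation, here is a minimal usage sketch (the node type name and resource numbers are illustrative, not defaults):

.. code-block:: python

    import ray
    from ray.cluster_utils import AutoscalingCluster

    # Emulate a head node plus an autoscaling CPU worker type on this machine.
    cluster = AutoscalingCluster(
        head_resources={"CPU": 2},
        worker_node_types={
            "cpu_node": {
                "resources": {"CPU": 4},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
        },
    )
    cluster.start()
    ray.init("auto")

    # ... submit tasks or actors that trigger scale-up here ...

    ray.shutdown()
    cluster.shutdown()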
Features and Limitations
------------------------
Features and Limitations of ``fake_multinode``
----------------------------------------------
Most of the features of the autoscaler are supported in fake multi-node mode. For example, if you update the contents of the YAML file, the autoscaler will pick up the new configuration and apply changes, as it does in a real cluster. Node selection, launch, and termination are governed by the same bin-packing and idle timeout algorithms as in a real cluster.
However, there are a few limitations:
1. All node raylets run uncontainerized on the local machine, and hence they share the same IP address.
1. All node raylets run uncontainerized on the local machine, and hence they share the same IP address. See the :ref:`fake_multinode_docker <fake-multinode-docker>` section for an alternative local multi node setup.
2. Configurations for auth, setup, initialization, Ray start, file sync, and anything cloud-specific are not supported.
3. It's necessary to limit the number of nodes / node CPU / object store memory to avoid overloading your local machine.
.. _fake-multinode-docker:
Testing containerized multi-node setups locally with Docker compose
====================================================================
To go one step further and locally test a multi node setup where each node uses its own container (and thus
has a separate filesystem, IP address, and Ray processes), you can use the ``fake_multinode_docker`` node provider.
The setup is very similar to the :ref:`fake_multinode <fake-multinode>` provider. However, you need to start a monitoring process
(``docker_monitor.py``) that takes care of running the ``docker compose`` command.
Prerequisites:
1. Make sure you have `docker <https://docs.docker.com/get-docker/>`_ installed.
2. Make sure you have the `docker compose V2 plugin <https://docs.docker.com/compose/cli-command/#installing-compose-v2>`_ installed.
Using ``RAY_FAKE_CLUSTER=1 ray up``
-----------------------------------
Instructions:
1. Navigate to the root directory of the Ray repo you have cloned locally.
2. Locate the `fake_multi_node/example_docker.yaml <https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/fake_multi_node/example_docker.yaml>`__ example file and fill in the number of CPUs and GPUs the local machine has for the head node type config. The YAML follows the same format as cluster autoscaler configurations, but some fields are not supported.
3. Configure worker types and other autoscaling configs as desired in the YAML file.
4. Make sure the ``shared_volume_dir`` is empty on the host system.
5. Start the monitoring process:
.. code-block:: shell
$ python ./python/ray/autoscaler/_private/fake_multi_node/docker_monitor.py \
./python/ray/autoscaler/_private/fake_multi_node/example_docker.yaml
6. Start the Ray cluster using ``ray up``:
.. code-block:: shell
$ RAY_FAKE_CLUSTER=1 ray up -y ./python/ray/autoscaler/_private/fake_multi_node/example_docker.yaml
7. Connect your application to the fake local cluster with ``ray.init("ray://localhost:10002")`` (see the example after this list).
8. Alternatively, get a shell on the head node:
.. code-block:: shell
$ docker exec -it fake_docker_fffffffffffffffffffffffffffffffffffffffffffffffffff00000_1 bash
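As a minimal connection example in Python (this assumes the default client port ``10002`` from the example config):

.. code-block:: python

    import ray

    # The head node maps host port 10002 to the Ray client server.
    ray.init("ray://localhost:10002")
    print(ray.cluster_resources())
    ray.shutdown()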
Using ``ray.autoscaler._private.fake_multi_node.test_utils.DockerCluster``
--------------------------------------------------------------------------
This utility is used to write tests that exercise multi-node behavior. The ``DockerCluster`` class can
be used to set up a Docker-compose cluster in a temporary directory, start the monitoring process,
wait for the cluster to come up, connect to it, and update the configuration.
Please see the API documentation and example test cases on how to use this utility.
.. autoclass:: ray.autoscaler._private.fake_multi_node.test_utils.DockerCluster
:members:
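For illustration, a minimal sketch of how a test might drive this utility (mirroring the bundled ``test_multinode_sync.py`` example; the resource numbers are arbitrary):

.. code-block:: python

    from ray.autoscaler._private.fake_multi_node.test_utils import DockerCluster

    cluster = DockerCluster()
    cluster.setup()  # temp dir, config file, optional image pull
    cluster.update_config({
        "provider": {"head_resources": {"CPU": 4, "GPU": 0}},
    })
    try:
        cluster.start()  # starts the monitor process and runs ``ray up``
        cluster.connect(client=True, timeout=120)
        # ... assertions against the autoscaling cluster go here ...
    finally:
        cluster.stop()
        cluster.teardown()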
Features and Limitations of ``fake_multinode_docker``
-----------------------------------------------------
The fake multinode docker node provider launches fully fledged nodes in their own containers. However,
some limitations still remain:
1. Configurations for auth, setup, initialization, Ray start, file sync, and anything cloud-specific are not supported
(but might be in the future).
2. It's necessary to limit the number of nodes / node CPU / object store memory to avoid overloading your local machine.
3. In docker-in-docker setups, a careful setup has to be followed to make the fake multinode docker provider work (see below).
Setting up in a Docker-in-Docker (dind) environment
---------------------------------------------------
When setting up in a Docker-in-Docker (dind) environment (e.g. the Ray OSS Buildkite environment), some
things have to be kept in mind. To make this clear, consider these concepts:
* The **host** is the non-containerized machine on which the code is executed (e.g. the Buildkite runner).
* The **outer container** is the container running directly on the **host**. In the Ray OSS Buildkite environment,
two containers are started - a *dind* network host and a container with the Ray source code and wheel in it.
* The **inner container** is a container started by the fake multinode docker node provider.
The control plane for the multinode docker node provider lives in the outer container. However, ``docker compose``
commands are executed from the connected docker-in-docker network. In the Ray OSS Buildkite environment, this is
the ``dind-daemon`` container running on the host docker. If you instead mounted ``/var/run/docker.sock`` from the
host, it would be the host docker daemon. We will refer to both as the **host daemon** from now on.
The outer container modifies files that have to be mounted in the inner containers (and modified from there
as well). This means that the host daemon also has to have access to these files.
Similarly, the inner containers expose ports - but because the containers are actually started by the host daemon,
the ports are also only accessible on the host (or the dind container).
For the Ray OSS Buildkite environment, we thus set some environment variables:
* ``RAY_TEMPDIR="/ray-mount"``. This environment variable defines where the temporary directory for the
cluster files should be created. This directory has to be accessible by the host, the outer container,
and the inner container. In the inner container, we can control the directory name.
* ``RAY_HOSTDIR="/ray"``. In the case where the shared directory has a different name on the host, we can
rewrite the mount points dynamically. In this example, the outer container is started with ``-v /ray:/ray-mount``
or similar, so the directory on the host is ``/ray`` and in the outer container ``/ray-mount`` (see ``RAY_TEMPDIR``).
* ``RAY_TESTHOST="dind-daemon"``. As the containers are started by the host daemon, we can't just connect to
``localhost``, as the ports are not exposed to the outer container. Thus, we can set the Ray host with this environment
variable.
Lastly, docker compose requires a docker image. The default docker image is ``rayproject/ray:nightly``.
However, in our Ray OSS Buildkite environment, we want to test changes from the respective PR or branch. Thus, we set
* ``RAY_DOCKER_IMAGE="rayproject/ray:nightly-py37-cpu"``
which is the name of the docker image we build in one of the previous build steps.
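As an illustration of the path rewriting these variables enable, this is roughly what the ``host_dir`` helper in the node provider does:

.. code-block:: python

    import os

    def host_dir(container_dir: str) -> str:
        # Rewrite the in-container temp dir to the host-side path, if both are set.
        ray_tempdir = os.environ.get("RAY_TEMPDIR")
        ray_hostdir = os.environ.get("RAY_HOSTDIR")
        if not ray_tempdir or not ray_hostdir:
            return container_dir
        return container_dir.replace(ray_tempdir, ray_hostdir)

    # With RAY_TEMPDIR=/ray-mount and RAY_HOSTDIR=/ray, a mount source such as
    # "/ray-mount/fake_docker/docker-compose.yaml" is rewritten to
    # "/ray/fake_docker/docker-compose.yaml" before it is handed to the host daemon.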
Local development
-----------------
If you're doing local development on the fake multi-node docker module, you can set
* ``FAKE_CLUSTER_DEV="auto"``
This will mount the ``ray/python/ray/autoscaler`` directory to the started nodes. Please note that
this will probably not work in your docker-in-docker setup.

View file

@@ -0,0 +1,89 @@
import os
import subprocess
from typing import Dict, List, Tuple
from ray.autoscaler._private.docker import with_docker_exec
from ray.autoscaler.command_runner import CommandRunnerInterface
class FakeDockerCommandRunner(CommandRunnerInterface):
"""Command runner for the fke docker multinode cluster.
This command runner uses ``docker exec`` and ``docker cp`` to
run commands and copy files, respectively.
The regular ``DockerCommandRunner`` is made for use in SSH settings
    where Docker runs on a remote host. In contrast, this command runner
does not wrap the docker commands in ssh calls.
"""
def __init__(self, docker_config, **common_args):
self.container_name = docker_config["container_name"]
self.docker_config = docker_config
self.home_dir = None
self.initialized = False
# Optionally use 'podman' instead of 'docker'
use_podman = docker_config.get("use_podman", False)
self.docker_cmd = "podman" if use_podman else "docker"
def _run_shell(self, cmd: str, timeout: int = 120) -> str:
return subprocess.check_output(
cmd, shell=True, timeout=timeout, encoding="utf-8")
def run(
self,
cmd: str = None,
timeout: int = 120,
exit_on_fail: bool = False,
port_forward: List[Tuple[int, int]] = None,
with_output: bool = False,
environment_variables: Dict[str, object] = None,
run_env: str = "auto",
ssh_options_override_ssh_key: str = "",
shutdown_after_run: bool = False,
) -> str:
prefix = with_docker_exec(
[cmd],
container_name=self.container_name,
with_interactive=False,
docker_cmd=self.docker_cmd)[0]
return self._run_shell(prefix)
def run_init(self, *, as_head: bool, file_mounts: Dict[str, str],
sync_run_yet: bool):
pass
def remote_shell_command_str(self):
return "{} exec -it {} bash".format(self.docker_cmd,
self.container_name)
def run_rsync_down(self, source, target, options=None):
docker_dir = os.path.dirname(self._docker_expand_user(source))
self._run_shell(
f"docker cp {self.container_name}:{docker_dir} {target}")
def run_rsync_up(self, source, target, options=None):
docker_dir = os.path.dirname(self._docker_expand_user(target))
self.run(cmd=f"mkdir -p {docker_dir}")
self._run_shell(
f"docker cp {source} {self.container_name}:{docker_dir}")
def _docker_expand_user(self, string, any_char=False):
user_pos = string.find("~")
if user_pos > -1:
if self.home_dir is None:
self.home_dir = self._run_shell(
with_docker_exec(
["printenv HOME"],
container_name=self.container_name,
docker_cmd=self.docker_cmd)).strip()
if any_char:
return string.replace("~/", self.home_dir + "/")
elif not any_char and user_pos == 0:
return string.replace("~", self.home_dir, 1)
return string

View file

@@ -0,0 +1,221 @@
"""Fake multinode docker monitoring script.
This script is the "docker compose server" for the fake_multinode
provider using Docker compose. It should be started before running
`RAY_FAKE_CLUSTER=1 ray up <cluster_config>`.
This script reads the volume directory from a supplied fake multinode
docker cluster config file.
It then waits until a docker-compose.yaml file is created in the same
directory, which is done by the `ray up` command.
It then watches for changes in the docker-compose.yaml file and runs
`docker compose up` whenever changes are detected. This will start docker
containers as requested by the autoscaler.
Generally, the docker-compose.yaml will be mounted in the head node of the
cluster, which will then continue to change it according to the autoscaler
requirements.
Additionally, this script monitors the docker container status using
`docker compose ps` and writes it into a `status.json`. This information is
again used by the autoscaler to determine if any nodes have died.
"""
import argparse
import json
import os
import shutil
import subprocess
import time
import yaml
from typing import Any, List, Dict, Optional
def _read_yaml(path: str):
with open(path, "rt") as f:
return yaml.safe_load(f)
def _update_docker_compose(docker_compose_path: str, project_name: str,
status: Optional[Dict[str, Any]]) -> bool:
docker_compose_config = _read_yaml(docker_compose_path)
if not docker_compose_config:
print("Docker compose currently empty")
return False
cmd = ["up", "-d"]
if status and len(status) > 0:
cmd += ["--no-recreate"]
shutdown = False
if not docker_compose_config["services"]:
# If no more nodes, run `down` instead of `up`
print("Shutting down nodes")
cmd = ["down"]
shutdown = True
try:
subprocess.check_output([
"docker", "compose", "-f", docker_compose_path, "-p", project_name
] + cmd + [
"--remove-orphans",
])
except Exception as e:
print(f"Ran into error when updating docker compose: {e}")
# Ignore error
return shutdown
def _get_ip(project_name: str,
container_name: str,
override_network: Optional[str] = None,
retry_times: int = 3) -> Optional[str]:
network = override_network or f"{project_name}_ray_local"
cmd = [
"docker", "inspect", "-f", "\"{{ .NetworkSettings.Networks"
f".{network}.IPAddress"
" }}\"", f"{container_name}"
]
for i in range(retry_times):
try:
ip_address = subprocess.check_output(cmd, encoding="utf-8")
except Exception:
time.sleep(1)
else:
return ip_address.strip().strip("\"").strip("\\\"")
return None
def _update_docker_status(docker_compose_path: str, project_name: str,
docker_status_path: str):
try:
data_str = subprocess.check_output([
"docker",
"compose",
"-f",
docker_compose_path,
"-p",
project_name,
"ps",
"--format",
"json",
])
data: List[Dict[str, str]] = json.loads(data_str)
except Exception as e:
print(f"Ran into error when fetching status: {e}")
return None
status = {}
for container in data:
node_id = container["Service"]
container_name = container["Name"]
if container["State"] == "running":
ip = _get_ip(project_name, container_name)
else:
ip = ""
container["IP"] = ip
status[node_id] = container
with open(docker_status_path, "wt") as f:
json.dump(status, f)
return status
def monitor_docker(docker_compose_path: str,
status_path: str,
project_name: str,
update_interval: float = 1.):
while not os.path.exists(docker_compose_path):
# Wait until cluster is created
time.sleep(0.5)
print("Docker compose config detected, starting status monitoring")
# Make sure this is always writeable from inside the containers
os.chmod(docker_compose_path, 0o777)
docker_config = {"force_update": True}
# Force update
next_update = time.monotonic() - 1.
shutdown = False
status = None
# Loop:
# If the config changed, update cluster.
# Every `update_interval` seconds, update docker status.
while not shutdown:
new_docker_config = _read_yaml(docker_compose_path)
if new_docker_config != docker_config:
# Update cluster
shutdown = _update_docker_compose(docker_compose_path,
project_name, status)
# Force status update
next_update = time.monotonic() - 1.
if time.monotonic() > next_update:
# Update docker status
status = _update_docker_status(docker_compose_path, project_name,
status_path)
next_update = time.monotonic() + update_interval
docker_config = new_docker_config
time.sleep(0.1)
print("Cluster shut down, terminating monitoring script.")
def start_monitor(config_file: str):
cluster_config = _read_yaml(config_file)
provider_config = cluster_config["provider"]
assert provider_config["type"] == "fake_multinode_docker", (
f"The docker monitor only works with providers of type "
f"`fake_multinode_docker`, got `{provider_config['type']}`")
project_name = provider_config["project_name"]
volume_dir = provider_config["shared_volume_dir"]
os.makedirs(volume_dir, mode=0o755, exist_ok=True)
# Create bootstrap config
bootstrap_config_path = os.path.join(volume_dir, "bootstrap_config.yaml")
shutil.copy(config_file, bootstrap_config_path)
# These two files usually don't exist, yet
docker_compose_config_path = os.path.join(volume_dir,
"docker-compose.yaml")
docker_status_path = os.path.join(volume_dir, "status.json")
if os.path.exists(docker_compose_config_path):
# We wait until this file exists, so remove it if it exists
# from a previous run.
os.remove(docker_compose_config_path)
if os.path.exists(docker_status_path):
os.remove(docker_status_path)
# Create empty file so it can be mounted
with open(docker_status_path, "wt") as f:
f.write("{}")
print(f"Starting monitor process. Please start Ray cluster with:\n"
f" RAY_FAKE_CLUSTER=1 ray up {config_file}")
monitor_docker(docker_compose_config_path, docker_status_path,
project_name)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"config_file",
help="Path to cluster config file containing a fake docker "
"cluster configuration.")
args = parser.parse_args()
start_monitor(args.config_file)

View file

@@ -0,0 +1,85 @@
# This is an example config file to start a local
# multi-node cluster using Docker compose.
# It requires the ``docker compose`` plugin to be installed:
# https://docs.docker.com/compose/cli-command/#installing-compose-v2
# The resulting cluster will consist of docker containers
# scheduled via docker compose. These containers behave just like
# regular Ray nodes, have their own IPs, and can SSH into each other.
# They are mostly used to test multi-node setups and autoscaling on
# a single machine.
# Example command to start a cluster with this config:
#
# python docker_monitor.py example_docker.yaml &
# RAY_FAKE_CLUSTER=1 ray up -y example_docker.yaml
cluster_name: fake_docker
max_workers: 8
provider:
type: fake_multinode_docker
disable_launch_config_check: True
disable_node_updaters: True
# Docker-compose config
project_name: fake_docker
image: rayproject/ray:nightly
shared_volume_dir: /tmp/fake_docker
# For now, this has to be set here separately again:
head_resources:
CPU: 4
GPU: 0
auth:
ssh_user: ubuntu
available_node_types:
ray.head.default:
# You must set this manually to your "head" node resources!! The head
# node is launched via `ray start` and hence the autoscaler cannot
# configure its resources. The resources specified for its node type
# must line up with what Ray detects/is configured with on start.
resources:
CPU: 4
GPU: 0
node_config: {}
max_workers: 0
ray.worker.cpu:
resources:
CPU: 2
object_store_memory: 1000000000
node_config: {}
min_workers: 1
max_workers: 4
ray.worker.gpu:
resources:
CPU: 4
GPU: 1
object_store_memory: 1000000000
node_config: {}
min_workers: 1
max_workers: 2
head_node_type: ray.head.default
upscaling_speed: 1.0
idle_timeout_minutes: 0.1
# The start commands currently don't work: docker doesn't seem to like docker exec,
# and Ray only works when the start command is included in the docker-compose command
head_start_ray_commands: []
worker_start_ray_commands: []
# The docker config is currently not propagated to the node provider config.
# Thus, docker-specific configuration is expected to go into the provider part
# as demonstrated above.
docker: {}
#
# !!! Configurations below are not supported in fake cluster mode !!!
#
initialization_commands: []
setup_commands: []
head_setup_commands: []
worker_setup_commands: []
file_mounts: {}
cluster_synced_files: []
file_mounts_sync_continuously: false
rsync_exclude: []
rsync_filter: []

View file

@@ -1,14 +1,24 @@
import copy
import json
import logging
import os
import json
import subprocess
from threading import RLock
import time
from types import ModuleType
from typing import Any, Dict, Optional
import yaml
import ray
from ray.autoscaler._private.fake_multi_node.command_runner import \
FakeDockerCommandRunner
from ray.autoscaler.command_runner import CommandRunnerInterface
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import (TAG_RAY_NODE_KIND, NODE_KIND_HEAD,
NODE_KIND_WORKER, TAG_RAY_USER_NODE_TYPE,
TAG_RAY_NODE_NAME, TAG_RAY_NODE_STATUS,
STATUS_UP_TO_DATE)
from ray.ray_constants import DEFAULT_PORT
logger = logging.getLogger(__name__)
@@ -18,6 +28,176 @@ logger = logging.getLogger(__name__)
FAKE_HEAD_NODE_ID = "fffffffffffffffffffffffffffffffffffffffffffffffffff00000"
FAKE_HEAD_NODE_TYPE = "ray.head.default"
FAKE_DOCKER_DEFAULT_GCS_PORT = 16379
FAKE_DOCKER_DEFAULT_OBJECT_MANAGER_PORT = 18076
FAKE_DOCKER_DEFAULT_CLIENT_PORT = 10002
DOCKER_COMPOSE_SKELETON = {
"version": "3.9",
"services": {},
"networks": {
"ray_local": {}
}
}
DOCKER_NODE_SKELETON = {
"networks": ["ray_local"],
"mem_limit": "3000m",
"mem_reservation": "3000m",
"shm_size": "1200m",
"volumes": []
}
DOCKER_HEAD_CMD = (
"bash -c \""
"sudo mkdir -p {volume_dir} && "
"sudo chmod 777 {volume_dir} && "
"touch {volume_dir}/.in_docker && "
"sudo chmod 700 ~/.ssh && "
"sudo chmod 600 ~/.ssh/authorized_keys && "
"sudo chmod 600 ~/ray_bootstrap_key.pem && "
"sudo chown ray:users ~/.ssh ~/.ssh/authorized_keys && "
"((sudo apt update && sudo apt install -y openssh-server && "
"sudo service ssh start) || true) && "
"sleep 1 && "
"RAY_FAKE_CLUSTER=1 ray start --head "
"--autoscaling-config=~/ray_bootstrap_config.yaml "
"--object-manager-port=8076 "
"--num-cpus {num_cpus} "
"--num-gpus {num_gpus} "
# "--resources='{resources}' "
"--block\"")
DOCKER_WORKER_CMD = (
"bash -c \""
"sudo mkdir -p {volume_dir} && "
"sudo chmod 777 {volume_dir} && "
"touch {volume_dir}/.in_docker && "
"sudo chmod 700 ~/.ssh && "
"sudo chmod 600 ~/.ssh/authorized_keys && "
"sudo chown ray:users ~/.ssh ~/.ssh/authorized_keys && "
"((sudo apt update && sudo apt install -y openssh-server && "
"sudo service ssh start) || true) && "
"sleep 1 && "
f"ray start --address={FAKE_HEAD_NODE_ID}:6379 "
"--object-manager-port=8076 "
"--num-cpus {num_cpus} "
"--num-gpus {num_gpus} "
# "--resources='{resources}' "
"--block\"")
def host_dir(container_dir: str):
"""Replace local dir with potentially different host dir.
E.g. in docker-in-docker environments, the host dir might be
different to the mounted directory in the container.
This method will do a simple global replace to adjust the paths.
"""
ray_tempdir = os.environ.get("RAY_TEMPDIR", None)
ray_hostdir = os.environ.get("RAY_HOSTDIR", None)
if not ray_tempdir or not ray_hostdir:
return container_dir
return container_dir.replace(ray_tempdir, ray_hostdir)
def create_node_spec(head: bool,
docker_image: str,
mounted_cluster_dir: str,
mounted_node_dir: str,
num_cpus: int = 2,
num_gpus: int = 0,
resources: Optional[Dict] = None,
env_vars: Optional[Dict] = None,
host_gcs_port: int = 16379,
host_object_manager_port: int = 18076,
host_client_port: int = 10002,
volume_dir: Optional[str] = None,
node_state_path: Optional[str] = None,
docker_status_path: Optional[str] = None,
docker_compose_path: Optional[str] = None,
bootstrap_config_path: Optional[str] = None,
private_key_path: Optional[str] = None,
public_key_path: Optional[str] = None):
node_spec = copy.deepcopy(DOCKER_NODE_SKELETON)
node_spec["image"] = docker_image
bootstrap_cfg_path_on_container = "/home/ray/ray_bootstrap_config.yaml"
bootstrap_key_path_on_container = "/home/ray/ray_bootstrap_key.pem"
resources = resources or {}
cmd_kwargs = dict(
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=json.dumps(resources, indent=None),
volume_dir=volume_dir,
autoscaling_config=bootstrap_cfg_path_on_container)
env_vars = env_vars or {}
# Set to "auto" to mount current autoscaler directory to nodes for dev
fake_cluster_dev_dir = os.environ.get("FAKE_CLUSTER_DEV", "")
if fake_cluster_dev_dir:
if fake_cluster_dev_dir == "auto":
local_ray_dir = os.path.dirname(ray.__file__)
else:
local_ray_dir = fake_cluster_dev_dir
os.environ["FAKE_CLUSTER_DEV"] = local_ray_dir
docker_ray_dir = "/home/ray/anaconda3/lib/python3.7/site-packages/ray"
node_spec["volumes"] += [
f"{local_ray_dir}/autoscaler:{docker_ray_dir}/autoscaler:ro",
]
env_vars["FAKE_CLUSTER_DEV"] = local_ray_dir
if head:
node_spec["command"] = DOCKER_HEAD_CMD.format(**cmd_kwargs)
# Expose ports so we can connect to the cluster from outside
node_spec["ports"] = [
f"{host_gcs_port}:{DEFAULT_PORT}",
f"{host_object_manager_port}:8076",
f"{host_client_port}:10001",
]
# Mount status and config files for the head node
node_spec["volumes"] += [
f"{host_dir(node_state_path)}:{node_state_path}",
f"{host_dir(docker_status_path)}:{docker_status_path}",
f"{host_dir(docker_compose_path)}:{docker_compose_path}",
f"{host_dir(bootstrap_config_path)}:"
f"{bootstrap_cfg_path_on_container}",
f"{host_dir(private_key_path)}:{bootstrap_key_path_on_container}",
]
# Create file if it does not exist on local filesystem
for filename in [
node_state_path, docker_status_path, bootstrap_config_path
]:
if not os.path.exists(filename):
with open(filename, "wt") as f:
f.write("{}")
else:
node_spec["command"] = DOCKER_WORKER_CMD.format(**cmd_kwargs)
node_spec["depends_on"] = [FAKE_HEAD_NODE_ID]
# Mount shared directories and ssh access keys
node_spec["volumes"] += [
f"{host_dir(mounted_cluster_dir)}:/cluster/shared",
f"{host_dir(mounted_node_dir)}:/cluster/node",
f"{host_dir(public_key_path)}:/home/ray/.ssh/authorized_keys",
]
# Pass these environment variables (to the head node)
# These variables are propagated by the `docker compose` command.
env_vars.setdefault("RAY_TEMPDIR", os.environ.get("RAY_TEMPDIR", ""))
env_vars.setdefault("RAY_HOSTDIR", os.environ.get("RAY_HOSTDIR", ""))
node_spec["environment"] = [f"{k}={v}" for k, v in env_vars.items()]
return node_spec
class FakeMultiNodeProvider(NodeProvider):
"""A node provider that implements multi-node on a single machine.
@@ -31,6 +211,7 @@ class FakeMultiNodeProvider(NodeProvider):
raise RuntimeError(
"FakeMultiNodeProvider requires ray to be started with "
"RAY_FAKE_CLUSTER=1 ray start ...")
self._nodes = {
FAKE_HEAD_NODE_ID: {
"tags": {
@@ -59,6 +240,7 @@ class FakeMultiNodeProvider(NodeProvider):
ok = False
if ok:
nodes.append(node_id)
return nodes
def is_running(self, node_id):
@@ -73,11 +255,14 @@
with self.lock:
return self._nodes[node_id]["tags"]
def external_ip(self, node_id):
def _get_ip(self, node_id: str) -> Optional[str]:
return node_id
def external_ip(self, node_id):
return self._get_ip(node_id)
def internal_ip(self, node_id):
return node_id
return self._get_ip(node_id)
def set_node_tags(self, node_id, tags):
raise AssertionError("Readonly node provider cannot be updated")
@@ -119,11 +304,304 @@
def terminate_node(self, node_id):
with self.lock:
node = self._nodes.pop(node_id)["node"]
self._kill_ray_processes(node)
try:
node = self._nodes.pop(node_id)
except Exception as e:
raise e
def _kill_ray_processes(self, node):
node.kill_all_processes(check_alive=False, allow_graceful=True)
self._terminate_node(node)
def _terminate_node(self, node):
node["node"].kill_all_processes(check_alive=False, allow_graceful=True)
@staticmethod
def bootstrap_config(cluster_config):
return cluster_config
class FakeMultiNodeDockerProvider(FakeMultiNodeProvider):
"""A node provider that implements multi-node on a single machine.
    This is used for laptop mode testing of multi-node functionality
    where each node has its own FS and IP."""
def __init__(self, provider_config, cluster_name):
super(FakeMultiNodeDockerProvider, self).__init__(
provider_config, cluster_name)
fake_head = copy.deepcopy(self._nodes)
self._project_name = self.provider_config["project_name"]
self._docker_image = self.provider_config["image"]
self._host_gcs_port = self.provider_config.get(
"host_gcs_port", FAKE_DOCKER_DEFAULT_GCS_PORT)
self._host_object_manager_port = self.provider_config.get(
"host_object_manager_port",
FAKE_DOCKER_DEFAULT_OBJECT_MANAGER_PORT)
self._host_client_port = self.provider_config.get(
"host_client_port", FAKE_DOCKER_DEFAULT_CLIENT_PORT)
self._head_resources = self.provider_config["head_resources"]
# subdirs:
# - ./shared (shared filesystem)
# - ./nodes/<node_id> (node-specific mounted filesystem)
self._volume_dir = self.provider_config["shared_volume_dir"]
self._mounted_cluster_dir = os.path.join(self._volume_dir, "shared")
if not self.in_docker_container:
# Only needed on host
os.makedirs(self._mounted_cluster_dir, mode=0o755, exist_ok=True)
self._boostrap_config_path = os.path.join(self._volume_dir,
"bootstrap_config.yaml")
self._private_key_path = os.path.join(self._volume_dir,
"bootstrap_key.pem")
self._public_key_path = os.path.join(self._volume_dir,
"bootstrap_key.pem.pub")
if not self.in_docker_container:
# Create private key
if not os.path.exists(self._private_key_path):
subprocess.check_output(
f"ssh-keygen -b 2048 -t rsa -q -N \"\" "
f"-f {self._private_key_path}",
shell=True)
# Create public key
if not os.path.exists(self._public_key_path):
subprocess.check_output(
f"ssh-keygen -y "
f"-f {self._private_key_path} "
f"> {self._public_key_path}",
shell=True)
self._docker_compose_config_path = os.path.join(
self._volume_dir, "docker-compose.yaml")
self._docker_compose_config = None
self._node_state_path = os.path.join(self._volume_dir, "nodes.json")
self._docker_status_path = os.path.join(self._volume_dir,
"status.json")
self._load_node_state()
if FAKE_HEAD_NODE_ID not in self._nodes:
# Reset
self._nodes = copy.deepcopy(fake_head)
self._nodes[FAKE_HEAD_NODE_ID][
"node_spec"] = self._create_node_spec_with_resources(
head=True,
node_id=FAKE_HEAD_NODE_ID,
resources=self._head_resources)
self._possibly_terminated_nodes = dict()
self._cleanup_interval = provider_config.get("cleanup_interval", 9.5)
self._docker_status = {}
self._update_docker_compose_config()
self._update_docker_status()
self._save_node_state()
@property
def in_docker_container(self):
return os.path.exists(os.path.join(self._volume_dir, ".in_docker"))
def _create_node_spec_with_resources(self, head: bool, node_id: str,
resources: Dict[str, Any]):
# Create shared directory
node_dir = os.path.join(self._volume_dir, "nodes", node_id)
os.makedirs(node_dir, mode=0o755, exist_ok=True)
resource_str = json.dumps(resources, indent=None)
return create_node_spec(
head=head,
docker_image=self._docker_image,
mounted_cluster_dir=self._mounted_cluster_dir,
mounted_node_dir=node_dir,
num_cpus=resources.pop("CPU", 0),
num_gpus=resources.pop("GPU", 0),
host_gcs_port=self._host_gcs_port,
host_object_manager_port=self._host_object_manager_port,
host_client_port=self._host_client_port,
resources=resources,
env_vars={
"RAY_OVERRIDE_NODE_ID_FOR_TESTING": node_id,
"RAY_OVERRIDE_RESOURCES": resource_str,
},
volume_dir=self._volume_dir,
node_state_path=self._node_state_path,
docker_status_path=self._docker_status_path,
docker_compose_path=self._docker_compose_config_path,
bootstrap_config_path=self._boostrap_config_path,
public_key_path=self._public_key_path,
private_key_path=self._private_key_path)
def _load_node_state(self) -> bool:
if not os.path.exists(self._node_state_path):
return False
try:
with open(self._node_state_path, "rt") as f:
nodes = json.load(f)
except Exception:
return False
if not nodes:
return False
self._nodes = nodes
return True
def _save_node_state(self):
with open(self._node_state_path, "wt") as f:
json.dump(self._nodes, f)
# Make sure this is always writeable from inside the containers
if not self.in_docker_container:
# Only chmod from the outer container
os.chmod(self._node_state_path, 0o777)
def _update_docker_compose_config(self):
config = copy.deepcopy(DOCKER_COMPOSE_SKELETON)
config["services"] = {}
for node_id, node in self._nodes.items():
config["services"][node_id] = node["node_spec"]
with open(self._docker_compose_config_path, "wt") as f:
yaml.safe_dump(config, f)
def _update_docker_status(self):
if not os.path.exists(self._docker_status_path):
return
with open(self._docker_status_path, "rt") as f:
self._docker_status = json.load(f)
def _update_nodes(self):
for node_id in list(self._nodes):
if not self._is_docker_running(node_id):
self._possibly_terminated_nodes.setdefault(
node_id, time.monotonic())
else:
self._possibly_terminated_nodes.pop(node_id, None)
self._cleanup_nodes()
def _cleanup_nodes(self):
for node_id, timestamp in list(
self._possibly_terminated_nodes.items()):
if time.monotonic() > timestamp + self._cleanup_interval:
if not self._is_docker_running(node_id):
self._nodes.pop(node_id, None)
self._possibly_terminated_nodes.pop(node_id, None)
self._save_node_state()
def _container_name(self, node_id):
node_status = self._docker_status.get(node_id, {})
timeout = time.monotonic() + 60
while not node_status:
if time.monotonic() > timeout:
raise RuntimeError(
f"Container for {node_id} never became available.")
time.sleep(1)
self._update_docker_status()
node_status = self._docker_status.get(node_id, {})
return node_status["Name"]
def _is_docker_running(self, node_id):
self._update_docker_status()
return self._docker_status.get(node_id, {}).get("State",
None) == "running"
def non_terminated_nodes(self, tag_filters):
self._update_nodes()
return super(FakeMultiNodeDockerProvider,
self).non_terminated_nodes(tag_filters)
def is_running(self, node_id):
with self.lock:
self._update_nodes()
return node_id in self._nodes and self._is_docker_running(node_id)
def is_terminated(self, node_id):
with self.lock:
self._update_nodes()
return node_id not in self._nodes and not self._is_docker_running(
node_id)
def get_command_runner(self,
log_prefix: str,
node_id: str,
auth_config: Dict[str, Any],
cluster_name: str,
process_runner: ModuleType,
use_internal_ip: bool,
docker_config: Optional[Dict[str, Any]] = None
) -> CommandRunnerInterface:
if self.in_docker_container:
return super(FakeMultiNodeProvider, self).get_command_runner(
log_prefix, node_id, auth_config, cluster_name, process_runner,
use_internal_ip)
# Else, host command runner:
common_args = {
"log_prefix": log_prefix,
"node_id": node_id,
"provider": self,
"auth_config": auth_config,
"cluster_name": cluster_name,
"process_runner": process_runner,
"use_internal_ip": use_internal_ip
}
docker_config["container_name"] = self._container_name(node_id)
docker_config["image"] = self._docker_image
return FakeDockerCommandRunner(docker_config, **common_args)
def _get_ip(self, node_id: str) -> Optional[str]:
for i in range(3):
self._update_docker_status()
ip = self._docker_status.get(node_id, {}).get("IP", None)
if ip:
return ip
time.sleep(3)
return None
def set_node_tags(self, node_id, tags):
assert node_id in self._nodes
self._nodes[node_id]["tags"].update(tags)
def create_node_with_resources(self, node_config, tags, count, resources):
with self.lock:
is_head = tags[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD
if is_head:
next_id = FAKE_HEAD_NODE_ID
else:
next_id = self._next_hex_node_id()
self._nodes[next_id] = {
"tags": tags,
"node_spec": self._create_node_spec_with_resources(
head=is_head, node_id=next_id, resources=resources)
}
self._update_docker_compose_config()
self._save_node_state()
def create_node(self, node_config: Dict[str, Any], tags: Dict[str, str],
count: int) -> Optional[Dict[str, Any]]:
resources = self._head_resources
return self.create_node_with_resources(node_config, tags, count,
resources)
def _terminate_node(self, node):
self._update_docker_compose_config()
self._save_node_state()
@staticmethod
def bootstrap_config(cluster_config):

View file

@@ -0,0 +1,230 @@
import json
import logging
import os
import shutil
import subprocess
import tempfile
import time
from typing import Dict, Any, Optional
import yaml
import ray
from ray.autoscaler._private.fake_multi_node.node_provider import \
FAKE_DOCKER_DEFAULT_CLIENT_PORT, FAKE_DOCKER_DEFAULT_GCS_PORT
from ray.util.ml_utils.dict import deep_update
logger = logging.getLogger(__name__)
class DockerCluster:
"""Docker cluster wrapper.
Creates a directory for starting a fake multinode docker cluster.
Includes APIs to update the cluster config as needed in tests,
and to start and connect to the cluster.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self._base_config_file = os.path.join(
os.path.dirname(__file__), "example_docker.yaml")
self._tempdir = None
self._config_file = None
self._nodes_file = None
self._partial_config = config
self._cluster_config = None
self._docker_image = None
self._monitor_script = os.path.join(
os.path.dirname(__file__), "docker_monitor.py")
self._monitor_process = None
@property
def config_file(self):
return self._config_file
@property
def cluster_config(self):
return self._cluster_config
@property
def gcs_port(self):
return self._cluster_config.get("provider", {}).get(
"host_gcs_port", FAKE_DOCKER_DEFAULT_GCS_PORT)
@property
def client_port(self):
return self._cluster_config.get("provider", {}).get(
"host_client_port", FAKE_DOCKER_DEFAULT_CLIENT_PORT)
def connect(self, client: bool = True, timeout: int = 120):
"""Connect to the docker-compose Ray cluster.
Assumes the cluster is at RAY_TESTHOST (defaults to
``127.0.0.1``).
Args:
client (bool): If True, uses Ray client to connect to the
cluster. If False, uses GCS to connect to the cluster.
timeout (int): Connection timeout in seconds.
"""
host = os.environ.get("RAY_TESTHOST", "127.0.0.1")
if client:
port = self.client_port
address = f"ray://{host}:{port}"
else:
port = self.gcs_port
address = f"{host}:{port}"
timeout_at = time.monotonic() + timeout
while time.monotonic() < timeout_at:
try:
ray.init(address)
self.wait_for_resources({"CPU": 1})
except Exception:
time.sleep(1)
continue
else:
break
try:
ray.cluster_resources()
except Exception as e:
raise RuntimeError(f"Timed out connecting to Ray: {e}")
@staticmethod
def wait_for_resources(resources: Dict[str, float], timeout: int = 60):
"""Wait until Ray cluster resources are available
Args:
resources (Dict[str, float]): Minimum resources needed before
this function returns.
timeout (int): Timeout in seconds.
"""
timeout = time.monotonic() + timeout
available = ray.cluster_resources()
while any(available.get(k, 0.) < v for k, v in resources.items()):
if time.monotonic() > timeout:
raise RuntimeError(
f"Timed out waiting for resources: {resources}")
time.sleep(1)
available = ray.cluster_resources()
def update_config(self, config: Optional[Dict[str, Any]] = None):
"""Update autoscaling config.
Does a deep update of the base config with a new configuration.
This can change autoscaling behavior.
Args:
config (Dict[str, Any]): Partial config to update current
config with.
"""
assert self._tempdir, "Call setup() first"
config = config or {}
if config:
self._partial_config = config
if not config.get("provider", {}).get("image"):
# No image specified, trying to parse from buildkite
self._docker_image = os.environ.get("RAY_DOCKER_IMAGE", None)
with open(self._base_config_file, "rt") as f:
cluster_config = yaml.safe_load(f)
if self._partial_config:
deep_update(
cluster_config, self._partial_config, new_keys_allowed=True)
if self._docker_image:
cluster_config["provider"]["image"] = self._docker_image
cluster_config["provider"]["shared_volume_dir"] = self._tempdir
self._cluster_config = cluster_config
with open(self._config_file, "wt") as f:
yaml.safe_dump(self._cluster_config, f)
logging.info(f"Updated cluster config to: {self._cluster_config}")
def maybe_pull_image(self):
if self._docker_image:
try:
images_str = subprocess.check_output(
f"docker image inspect {self._docker_image}", shell=True)
images = json.loads(images_str)
except Exception as e:
logger.error(
f"Error inspecting image {self._docker_image}: {e}")
return
if not images:
try:
subprocess.check_output(
f"docker pull {self._docker_image}", shell=True)
except Exception as e:
logger.error(
f"Error pulling image {self._docker_image}: {e}")
def setup(self):
"""Setup docker compose cluster environment.
Creates the temporary directory, writes the initial config file,
and pulls the docker image, if required.
"""
self._tempdir = tempfile.mkdtemp(
dir=os.environ.get("RAY_TEMPDIR", None))
os.chmod(self._tempdir, 0o777)
self._config_file = os.path.join(self._tempdir, "cluster.yaml")
self._nodes_file = os.path.join(self._tempdir, "nodes.json")
self.update_config()
self.maybe_pull_image()
def teardown(self):
"""Tear down docker compose cluster environment."""
shutil.rmtree(self._tempdir)
self._tempdir = None
self._config_file = None
def _start_monitor(self):
self._monitor_process = subprocess.Popen(
["python", self._monitor_script, self.config_file])
time.sleep(2)
def _stop_monitor(self):
if self._monitor_process:
self._monitor_process.wait(timeout=30)
if self._monitor_process.poll() is None:
self._monitor_process.terminate()
self._monitor_process = None
def start(self):
"""Start docker compose cluster.
Starts the monitor process and runs ``ray up``.
"""
self._start_monitor()
subprocess.check_output(
f"RAY_FAKE_CLUSTER=1 ray up -y {self.config_file}", shell=True)
def stop(self):
"""Stop docker compose cluster.
Runs ``ray down`` and stops the monitor process.
"""
        if ray.is_initialized():
ray.shutdown()
subprocess.check_output(
f"RAY_FAKE_CLUSTER=1 ray down -y {self.config_file}", shell=True)
self._stop_monitor()

View file

@@ -62,6 +62,12 @@ def _import_fake_multinode(provider_config):
return FakeMultiNodeProvider
def _import_fake_multinode_docker(provider_config):
from ray.autoscaler._private.fake_multi_node.node_provider import \
FakeMultiNodeDockerProvider
return FakeMultiNodeDockerProvider
def _import_kubernetes(provider_config):
from ray.autoscaler._private._kubernetes.node_provider import \
KubernetesNodeProvider
@@ -80,6 +86,12 @@ def _import_aliyun(provider_config):
return AliyunNodeProvider
def _load_fake_multinode_docker_defaults_config():
import ray.autoscaler._private.fake_multi_node as ray_fake_multinode
return os.path.join(
os.path.dirname(ray_fake_multinode.__file__), "example_docker.yaml")
def _load_local_defaults_config():
import ray.autoscaler.local as ray_local
return os.path.join(os.path.dirname(ray_local.__file__), "defaults.yaml")
@@ -124,6 +136,7 @@ def _import_external(provider_config):
_NODE_PROVIDERS = {
"local": _import_local,
"fake_multinode": _import_fake_multinode,
"fake_multinode_docker": _import_fake_multinode_docker,
"readonly": _import_readonly,
"aws": _import_aws,
"gcp": _import_gcp,
@@ -137,6 +150,7 @@ _NODE_PROVIDERS = {
_PROVIDER_PRETTY_NAMES = {
"readonly": "Readonly (Manual Cluster Setup)",
"fake_multinode": "Fake Multinode",
"fake_multinode_docker": "Fake Multinode Docker",
"local": "Local",
"aws": "AWS",
"gcp": "GCP",
@@ -148,6 +162,7 @@ _PROVIDER_PRETTY_NAMES = {
}
_DEFAULT_CONFIGS = {
"fake_multinode_docker": _load_fake_multinode_docker_defaults_config,
"local": _load_local_defaults_config,
"aws": _load_aws_defaults_config,
"gcp": _load_gcp_defaults_config,

View file

@@ -182,6 +182,14 @@ py_test(
tags = ["team:ml", "tests_dir_L"],
)
py_test(
name = "test_multinode_sync",
size = "large",
srcs = ["tests/test_multinode_sync.py"],
deps = [":tune_lib"],
tags = ["team:ml", "multinode"],
)
py_test(
name = "test_progress_reporter",
size = "medium",

View file

@@ -0,0 +1,58 @@
import sys
import time
import unittest
import ray
from ray.autoscaler._private.fake_multi_node.test_utils import DockerCluster
@ray.remote
def remote_task(val):
return val
class MultiNodeSyncTest(unittest.TestCase):
def setUp(self):
self.cluster = DockerCluster()
self.cluster.setup()
def tearDown(self):
self.cluster.stop()
self.cluster.teardown()
def testClusterAutoscaling(self):
"""Sanity check that multinode tests with autoscaling are working"""
self.cluster.update_config({
"provider": {
"head_resources": {
"CPU": 4,
"GPU": 0
}
},
})
self.cluster.start()
self.cluster.connect(client=True, timeout=120)
self.assertGreater(ray.cluster_resources().get("CPU", 0), 0)
# Trigger autoscaling
pg = ray.util.placement_group([{"CPU": 1, "GPU": 1}] * 2)
timeout = time.monotonic() + 120
while ray.cluster_resources().get("GPU", 0) < 2:
if time.monotonic() > timeout:
raise RuntimeError("Autoscaling failed or too slow.")
time.sleep(1)
# Schedule task with resources
self.assertEquals(
5,
ray.get(
remote_task.options(
num_cpus=1, num_gpus=1, placement_group=pg).remote(5)))
print("Autoscaling worked")
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@@ -23,7 +23,7 @@ def merge_dicts(d1: dict, d2: dict) -> dict:
def deep_update(
original: dict,
new_dict: dict,
new_keys_allowed: str = False,
new_keys_allowed: bool = False,
allow_new_subkey_list: Optional[List[str]] = None,
override_all_if_type_changes: Optional[List[str]] = None) -> dict:
"""Updates original dict with values from new_dict recursively.