From 972dddd77669a866659a12d5f803e5cc4c58a622 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 3 Oct 2019 10:17:00 -0700 Subject: [PATCH] [autoscaler] Kubernetes autoscaler backend (#5492) * Add Kubernetes NodeProvider to autoscaler * Split off SSHCommandRunner * Add KubernetesCommandRunner * Cleanup * More config options * Check if auth present * More auth checks * Better output * Always bootstrap config * All working * Add k8s-rsync comment * Clean up manual k8s examples * Fix up submit.yaml * Automatically configure permissisons * Fix get_node_provider arg * Fix permissions * Fill in empty auth * Remove ray-cluster from this PR * No hard dep on kubernetes library * Move permissions into autoscaler config * lint * Fix indentation * namespace validation * Use cluster name tag * Remove kubernetes from setup.py * Comment in example configs * Same default autoscaling config as aws * Add Kubernetes quickstart * lint * Revert changes to submit.yaml (other PR) * Install kubernetes in travis * address comments * Improve autoscaling doc * kubectl command in setup * Force use_internal_ips * comments * backend env in docs * Change namespace config * comments * comments * Fix yaml test --- ci/travis/install-dependencies.sh | 8 +- doc/source/autoscaling.rst | 204 +++++--- python/ray/autoscaler/autoscaler.py | 11 +- python/ray/autoscaler/commands.py | 24 +- python/ray/autoscaler/kubernetes/__init__.py | 42 ++ python/ray/autoscaler/kubernetes/config.py | 157 ++++++ .../autoscaler/kubernetes/example-full.yaml | 232 +++++++++ .../kubernetes/example-minimal.yaml | 58 +++ .../autoscaler/kubernetes/kubectl-rsync.sh | 25 + .../autoscaler/kubernetes/node_provider.py | 87 ++++ python/ray/autoscaler/node_provider.py | 16 +- python/ray/autoscaler/updater.py | 479 +++++++++++------- python/setup.py | 2 + 13 files changed, 1074 insertions(+), 271 deletions(-) create mode 100644 python/ray/autoscaler/kubernetes/__init__.py create mode 100644 python/ray/autoscaler/kubernetes/config.py 
create mode 100644 python/ray/autoscaler/kubernetes/example-full.yaml create mode 100644 python/ray/autoscaler/kubernetes/example-minimal.yaml create mode 100755 python/ray/autoscaler/kubernetes/kubectl-rsync.sh create mode 100644 python/ray/autoscaler/kubernetes/node_provider.py diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index c11f488db..da866ef6d 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y python-dev python-numpy build-essential curl unzip tmux gdb @@ -35,14 +35,14 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ - uvicorn dataclasses pygments werkzeug + uvicorn dataclasses pygments werkzeug kubernetes elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # Install miniconda. 
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # Install miniconda. wget https://repo.continuum.io/miniconda/Miniconda3-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv @@ -50,7 +50,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ - uvicorn dataclasses pygments werkzeug + uvicorn dataclasses pygments werkzeug kubernetes elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y build-essential curl unzip diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index d68fa1659..7de09b845 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -1,20 +1,24 @@ Automatic Cluster Setup ======================= -This document provides instructions for launching a Ray cluster either privately, on AWS, or on GCP. +Ray comes with a built-in autoscaler that makes deploying a Ray cluster simple, just run ``ray up`` from your local machine to start or update a cluster in the cloud or on an on-premise cluster. Once the Ray cluster is running, you can manually SSH into it or use provided commands like ``ray attach``, ``ray rsync-up``, and ``ray-exec`` to access it and run Ray programs. 
-The ``ray up`` command starts or updates a Ray cluster from your personal computer. Once the cluster is up, you can then SSH into it to run Ray programs. +Setup +----- -Quick start (AWS) ------------------ +This section provides instructions for configuring the autoscaler to launch a Ray cluster on AWS/GCP, an existing Kubernetes cluster, or on a private cluster of host machines. + +Once you have finished configuring the autoscaler to create a cluster, see the Quickstart guide below for more details on how to get started running Ray programs on it. + +AWS +~~~ First, install boto (``pip install boto3``) and configure your AWS credentials in ``~/.aws/credentials``, as described in `the boto docs `__. -Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example-full.yaml `__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers `__. +Once boto is configured to manage resources on your AWS account, you should be ready to run the autoscaler. The provided `ray/python/ray/autoscaler/aws/example-full.yaml `__ cluster config file will create a small cluster with an m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers `__. -Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="auto")``. +Test that it works by running the following commands from your local machine: .. code-block:: bash @@ -22,22 +26,22 @@ SSH into the head node, ``source activate tensorflow_p36``, and then run Ray pro # out the command that can be used to SSH into the cluster head node. 
$ ray up ray/python/ray/autoscaler/aws/example-full.yaml - # Reconfigure autoscaling behavior without interrupting running jobs - $ ray up ray/python/ray/autoscaler/aws/example-full.yaml \ - --max-workers=N --no-restart + # Get a remote screen on the head node. + $ ray attach ray/python/ray/autoscaler/aws/example-full.yaml + $ source activate tensorflow_p36 + $ # Try running a Ray program with 'ray.init(address="auto")'. - # Teardown the cluster + # Tear down the cluster. $ ray down ray/python/ray/autoscaler/aws/example-full.yaml -Quick start (GCP) ------------------ +GCP +~~~ First, install the Google API client (``pip install google-api-python-client``), set up your GCP credentials, and create a new GCP project. -Then you're ready to go. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml `__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers `__. Note that you'll need to fill in your project id in those templates. +Once the API client is configured to manage resources on your GCP account, you should be ready to run the autoscaler. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml `__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers `__. Note that you'll need to fill in your project id in those templates. -Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. +Test that it works by running the following commands from your local machine: .. code-block:: bash @@ -45,37 +49,112 @@ SSH into the head node and then run Ray programs with ``ray.init(address="auto") # out the command that can be used to SSH into the cluster head node. 
$ ray up ray/python/ray/autoscaler/gcp/example-full.yaml - # Reconfigure autoscaling behavior without interrupting running jobs - $ ray up ray/python/ray/autoscaler/gcp/example-full.yaml \ - --max-workers=N --no-restart + # Get a remote screen on the head node. + $ ray attach ray/python/ray/autoscaler/gcp/example-full.yaml + $ source activate tensorflow_p36 + $ # Try running a Ray program with 'ray.init(address="auto")'. - # Teardown the cluster + # Tear down the cluster. $ ray down ray/python/ray/autoscaler/gcp/example-full.yaml -Quick start (Private Cluster) ------------------------------ +Kubernetes +~~~~~~~~~~ -This is used when you have a list of machine IP addresses to connect in a Ray cluster. You can get started by filling out the fields in the provided `ray/python/ray/autoscaler/local/example-full.yaml `__. -Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field. +The autoscaler can also be used to start Ray clusters on an existing Kubernetes cluster. First, install the Kubernetes API client (``pip install kubernetes``), then make sure your Kubernetes credentials are set up properly to access the cluster (if a command like ``kubectl get pods`` succeeds, you should be good to go). -Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. +Once you have ``kubectl`` configured locally to access the remote cluster, you should be ready to run the autoscaler. The provided `ray/python/ray/autoscaler/kubernetes/example-full.yaml `__ cluster config file will create a small cluster of one pod for the head node configured to autoscale up to two worker node pods, with all pods requiring 1 CPU and 0.5GiB of memory. + +Test that it works by running the following commands from your local machine: .. code-block:: bash # Create or update the cluster. 
When the command finishes, it will print - # out the command that can be used to SSH into the cluster head node. + # out the command that can be used to get a remote shell into the head node. + $ ray up ray/python/ray/autoscaler/kubernetes/example-full.yaml + + # List the pods running in the cluster. You should only see one head node + # until you start running an application, at which point worker nodes + # should be started. Don't forget to include the Ray namespace in your + # 'kubectl' commands ('ray' by default). + $ kubectl -n ray get pods + + # Get a remote screen on the head node. + $ ray attach ray/python/ray/autoscaler/kubernetes/example-full.yaml + $ # Try running a Ray program with 'ray.init(address="auto")'. + + # Tear down the cluster + $ ray down ray/python/ray/autoscaler/kubernetes/example-full.yaml + +Private Cluster +~~~~~~~~~~~~~~~ + +The autoscaler can also be used to run a Ray cluster on a private cluster of hosts, specified as a list of machine IP addresses to connect to. You can get started by filling out the fields in the provided `ray/python/ray/autoscaler/local/example-full.yaml `__. +Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field. + +Test that it works by running the following commands from your local machine: + +.. code-block:: bash + + # Create or update the cluster. When the command finishes, it will print + # out the command that can be used to get a remote shell into the head node. $ ray up ray/python/ray/autoscaler/local/example-full.yaml - # Reconfigure autoscaling behavior without interrupting running jobs - $ ray up ray/python/ray/autoscaler/local/example-full.yaml \ - --max-workers=N --no-restart + # Get a remote screen on the head node. + $ ray attach ray/python/ray/autoscaler/local/example-full.yaml + $ # Try running a Ray program with 'ray.init(address="auto")'.
- # Teardown the cluster + # Tear down the cluster $ ray down ray/python/ray/autoscaler/local/example-full.yaml +External Node Provider +~~~~~~~~~~~~~~~~~~~~~~ + +Ray also supports external node providers (check `node_provider.py `__ implementation). +You can specify the external node provider using the yaml config: + +.. code-block:: yaml + + provider: + type: external + module: mypackage.myclass + +The module needs to be in the format `package.provider_class` or `package.sub_package.provider_class`. + +Additional Cloud Providers +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To use Ray autoscaling on other Cloud providers or cluster management systems, you can implement the ``NodeProvider`` interface (~100 LOC) and register it in `node_provider.py `__. Contributions are welcome! + +Quickstart +---------- + +Starting and updating a cluster +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When you run ``ray up`` with an existing cluster, the command checks if the local configuration differs from the applied configuration of the cluster. This includes any changes to synced files specified in the ``file_mounts`` section of the config. If so, the new files and config will be uploaded to the cluster. Following that, Ray services will be restarted. + +You can also run ``ray up`` to restart a cluster if it seems to be in a bad state (this will restart all Ray services even if there are no config changes). + +If you don't want the update to restart services (e.g., because the changes don't require a restart), pass ``--no-restart`` to the update call. + +.. code-block:: bash + + # Replace '' with one of: 'aws', 'gcp', 'kubernetes', or 'local'. + $ BACKEND= + + # Create or update the cluster. + $ ray up ray/python/ray/autoscaler/$BACKEND/example-full.yaml + + # Reconfigure autoscaling behavior without interrupting running jobs. + $ ray up ray/python/ray/autoscaler/$BACKEND/example-full.yaml \ + --max-workers=N --no-restart + + # Tear down the cluster. 
+ $ ray down ray/python/ray/autoscaler/$BACKEND/example-full.yaml + + Running commands on new and existing clusters ---------------------------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="auto")``. @@ -108,10 +187,10 @@ You can also use ``ray submit`` to execute Python scripts on clusters. This will $ ray submit cluster.yaml --tmux --start --stop tune_experiment.py -Attaching to the cluster ------------------------- +Attaching to a running cluster +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You can use ``ray attach`` to attach to an interactive console on the cluster. +You can use ``ray attach`` to attach to an interactive screen session on the cluster. .. code-block:: bash @@ -126,16 +205,16 @@ You can use ``ray attach`` to attach to an interactive console on the cluster. Port-forwarding applications ----------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -To run connect to applications running on the cluster (e.g. Jupyter notebook) using a web browser, you can use the port-forward option for ``ray exec``. The local port opened is the same as the remote port: +If you want to run applications on the cluster that are accessible from a web browser (e.g., Jupyter notebook), you can use the ``--port-forward`` option for ``ray exec``. The local port opened is the same as the remote port. .. 
code-block:: bash $ ray exec cluster.yaml --port-forward=8899 'source ~/anaconda3/bin/activate tensorflow_p36 && jupyter notebook --port=8899' Manually synchronizing files ----------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ To download or upload files to the cluster head node, use ``ray rsync_down`` or ``ray rsync_up``: @@ -144,29 +223,20 @@ To download or upload files to the cluster head node, use ``ray rsync_down`` or $ ray rsync_down cluster.yaml '/path/on/cluster' '/local/path' $ ray rsync_up cluster.yaml '/local/path' '/path/on/cluster' -Updating your cluster ---------------------- - -When you run ``ray up`` with an existing cluster, the command checks if the local configuration differs from the applied configuration of the cluster. This includes any changes to synced files specified in the ``file_mounts`` section of the config. If so, the new files and config will be uploaded to the cluster. Following that, Ray services will be restarted. - -You can also run ``ray up`` to restart a cluster if it seems to be in a bad state (this will restart all Ray services even if there are no config changes). - -If you don't want the update to restart services (e.g. because the changes don't require a restart), pass ``--no-restart`` to the update call. - Security --------- +~~~~~~~~ -By default, the nodes will be launched into their own security group, with traffic allowed only between nodes in the same group. A new SSH key will also be created and saved to your local machine for access to the cluster. +On cloud providers, nodes will be launched into their own security group by default, with traffic allowed only between nodes in the same group. A new SSH key will also be created and saved to your local machine for access to the cluster. Autoscaling ------------ +~~~~~~~~~~~ -Ray clusters come with a load-based auto-scaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up the specified ``max_workers`` limit. 
When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. +Ray clusters come with a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up the specified ``max_workers`` limit. When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. The default idle timeout is 5 minutes. This is to prevent excessive node churn which could impact performance and increase costs (in AWS / GCP there is a minimum billing charge of 1 minute per instance, after which usage is billed by the second). Monitoring cluster status -------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~ You can monitor cluster usage and auto-scaling status by tailing the autoscaling logs in ``/tmp/ray/session_*/logs/monitor*``. @@ -176,18 +246,18 @@ The Ray autoscaler also reports per-node status in the form of instance tags. In .. image:: autoscaler-status.png Customizing cluster setup -------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~ You are encouraged to copy the example YAML file and modify it to your needs. This may include adding additional setup commands to install libraries or sync local data files. -.. note:: After you have customized the nodes, it is also a good idea to create a new machine image and use that in the config file. This reduces worker setup time, improving the efficiency of auto-scaling. +.. note:: After you have customized the nodes, it is also a good idea to create a new machine image (or docker container) and use that in the config file. This reduces worker setup time, improving the efficiency of auto-scaling. The setup commands you use should ideally be *idempotent*, that is, can be run more than once. This allows Ray to update nodes after they have been created. You can usually make commands idempotent with small modifications, e.g. 
``git clone foo`` can be rewritten as ``test -e foo || git clone foo`` which checks if the repo is already cloned first. Most of the example YAML file is optional. Here is a `reference minimal YAML file `__, and you can find the defaults for `optional fields in this YAML file `__. Syncing git branches --------------------- +~~~~~~~~~~~~~~~~~~~~ A common use case is syncing a particular local git branch to all workers of the cluster. However, if you just put a `git checkout ` in the setup commands, the autoscaler won't know when to rerun the command to pull in updates. There is a nice workaround for this by including the git SHA in the input (the hash of the file will change if the branch is updated): @@ -208,7 +278,7 @@ This tells ``ray up`` to sync the current git branch SHA from your personal comp 3. Update files on your Ray cluster with ``ray up`` Common cluster configurations ------------------------------ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ``example-full.yaml`` configuration is enough to get started with Ray, but for more compute intensive workloads you will want to change the instance types to e.g. use GPU or larger compute instance by editing the yaml file. Here are a few common configurations: @@ -270,28 +340,8 @@ with GPU worker nodes instead. MarketType: spot InstanceType: p2.xlarge - -External Node Provider --------------------------- - -Ray also supports external node providers (check `node_provider.py `__ implementation). -You can specify the external node provider using the yaml config: - -.. code-block:: yaml - - provider: - type: external - module: mypackage.myclass - -The module needs to be in the format `package.provider_class` or `package.sub_package.provider_class`. - -Additional Cloud providers --------------------------- - -To use Ray autoscaling on other Cloud providers or cluster management systems, you can implement the ``NodeProvider`` interface (~100 LOC) and register it in `node_provider.py `__. Contributions are welcome! 
- Questions or Issues? --------------------- +~~~~~~~~~~~~~~~~~~~~ You can post questions or issues or feedback through the following channels: diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index fc25046b0..f6b389134 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -77,6 +77,12 @@ CLUSTER_CONFIG_SCHEMA = { "head_ip": (str, OPTIONAL), # local cluster head node "worker_ips": (list, OPTIONAL), # local cluster worker nodes "use_internal_ips": (bool, OPTIONAL), # don't require public ips + "namespace": (str, OPTIONAL), # k8s namespace, if using k8s + + # k8s autoscaler permissions, if using k8s + "autoscaler_service_account": (dict, OPTIONAL), + "autoscaler_role": (dict, OPTIONAL), + "autoscaler_role_binding": (dict, OPTIONAL), "extra_config": (dict, OPTIONAL), # provider-specific config # Whether to try to reuse previously stopped nodes instead of @@ -89,10 +95,10 @@ CLUSTER_CONFIG_SCHEMA = { # How Ray will authenticate with newly launched nodes. "auth": ( { - "ssh_user": (str, REQUIRED), # e.g. ubuntu + "ssh_user": (str, OPTIONAL), # e.g. ubuntu "ssh_private_key": (str, OPTIONAL), }, - REQUIRED), + OPTIONAL), # Docker configuration. If this is specified, all setup and start commands # will be executed in the container. 
@@ -812,6 +818,7 @@ def fillout_defaults(config): defaults.update(config) merge_setup_commands(defaults) dockerize_if_needed(defaults) + defaults["auth"] = defaults.get("auth", {}) return defaults diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 24bc1a2b5..43ec645d3 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -76,13 +76,12 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): config = yaml.safe_load(open(config_file).read()) if override_cluster_name is not None: config["cluster_name"] = override_cluster_name - validate_config(config) config = fillout_defaults(config) + validate_config(config) confirm("This will destroy your cluster", yes) provider = get_node_provider(config["provider"], config["cluster_name"]) - try: def remaining_nodes(): @@ -215,9 +214,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, logger.info("get_or_create_head_node: Updating files on head node...") # Rewrite the auth config so that the head node can update the workers - remote_key_path = "~/ray_bootstrap_key.pem" remote_config = copy.deepcopy(config) - remote_config["auth"]["ssh_private_key"] = remote_key_path + if config["provider"]["type"] != "kubernetes": + remote_key_path = "~/ray_bootstrap_key.pem" + remote_config["auth"]["ssh_private_key"] = remote_key_path # Adjust for new file locations new_mounts = {} @@ -232,9 +232,12 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, remote_config_file.write(json.dumps(remote_config)) remote_config_file.flush() config["file_mounts"].update({ - remote_key_path: config["auth"]["ssh_private_key"], "~/ray_bootstrap_config.yaml": remote_config_file.name }) + if config["provider"]["type"] != "kubernetes": + config["file_mounts"].update({ + remote_key_path: config["auth"]["ssh_private_key"], + }) if restart_only: init_commands = [] @@ -278,7 +281,8 @@ def 
get_or_create_head_node(config, config_file, no_restart, restart_only, yes, "Head node up-to-date, IP address is: {}".format(head_node_ip)) monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*" - use_docker = bool(config["docker"]["container_name"]) + use_docker = "docker" in config and bool( + config["docker"]["container_name"]) if override_cluster_name: modifiers = " --cluster-name={}".format( quote(override_cluster_name)) @@ -291,10 +295,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, print("To open a console on the cluster:\n\n" " ray attach {}{}\n".format(config_file, modifiers)) - print("To ssh manually to the cluster, run:\n\n" - " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"], - config["auth"]["ssh_user"], - head_node_ip)) + print("To get a remote shell to the cluster manually, run:\n\n" + " {}\n".format(updater.cmd_runner.remote_shell_command_str())) finally: provider.cleanup() @@ -424,7 +426,7 @@ def _exec(updater, cmd, screen, tmux, port_forward=None): quote(cmd + "; exec bash") ] cmd = " ".join(cmd) - updater.ssh_cmd( + updater.cmd_runner.run( cmd, allocate_tty=True, exit_on_fail=True, diff --git a/python/ray/autoscaler/kubernetes/__init__.py b/python/ray/autoscaler/kubernetes/__init__.py new file mode 100644 index 000000000..b6328f6e2 --- /dev/null +++ b/python/ray/autoscaler/kubernetes/__init__.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import kubernetes +from kubernetes.config.config_exception import ConfigException + +_configured = False +_core_api = None +_auth_api = None + + +def _load_config(): + global _configured + if _configured: + return + try: + kubernetes.config.load_incluster_config() + except ConfigException: + kubernetes.config.load_kube_config() + _configured = True + + +def core_api(): + global _core_api + if _core_api is None: + _load_config() + _core_api = 
kubernetes.client.CoreV1Api() + + return _core_api + + +def auth_api(): + global _auth_api + if _auth_api is None: + _load_config() + _auth_api = kubernetes.client.RbacAuthorizationV1Api() + + return _auth_api + + +log_prefix = "KubernetesNodeProvider: " diff --git a/python/ray/autoscaler/kubernetes/config.py b/python/ray/autoscaler/kubernetes/config.py new file mode 100644 index 000000000..a16eb33a0 --- /dev/null +++ b/python/ray/autoscaler/kubernetes/config.py @@ -0,0 +1,157 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging + +from kubernetes import client + +from ray.autoscaler.kubernetes import auth_api, core_api, log_prefix + +logger = logging.getLogger(__name__) + + +class InvalidNamespaceError(ValueError): + def __init__(self, field_name, namespace): + self.message = ("Namespace of {} config doesn't match provided " + "namespace '{}'. Either set it to {} or remove the " + "field".format(field_name, namespace, namespace)) + + def __str__(self): + return self.message + + +def using_existing_msg(resource_type, name): + return "using existing {} '{}'".format(resource_type, name) + + +def not_found_msg(resource_type, name): + return "{} '{}' not found, attempting to create it".format( + resource_type, name) + + +def created_msg(resource_type, name): + return "successfully created {} '{}'".format(resource_type, name) + + +def not_provided_msg(resource_type): + return "no {} config provided, must already exist".format(resource_type) + + +def bootstrap_kubernetes(config): + if not config["provider"]["use_internal_ips"]: + return ValueError("Exposing external IP addresses for ray pods isn't " + "currently supported. 
Please set " + "'use_internal_ips' to false.") + namespace = _configure_namespace(config["provider"]) + _configure_autoscaler_service_account(namespace, config["provider"]) + _configure_autoscaler_role(namespace, config["provider"]) + _configure_autoscaler_role_binding(namespace, config["provider"]) + return config + + +def _configure_namespace(provider_config): + namespace_field = "namespace" + if namespace_field not in provider_config: + raise ValueError("Must specify namespace in Kubernetes config.") + + namespace = provider_config[namespace_field] + field_selector = "metadata.name={}".format(namespace) + namespaces = core_api().list_namespace(field_selector=field_selector).items + if len(namespaces) > 0: + assert len(namespaces) == 1 + logger.info(log_prefix + + using_existing_msg(namespace_field, namespace)) + return namespace + + logger.info(log_prefix + not_found_msg(namespace_field, namespace)) + namespace_config = client.V1Namespace( + metadata=client.V1ObjectMeta(name=namespace)) + core_api().create_namespace(namespace_config) + logger.info(log_prefix + created_msg(namespace_field, namespace)) + return namespace + + +def _configure_autoscaler_service_account(namespace, provider_config): + account_field = "autoscaler_service_account" + if account_field not in provider_config: + logger.info(log_prefix + not_provided_msg(account_field)) + return + + account = provider_config[account_field] + if "namespace" not in account["metadata"]: + account["metadata"]["namespace"] = namespace + elif account["metadata"]["namespace"] != namespace: + raise InvalidNamespaceError(account_field, namespace) + + name = account["metadata"]["name"] + field_selector = "metadata.name={}".format(name) + accounts = core_api().list_namespaced_service_account( + namespace, field_selector=field_selector).items + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info(log_prefix + using_existing_msg(account_field, name)) + return + + logger.info(log_prefix + 
not_found_msg(account_field, name)) + core_api().create_namespaced_service_account(namespace, account) + logger.info(log_prefix + created_msg(account_field, name)) + + +def _configure_autoscaler_role(namespace, provider_config): + role_field = "autoscaler_role" + if role_field not in provider_config: + logger.info(log_prefix + not_provided_msg(role_field)) + return + + role = provider_config[role_field] + if "namespace" not in role["metadata"]: + role["metadata"]["namespace"] = namespace + elif role["metadata"]["namespace"] != namespace: + raise InvalidNamespaceError(role_field, namespace) + + name = role["metadata"]["name"] + field_selector = "metadata.name={}".format(name) + accounts = auth_api().list_namespaced_role( + namespace, field_selector=field_selector).items + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info(log_prefix + using_existing_msg(role_field, name)) + return + + logger.info(log_prefix + not_found_msg(role_field, name)) + auth_api().create_namespaced_role(namespace, role) + logger.info(log_prefix + created_msg(role_field, name)) + + +def _configure_autoscaler_role_binding(namespace, provider_config): + binding_field = "autoscaler_role_binding" + if binding_field not in provider_config: + logger.info(log_prefix + not_provided_msg(binding_field)) + return + + binding = provider_config[binding_field] + if "namespace" not in binding["metadata"]: + binding["metadata"]["namespace"] = namespace + elif binding["metadata"]["namespace"] != namespace: + raise InvalidNamespaceError(binding_field, namespace) + for subject in binding["subjects"]: + if "namespace" not in subject: + subject["namespace"] = namespace + elif subject["namespace"] != namespace: + raise InvalidNamespaceError( + binding_field + " subject '{}'".format(subject["name"]), + namespace) + + name = binding["metadata"]["name"] + field_selector = "metadata.name={}".format(name) + accounts = auth_api().list_namespaced_role_binding( + namespace, field_selector=field_selector).items 
+    if len(accounts) > 0:
+        assert len(accounts) == 1
+        logger.info(log_prefix + using_existing_msg(binding_field, name))
+        return
+
+    logger.info(log_prefix + not_found_msg(binding_field, name))
+    auth_api().create_namespaced_role_binding(namespace, binding)
+    logger.info(log_prefix + created_msg(binding_field, name))
diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml
new file mode 100644
index 000000000..df6c97f06
--- /dev/null
+++ b/python/ray/autoscaler/kubernetes/example-full.yaml
@@ -0,0 +1,232 @@
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: default
+
+# The minimum number of worker nodes to launch in addition to the head
+# node. This number should be >= 0.
+min_workers: 0
+
+# The maximum number of worker nodes to launch in addition to the head
+# node. This takes precedence over min_workers.
+max_workers: 2
+
+# The initial number of worker nodes to launch in addition to the head
+# node. When the cluster is first brought up (or when it is refreshed with a
+# subsequent `ray up`) this number of nodes will be started.
+initial_workers: 0
+
+# Whether or not to autoscale aggressively. If this is enabled, if at any point
+# we would start more workers, we start at least enough to bring us to
+# initial_workers.
+autoscaling_mode: default
+
+# The autoscaler will scale up the cluster to this target fraction of resource
+# usage. For example, if a cluster of 10 nodes is 100% busy and
+# target_utilization is 0.8, it would resize the cluster to 13. This fraction
+# can be decreased to increase the aggressiveness of upscaling.
+# This value must be less than 1.0 for scaling to happen.
+target_utilization_fraction: 0.8
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+# Kubernetes resources that need to be configured for the autoscaler to be
+# able to manage the Ray cluster. 
If any of the provided resources don't +# exist, the autoscaler will attempt to create them. If this fails, you may +# not have the required permissions and will have to request them to be +# created by your cluster administrator. +provider: + type: kubernetes + + # Exposing external IP addresses for ray pods isn't currently supported. + use_internal_ips: true + + # Namespace to use for all resources created. + namespace: ray + + # ServiceAccount created by the autoscaler for the head node pod that it + # runs in. If this field isn't provided, the head pod config below must + # contain a user-created service account with the proper permissions. + autoscaler_service_account: + apiVersion: v1 + kind: ServiceAccount + metadata: + name: autoscaler + + # Role created by the autoscaler for the head node pod that it runs in. + # If this field isn't provided, the role referenced in + # autoscaler_role_binding must exist and have at least these permissions. + autoscaler_role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: autoscaler + rules: + - apiGroups: [""] + resources: ["pods", "pods/status", "pods/exec"] + verbs: ["get", "watch", "list", "create", "delete", "patch"] + + # RoleBinding created by the autoscaler for the head node pod that it runs + # in. If this field isn't provided, the head pod config below must contain + # a user-created service account with the proper permissions. + autoscaler_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + name: autoscaler + subjects: + - kind: ServiceAccount + name: autoscaler + roleRef: + kind: Role + name: autoscaler + apiGroup: rbac.authorization.k8s.io + +# Kubernetes pod config for the head node pod. +head_node: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: ray-head- + spec: + # Change this if you altered the autoscaler_service_account above + # or want to provide your own. 
+ serviceAccountName: autoscaler + + # Restarting the head node automatically is not currently supported. + # If the head node goes down, `ray up` must be run again. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: Always + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + # - screen (used for `ray attach`) + # - kubectl (used by the autoscaler to manage worker pods) + image: rayproject/autoscaler + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 6379 # Redis port. + - containerPort: 6380 # Redis port. + - containerPort: 6381 # Redis port. + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 512Mi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + +# Kubernetes pod config for worker node pods. +worker_nodes: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. 
+ generateName: ray-worker- + spec: + serviceAccountName: default + + # Worker nodes will be managed automatically by the head node, so + # do not change the restart policy. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: Always + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + image: rayproject/autoscaler + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 512Mi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { +# "/path1/on/remote/machine": "/path1/on/local/machine", +# "/path2/on/remote/machine": "/path2/on/local/machine", +} + +# List of commands that will be run before `setup_commands`. 
If docker is
+# enabled, these commands will run outside the container and before docker
+# is setup.
+initialization_commands: []
+
+# List of shell commands to run to set up nodes.
+setup_commands: []
+
+# Custom commands that will be run on the head node after common setup.
+head_setup_commands: []
+
+# Custom commands that will be run on worker nodes after common setup.
+worker_setup_commands: []
+
+# Command to start ray on the head node. You don't need to change this.
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --num-cpus=$MY_CPU_REQUEST --redis-address=$RAY_HEAD_IP:6379 --object-manager-port=8076
diff --git a/python/ray/autoscaler/kubernetes/example-minimal.yaml b/python/ray/autoscaler/kubernetes/example-minimal.yaml
new file mode 100644
index 000000000..62cf855db
--- /dev/null
+++ b/python/ray/autoscaler/kubernetes/example-minimal.yaml
@@ -0,0 +1,58 @@
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: minimal
+
+# The maximum number of worker nodes to launch in addition to the head
+# node. This takes precedence over min_workers. min_workers defaults to 0.
+max_workers: 1
+
+# Kubernetes resources that need to be configured for the autoscaler to be
+# able to manage the Ray cluster. If any of the provided resources don't
+# exist, the autoscaler will attempt to create them. If this fails, you may
+# not have the required permissions and will have to request them to be
+# created by your cluster administrator.
+provider:
+  type: kubernetes
+
+  # Exposing external IP addresses for ray pods isn't currently supported.
+  use_internal_ips: true
+
+  # Namespace to use for all resources created.
+  namespace: ray
+
+  # ServiceAccount created by the autoscaler for the head node pod that it
+  # runs in. If this field isn't provided, the head pod config below must
+  # contain a user-created service account with the proper permissions.
+  autoscaler_service_account:
+    apiVersion: v1
+    kind: ServiceAccount
+    metadata:
+      name: autoscaler
+
+  # Role created by the autoscaler for the head node pod that it runs in.
+  # If this field isn't provided, the role referenced in
+  # autoscaler_role_binding must exist and have at least these permissions.
+  autoscaler_role:
+    kind: Role
+    apiVersion: rbac.authorization.k8s.io/v1
+    metadata:
+      name: autoscaler
+    rules:
+    - apiGroups: [""]
+      resources: ["pods", "pods/status", "pods/exec"]
+      verbs: ["get", "watch", "list", "create", "delete", "patch"]
+
+  # RoleBinding created by the autoscaler for the head node pod that it runs
+  # in. If this field isn't provided, the head pod config below must contain
+  # a user-created service account with the proper permissions.
+  autoscaler_role_binding:
+    apiVersion: rbac.authorization.k8s.io/v1
+    kind: RoleBinding
+    metadata:
+      name: autoscaler
+    subjects:
+    - kind: ServiceAccount
+      name: autoscaler
+    roleRef:
+      kind: Role
+      name: autoscaler
+      apiGroup: rbac.authorization.k8s.io
diff --git a/python/ray/autoscaler/kubernetes/kubectl-rsync.sh b/python/ray/autoscaler/kubernetes/kubectl-rsync.sh
new file mode 100755
index 000000000..eb756c2d7
--- /dev/null
+++ b/python/ray/autoscaler/kubernetes/kubectl-rsync.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Helper script to use kubectl as a remote shell for rsync to sync files
+# to/from pods that have rsync installed. Taken from:
+# https://serverfault.com/questions/741670/rsync-files-to-a-kubernetes-pod/746352
+
+if [ -z "$KRSYNC_STARTED" ]; then
+    export KRSYNC_STARTED=true
+    exec rsync --blocking-io --rsh "$0" $@
+fi
+
+# Running as --rsh
+namespace=''
+pod=$1
+shift
+
+# If the user uses pod@namespace rsync passes as: {us} -l pod namespace ...
+if [ "X$pod" = "X-l" ]; then + pod=$1 + shift + namespace="-n $1" + shift +fi + +exec kubectl $namespace exec -i $pod -- "$@" diff --git a/python/ray/autoscaler/kubernetes/node_provider.py b/python/ray/autoscaler/kubernetes/node_provider.py new file mode 100644 index 000000000..c5af0ce80 --- /dev/null +++ b/python/ray/autoscaler/kubernetes/node_provider.py @@ -0,0 +1,87 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging + +from ray.autoscaler.kubernetes import core_api, log_prefix +from ray.autoscaler.node_provider import NodeProvider +from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME + +logger = logging.getLogger(__name__) + + +def to_label_selector(tags): + label_selector = "" + for k, v in tags.items(): + if label_selector != "": + label_selector += "," + label_selector += "{}={}".format(k, v) + return label_selector + + +class KubernetesNodeProvider(NodeProvider): + def __init__(self, provider_config, cluster_name): + NodeProvider.__init__(self, provider_config, cluster_name) + self.cluster_name = cluster_name + self.namespace = provider_config["namespace"] + + def non_terminated_nodes(self, tag_filters): + # Match pods that are in the 'Pending' or 'Running' phase. + # Unfortunately there is no OR operator in field selectors, so we + # have to match on NOT any of the other phases. 
+ field_selector = ",".join([ + "status.phase!=Failed", + "status.phase!=Unknown", + "status.phase!=Succeeded", + "status.phase!=Terminating", + ]) + + tag_filters[TAG_RAY_CLUSTER_NAME] = self.cluster_name + label_selector = to_label_selector(tag_filters) + pod_list = core_api().list_namespaced_pod( + self.namespace, + field_selector=field_selector, + label_selector=label_selector) + + return [pod.metadata.name for pod in pod_list.items] + + def is_running(self, node_id): + pod = core_api().read_namespaced_pod_status(node_id, self.namespace) + return pod.status.phase == "Running" + + def is_terminated(self, node_id): + pod = core_api().read_namespaced_pod_status(node_id, self.namespace) + return pod.status.phase not in ["Running", "Pending"] + + def node_tags(self, node_id): + pod = core_api().read_namespaced_pod_status(node_id, self.namespace) + return pod.metadata.labels + + def external_ip(self, node_id): + raise NotImplementedError("Must use internal IPs with kubernetes.") + + def internal_ip(self, node_id): + pod = core_api().read_namespaced_pod_status(node_id, self.namespace) + return pod.status.pod_ip + + def set_node_tags(self, node_id, tags): + body = {"metadata": {"labels": tags}} + core_api().patch_namespaced_pod(node_id, self.namespace, body) + + def create_node(self, node_config, tags, count): + pod_spec = node_config.copy() + tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name + pod_spec["metadata"]["namespace"] = self.namespace + pod_spec["metadata"]["labels"] = tags + logger.info(log_prefix + "calling create_namespaced_pod " + "(count={}).".format(count)) + for _ in range(count): + core_api().create_namespaced_pod(self.namespace, pod_spec) + + def terminate_node(self, node_id): + core_api().delete_namespaced_pod(node_id, self.namespace) + + def terminate_nodes(self, node_ids): + for node_id in node_ids: + self.terminate_node(node_id) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index d8ffd0b24..c9b0c9164 
100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -28,12 +28,24 @@ def import_local(): return bootstrap_local, LocalNodeProvider +def import_kubernetes(): + from ray.autoscaler.kubernetes.config import bootstrap_kubernetes + from ray.autoscaler.kubernetes.node_provider import KubernetesNodeProvider + return bootstrap_kubernetes, KubernetesNodeProvider + + def load_local_example_config(): import ray.autoscaler.local as ray_local return os.path.join( os.path.dirname(ray_local.__file__), "example-full.yaml") +def load_kubernetes_example_config(): + import ray.autoscaler.kubernetes as ray_kubernetes + return os.path.join( + os.path.dirname(ray_kubernetes.__file__), "example-full.yaml") + + def load_aws_example_config(): import ray.autoscaler.aws as ray_aws return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml") @@ -58,7 +70,7 @@ NODE_PROVIDERS = { "aws": import_aws, "gcp": import_gcp, "azure": None, # TODO: support more node providers - "kubernetes": None, + "kubernetes": import_kubernetes, "docker": None, "external": import_external # Import an external module } @@ -68,7 +80,7 @@ DEFAULT_CONFIGS = { "aws": load_aws_example_config, "gcp": load_gcp_example_config, "azure": None, # TODO: support more node providers - "kubernetes": None, + "kubernetes": load_kubernetes_example_config, "docker": None, } diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 7e9934898..981985e29 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -25,40 +25,142 @@ logger = logging.getLogger(__name__) # How long to wait for a node to start, in seconds NODE_START_WAIT_S = 300 -SSH_CHECK_INTERVAL = 5 +READY_CHECK_INTERVAL = 5 HASH_MAX_LENGTH = 10 +KUBECTL_RSYNC = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "kubernetes/kubectl-rsync.sh") -def get_default_ssh_options(private_key, connect_timeout, ssh_control_path): - OPTS = [ - 
("ConnectTimeout", "{}s".format(connect_timeout)), - ("StrictHostKeyChecking", "no"), - ("ControlMaster", "auto"), - ("ControlPath", "{}/%C".format(ssh_control_path)), - ("ControlPersist", "10s"), - ] - - return ["-i", private_key] + [ - x for y in (["-o", "{}={}".format(k, v)] for k, v in OPTS) for x in y - ] +def with_interactive(cmd): + force_interactive = ("true && source ~/.bashrc && " + "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") + return ["bash", "--login", "-c", "-i", force_interactive + cmd] -class NodeUpdater(object): - """A process for syncing files and running init commands on a node.""" +class KubernetesCommandRunner(object): + def __init__(self, log_prefix, namespace, node_id, auth_config, + process_runner): - def __init__(self, - node_id, - provider_config, - provider, - auth_config, - cluster_name, - file_mounts, - initialization_commands, - setup_commands, - ray_start_commands, - runtime_hash, - process_runner=subprocess, - use_internal_ip=False): + self.log_prefix = log_prefix + self.process_runner = process_runner + self.node_id = node_id + self.namespace = namespace + self.kubectl = ["kubectl", "-n", self.namespace] + + def run(self, + cmd, + timeout=120, + redirect=None, + allocate_tty=False, + exit_on_fail=False, + port_forward=None): + + logger.info(self.log_prefix + "Running {}...".format(cmd)) + + if port_forward: + port_forward_cmd = self.kubectl + [ + "port-forward", self.node_id, + str(port_forward) + ] + port_forward_process = subprocess.Popen(port_forward_cmd) + # Give port-forward a grace period to run and print output before + # running the actual command. This is a little ugly, but it should + # work in most scenarios and nothing should go very wrong if the + # command starts running before the port forward starts. 
+ time.sleep(1) + + final_cmd = self.kubectl + [ + "exec", "-it" if allocate_tty else "-i", self.node_id, "--" + ] + with_interactive(cmd) + try: + self.process_runner.check_call( + final_cmd, stdout=redirect, stderr=redirect) + except subprocess.CalledProcessError: + if exit_on_fail: + quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) + logger.error(self.log_prefix + + "Command failed: \n\n {}\n".format(quoted_cmd)) + sys.exit(1) + else: + raise + finally: + # Clean up the port forward process. First, try to let it exit + # gracefull with SIGTERM. If that doesn't work after 1s, send + # SIGKILL. + if port_forward: + port_forward_process.terminate() + for _ in range(10): + time.sleep(0.1) + port_forward_process.poll() + if port_forward_process.returncode: + break + logger.info(self.log_prefix + + "Waiting for port forward to die...") + else: + logger.warning(self.log_prefix + + "Killing port forward with SIGKILL.") + port_forward_process.kill() + + def run_rsync_up(self, source, target, redirect=None): + if target.startswith("~"): + target = "/root" + target[1:] + + try: + self.process_runner.check_call( + [ + KUBECTL_RSYNC, + "-avz", + source, + "{}@{}:{}".format(self.node_id, self.namespace, target), + ], + stdout=redirect, + stderr=redirect) + except Exception as e: + logger.warning(self.log_prefix + + "rsync failed: '{}'. Falling back to 'kubectl cp'" + .format(e)) + self.process_runner.check_call( + self.kubectl + [ + "cp", source, "{}/{}:{}".format(self.namespace, + self.node_id, target) + ], + stdout=redirect, + stderr=redirect) + + def run_rsync_down(self, source, target, redirect=None): + if target.startswith("~"): + target = "/root" + target[1:] + + try: + self.process_runner.check_call( + [ + KUBECTL_RSYNC, + "-avz", + "{}@{}:{}".format(self.node_id, self.namespace, source), + target, + ], + stdout=redirect, + stderr=redirect) + except Exception as e: + logger.warning(self.log_prefix + + "rsync failed: '{}'. 
Falling back to 'kubectl cp'" + .format(e)) + self.process_runner.check_call( + self.kubectl + [ + "cp", "{}/{}:{}".format(self.namespace, self.node_id, + source), target + ], + stdout=redirect, + stderr=redirect) + + def remote_shell_command_str(self): + return "{} exec -it {} bash".format(" ".join(self.kubectl), + self.node_id) + + +class SSHCommandRunner(object): + def __init__(self, log_prefix, node_id, provider, auth_config, + cluster_name, process_runner, use_internal_ip): ssh_control_hash = hashlib.md5(cluster_name.encode()).hexdigest() ssh_user_hash = hashlib.md5(getuser().encode()).hexdigest() @@ -66,24 +168,29 @@ class NodeUpdater(object): ssh_user_hash[:HASH_MAX_LENGTH], ssh_control_hash[:HASH_MAX_LENGTH]) - self.daemon = True + self.log_prefix = log_prefix self.process_runner = process_runner self.node_id = node_id - self.use_internal_ip = (use_internal_ip or provider_config.get( - "use_internal_ips", False)) + self.use_internal_ip = use_internal_ip self.provider = provider self.ssh_private_key = auth_config["ssh_private_key"] self.ssh_user = auth_config["ssh_user"] self.ssh_control_path = ssh_control_path self.ssh_ip = None - self.file_mounts = { - remote: os.path.expanduser(local) - for remote, local in file_mounts.items() - } - self.initialization_commands = initialization_commands - self.setup_commands = setup_commands - self.ray_start_commands = ray_start_commands - self.runtime_hash = runtime_hash + + def get_default_ssh_options(self, connect_timeout): + OPTS = [ + ("ConnectTimeout", "{}s".format(connect_timeout)), + ("StrictHostKeyChecking", "no"), + ("ControlMaster", "auto"), + ("ControlPath", "{}/%C".format(self.ssh_control_path)), + ("ControlPersist", "10s"), + ] + + return ["-i", self.ssh_private_key] + [ + x for y in (["-o", "{}={}".format(k, v)] for k, v in OPTS) + for x in y + ] def get_node_ip(self): if self.use_internal_ip: @@ -94,8 +201,7 @@ class NodeUpdater(object): def wait_for_ip(self, deadline): while time.time() < deadline and \ 
not self.provider.is_terminated(self.node_id): - logger.info("NodeUpdater: " - "Waiting for IP of {}...".format(self.node_id)) + logger.info(self.log_prefix + "Waiting for IP...") ip = self.get_node_ip() if ip is not None: return ip @@ -110,7 +216,7 @@ class NodeUpdater(object): # We assume that this never changes. # I think that's reasonable. deadline = time.time() + NODE_START_WAIT_S - with LogTimer("NodeUpdater: {}: Got IP".format(self.node_id)): + with LogTimer(self.log_prefix + "Got IP"): ip = self.wait_for_ip(deadline) assert ip is not None, "Unable to find IP of node" @@ -134,24 +240,125 @@ class NodeUpdater(object): stdout=redirect, stderr=redirect) except subprocess.CalledProcessError as e: - logger.warning(e) + logger.warning(self.log_prefix + str(e)) + + def run(self, + cmd, + timeout=120, + redirect=None, + allocate_tty=False, + exit_on_fail=False, + port_forward=None): + + self.set_ssh_ip_if_required() + + logger.info(self.log_prefix + + "Running {} on {}...".format(cmd, self.ssh_ip)) + ssh = ["ssh"] + if allocate_tty: + ssh.append("-tt") + + if port_forward: + ssh += ["-L", "{}:localhost:{}".format(port_forward, port_forward)] + + final_cmd = ssh + self.get_default_ssh_options(timeout) + [ + "{}@{}".format(self.ssh_user, self.ssh_ip) + ] + with_interactive(cmd) + try: + self.process_runner.check_call( + final_cmd, stdout=redirect, stderr=redirect) + except subprocess.CalledProcessError: + if exit_on_fail: + quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) + logger.error(self.log_prefix + + "Command failed: \n\n {}\n".format(quoted_cmd)) + sys.exit(1) + else: + raise + + def run_rsync_up(self, source, target, redirect=None): + self.set_ssh_ip_if_required() + self.process_runner.check_call( + [ + "rsync", "--rsh", + " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", + source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target) + ], + stdout=redirect, + stderr=redirect) + + def rsync_down(self, source, target, redirect=None): 
+ self.set_ssh_ip_if_required() + self.process_runner.check_call( + [ + "rsync", "--rsh", + " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", + "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target + ], + stdout=redirect, + stderr=redirect) + + def remote_shell_command_str(self): + return "ssh -i {} {}@{}\n".format(self.ssh_private_key, self.ssh_user, + self.ssh_ip) + + +class NodeUpdater(object): + """A process for syncing files and running init commands on a node.""" + + def __init__(self, + node_id, + provider_config, + provider, + auth_config, + cluster_name, + file_mounts, + initialization_commands, + setup_commands, + ray_start_commands, + runtime_hash, + process_runner=subprocess, + use_internal_ip=False): + + self.log_prefix = "NodeUpdater: {}: ".format(node_id) + if provider_config["type"] == "kubernetes": + self.cmd_runner = KubernetesCommandRunner( + self.log_prefix, provider.namespace, node_id, auth_config, + process_runner) + else: + use_internal_ip = (use_internal_ip or provider_config.get( + "use_internal_ips", False)) + self.cmd_runner = SSHCommandRunner( + self.log_prefix, node_id, provider, auth_config, cluster_name, + process_runner, use_internal_ip) + + self.daemon = True + self.process_runner = process_runner + self.node_id = node_id + self.provider = provider + self.file_mounts = { + remote: os.path.expanduser(local) + for remote, local in file_mounts.items() + } + self.initialization_commands = initialization_commands + self.setup_commands = setup_commands + self.ray_start_commands = ray_start_commands + self.runtime_hash = runtime_hash def run(self): - logger.info("NodeUpdater: " - "{}: Updating to {}".format(self.node_id, - self.runtime_hash)) + logger.info(self.log_prefix + + "Updating to {}".format(self.runtime_hash)) try: - m = "{}: Applied config {}".format(self.node_id, self.runtime_hash) - with LogTimer("NodeUpdater: {}".format(m)): + with LogTimer(self.log_prefix + + "Applied config 
{}".format(self.runtime_hash)): self.do_update() except Exception as e: error_str = str(e) if hasattr(e, "cmd"): error_str = "(Exit Status {}) {}".format( e.returncode, " ".join(e.cmd)) - logger.error("NodeUpdater: " - "{}: Error updating {}".format( - self.node_id, error_str)) + logger.error(self.log_prefix + + "Error updating {}".format(error_str)) self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED}) raise e @@ -164,35 +371,6 @@ class NodeUpdater(object): self.exitcode = 0 - def wait_for_ssh(self, deadline): - logger.info("NodeUpdater: " - "{}: Waiting for SSH...".format(self.node_id)) - - while time.time() < deadline and \ - not self.provider.is_terminated(self.node_id): - try: - logger.debug("NodeUpdater: " - "{}: Waiting for SSH...".format(self.node_id)) - - # Setting redirect=False allows the user to see errors like - # unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too long - # for Unix domain socket. - self.ssh_cmd("uptime", connect_timeout=5, redirect=False) - - return True - - except Exception as e: - retry_str = str(e) - if hasattr(e, "cmd"): - retry_str = "(Exit Status {}): {}".format( - e.returncode, " ".join(e.cmd)) - logger.debug("NodeUpdater: " - "{}: SSH not up, retrying: {}".format( - self.node_id, retry_str)) - time.sleep(SSH_CHECK_INTERVAL) - - return False - def sync_file_mounts(self, sync_cmd): # Rsync file mounts for remote_path, local_path in self.file_mounts.items(): @@ -203,26 +381,46 @@ class NodeUpdater(object): if not remote_path.endswith("/"): remote_path += "/" - m = "{}: Synced {} to {}".format(self.node_id, local_path, - remote_path) - with LogTimer("NodeUpdater {}".format(m)): - self.ssh_cmd( - "mkdir -p {}".format(os.path.dirname(remote_path)), - redirect=None, - ) + with LogTimer(self.log_prefix + + "Synced {} to {}".format(local_path, remote_path)): + self.cmd_runner.run("mkdir -p {}".format( + os.path.dirname(remote_path))) sync_cmd(local_path, remote_path, redirect=None) + def 
wait_ready(self, deadline): + with LogTimer(self.log_prefix + "Got remote shell"): + logger.info(self.log_prefix + "Waiting for remote shell...") + + while time.time() < deadline and \ + not self.provider.is_terminated(self.node_id): + try: + logger.debug(self.log_prefix + + "Waiting for remote shell...") + + # Setting redirect=False allows the user to see errors like + # unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too + # long for Unix domain socket. + self.cmd_runner.run("uptime", timeout=5, redirect=False) + + return True + + except Exception as e: + retry_str = str(e) + if hasattr(e, "cmd"): + retry_str = "(Exit Status {}): {}".format( + e.returncode, " ".join(e.cmd)) + logger.debug(self.log_prefix + + "Node not up, retrying: {}".format(retry_str)) + time.sleep(READY_CHECK_INTERVAL) + + assert False, "Unable to connect to node" + def do_update(self): self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH}) deadline = time.time() + NODE_START_WAIT_S - self.set_ssh_ip_if_required() - - # Wait for SSH access - with LogTimer("NodeUpdater: " "{}: Got SSH".format(self.node_id)): - ssh_ok = self.wait_for_ssh(deadline) - assert ssh_ok, "Unable to SSH to node" + self.wait_ready(deadline) node_tags = self.provider.node_tags(self.node_id) if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash: @@ -237,97 +435,28 @@ class NodeUpdater(object): # Run init commands self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP}) - m = "{}: Initialization commands completed".format(self.node_id) - with LogTimer("NodeUpdater: {}".format(m)): + with LogTimer(self.log_prefix + + "Initialization commands completed"): for cmd in self.initialization_commands: - self.ssh_cmd(cmd) + self.cmd_runner.run(cmd) - m = "{}: Setup commands completed".format(self.node_id) - with LogTimer("NodeUpdater: {}".format(m)): + with LogTimer(self.log_prefix + "Setup commands completed"): for cmd in self.setup_commands: - 
self.ssh_cmd(cmd) + self.cmd_runner.run(cmd) - m = "{}: Ray start commands completed".format(self.node_id) - with LogTimer("NodeUpdater: {}".format(m)): + with LogTimer(self.log_prefix + "Ray start commands completed"): for cmd in self.ray_start_commands: - self.ssh_cmd(cmd) + self.cmd_runner.run(cmd) def rsync_up(self, source, target, redirect=None): - logger.info("NodeUpdater: " - "{}: Syncing {} to {}...".format(self.node_id, source, - target)) - self.set_ssh_ip_if_required() - self.process_runner.check_call( - [ - "rsync", "-e", " ".join(["ssh"] + get_default_ssh_options( - self.ssh_private_key, 120, self.ssh_control_path)), "-avz", - source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target) - ], - stdout=redirect or sys.stdout, - stderr=redirect or sys.stderr) + logger.info(self.log_prefix + + "Syncing {} to {}...".format(source, target)) + self.cmd_runner.run_rsync_up(source, target, redirect=None) def rsync_down(self, source, target, redirect=None): - logger.info("NodeUpdater: " - "{}: Syncing {} from {}...".format(self.node_id, source, - target)) - self.set_ssh_ip_if_required() - self.process_runner.check_call( - [ - "rsync", "-e", " ".join(["ssh"] + get_default_ssh_options( - self.ssh_private_key, 120, self.ssh_control_path)), "-avz", - "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target - ], - stdout=redirect or sys.stdout, - stderr=redirect or sys.stderr) - - def ssh_cmd(self, - cmd, - connect_timeout=120, - redirect=None, - allocate_tty=False, - emulate_interactive=True, - exit_on_fail=False, - port_forward=None): - - self.set_ssh_ip_if_required() - - logger.info("NodeUpdater: Running {} on {}...".format( - cmd, self.ssh_ip)) - ssh = ["ssh"] - if allocate_tty: - ssh.append("-tt") - if emulate_interactive: - force_interactive = ( - "true && source ~/.bashrc && " - "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") - cmd = "bash --login -c -i {}".format( - quote(force_interactive + cmd)) - - if port_forward is None: - ssh_opt = [] - else: 
- ssh_opt = [ - "-L", "{}:localhost:{}".format(port_forward, port_forward) - ] - - final_cmd = ssh + ssh_opt + get_default_ssh_options( - self.ssh_private_key, connect_timeout, self.ssh_control_path) + [ - "{}@{}".format(self.ssh_user, self.ssh_ip), cmd - ] - try: - self.process_runner.check_call( - final_cmd, - stdout=redirect or sys.stdout, - stderr=redirect or sys.stderr) - except subprocess.CalledProcessError: - if exit_on_fail: - # Only reason we need this exit flag here is because here we - # know the final command and can print it nicely before exit() - logger.error("Command failed: \n\n {}\n".format( - " ".join(final_cmd))) - sys.exit(1) - else: - raise + logger.info(self.log_prefix + + "Syncing {} from {}...".format(source, target)) + self.cmd_runner.run_rsync_down(source, target, redirect=None) class NodeUpdaterThread(NodeUpdater, Thread): diff --git a/python/setup.py b/python/setup.py index 395eb2e2b..a951aa666 100644 --- a/python/setup.py +++ b/python/setup.py @@ -42,6 +42,8 @@ ray_autoscaler_files = [ "ray/autoscaler/aws/example-full.yaml", "ray/autoscaler/gcp/example-full.yaml", "ray/autoscaler/local/example-full.yaml", + "ray/autoscaler/kubernetes/example-full.yaml", + "ray/autoscaler/kubernetes/kubectl-rsync.sh", ] ray_project_files = [