[Kuberay] Ray Autoscaler integration with Kuberay (MVP) (#21086)
This is a minimum viable product for Ray Autoscaler integration with Kuberay. It is not ready for prime time/general use, but should be enough for interested parties to get started (see the documentation in kuberay.md).
Parent: 7d74a9face
Commit: fbc51d6d0e
13 changed files with 828 additions and 0 deletions
@@ -13,6 +13,7 @@ Ray with Cluster Managers
     :maxdepth: 2

     kubernetes.rst
+    kuberay.md
     yarn.rst
     slurm.rst
     lsf.rst
doc/source/cluster/kuberay.md (new file, 96 lines)
@@ -0,0 +1,96 @@
# Deploying with Kuberay (experimental)

```{admonition} What is Kuberay?
[Kuberay](https://github.com/ray-project/kuberay) is a set of tools for running Ray on Kubernetes.
It has been used by some larger corporations to deploy Ray on their infrastructure.
Going forward, we would like to make this way of deployment accessible and seamless for
all Ray users and standardize Ray deployment on Kubernetes around Kuberay's operator.
For now, you should consider this integration a minimum viable product that is not polished
enough for general use and prefer the [Kubernetes integration](kubernetes.rst) for running
Ray on Kubernetes. If you are brave enough to try the Kuberay integration out, this documentation
is for you! We would love your feedback as a [GitHub issue](https://github.com/ray-project/ray/issues)
including `[Kuberay]` in the title.
```

Here we describe how you can deploy a Ray cluster on Kuberay. The following instructions are for
Minikube, but the deployment works the same way on a real Kubernetes cluster. You need at
least 4 CPUs to run this example. First, make sure Minikube is initialized:

```shell
minikube start
```

Now you can deploy the Kuberay operator using

```shell
./ray/python/ray/autoscaler/kuberay/init-config.sh
kubectl apply -k "ray/python/ray/autoscaler/kuberay/config/default"
kubectl apply -f "ray/python/ray/autoscaler/kuberay/kuberay-autoscaler.yaml"
```

You can verify that the operator has been deployed using

```shell
kubectl -n ray-system get pods
```

Now let's deploy a new Ray cluster:

```shell
kubectl create -f ray/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
```

## Using the autoscaler

Let's now try out the autoscaler. We can run the following command to get a
Python interpreter in the head pod:

```shell
kubectl exec `kubectl get pods -o custom-columns=POD:metadata.name | grep raycluster-complete-head` -it -c ray-head -- python
```

In the Python interpreter, run the following snippet to scale up the cluster:

```python
import ray.autoscaler.sdk
ray.init("auto")
ray.autoscaler.sdk.request_resources(num_cpus=4)
```
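
Once you no longer need the extra capacity, you can clear the request so that idle workers
are scaled back down after `idle_timeout_minutes`. A minimal sketch; `request_resources`
overrides the previous request, so requesting zero CPUs effectively cancels it:

```python
import ray.autoscaler.sdk

# Requesting zero CPUs overrides the earlier request; idle workers are
# then removed after the configured idle timeout.
ray.autoscaler.sdk.request_resources(num_cpus=0)
```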

## Uninstalling the Kuberay operator

You can uninstall the Kuberay operator using

```shell
kubectl delete -f "ray/python/ray/autoscaler/kuberay/kuberay-autoscaler.yaml"
kubectl delete -k "ray/python/ray/autoscaler/kuberay/config/default"
```

Note that all running Ray clusters will automatically be terminated.

## Developing the Kuberay integration (advanced)

If you also want to change the underlying Kuberay operator, please refer to the instructions
in [the Kuberay development documentation](https://github.com/ray-project/kuberay/blob/master/ray-operator/DEVELOPMENT.md).
In that case you should push the modified operator to your Docker account or registry and
follow the instructions in `ray/python/ray/autoscaler/kuberay/init-config.sh`.

The remainder of these instructions covers how to change the autoscaler code.

To maximize development iteration speed, we recommend using a Linux machine with Python 3.7 for
development, since that simplifies building wheels incrementally.
Make the desired modifications to Ray and/or the autoscaler and build the Ray wheel by running
the following command in the `ray/python` directory:

```shell
python setup.py bdist_wheel
```

Then, in the `ray/docker/kuberay-autoscaler` directory, run:

```shell
cp ../../python/dist/ray-2.0.0.dev0-cp37-cp37m-linux_x86_64.whl ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
docker build --build-arg WHEEL_PATH="ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl" -t rayproject/kuberay-autoscaler --no-cache .
docker push rayproject/kuberay-autoscaler
```

where you replace `rayproject/kuberay-autoscaler` with the desired image path in your own Docker account (normally
`<username>/kuberay-autoscaler`). Please also make sure to update the image in `ray-cluster.complete.yaml`.
docker/kuberay-autoscaler/Dockerfile (new file, 8 lines)
@@ -0,0 +1,8 @@
FROM rayproject/ray:nightly
ARG WHEEL_PATH
RUN $HOME/anaconda3/bin/pip uninstall -y ray
COPY $WHEEL_PATH .
RUN $HOME/anaconda3/bin/pip --no-cache-dir install "$WHEEL_PATH"[all]

COPY run_autoscaler.py /home/ray/run_autoscaler.py
COPY run_autoscaler_with_retries.py /home/ray/run_autoscaler_with_retries.py
docker/kuberay-autoscaler/run_autoscaler.py (new file, 61 lines)
@@ -0,0 +1,61 @@
import argparse
import logging
import os

import ray
from ray import ray_constants
from ray._private.ray_logging import setup_component_logger
from ray._private.services import get_node_ip_address
from ray.autoscaler._private.monitor import Monitor
import yaml

AUTOSCALING_CONFIG_PATH = "/autoscaler/ray_bootstrap_config.yaml"


def setup_logging() -> None:
    """Log to standard autoscaler log location (logs viewable in UI) and
    pod stdout (logs viewable with `kubectl logs <head-pod> -c autoscaler`).
    """
    # Write logs at info level to monitor.log.
    setup_component_logger(
        logging_level=ray_constants.LOGGER_LEVEL,  # info
        logging_format=ray_constants.LOGGER_FORMAT,
        log_dir=os.path.join(ray._private.utils.get_ray_temp_dir(),
                             ray.node.SESSION_LATEST, "logs"),
        filename=ray_constants.MONITOR_LOG_FILE_NAME,  # monitor.log
        max_bytes=ray_constants.LOGGING_ROTATE_BYTES,
        backup_count=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
    )

    # Also log to stdout for debugging with `kubectl logs`.
    root_logger = logging.getLogger("")
    root_logger.setLevel(logging.INFO)

    root_handler = logging.StreamHandler()
    root_handler.setLevel(logging.INFO)
    root_handler.setFormatter(logging.Formatter(ray_constants.LOGGER_FORMAT))

    root_logger.addHandler(root_handler)


if __name__ == "__main__":
    setup_logging()

    parser = argparse.ArgumentParser(description="Kuberay Autoscaler")
    parser.add_argument(
        "--redis-password",
        required=False,
        type=str,
        default=None,
        help="The password to use for Redis")
    args = parser.parse_args()

    cluster_name = yaml.safe_load(
        open(AUTOSCALING_CONFIG_PATH).read())["cluster_name"]
    head_ip = get_node_ip_address()
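    # Port 6379 is the GCS/Redis port configured via rayStartParams in
    # ray-cluster.complete.yaml; the autoscaler runs as a sidecar in the
    # head pod, so it reaches the head at the shared pod IP.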
    Monitor(
        address=f"{head_ip}:6379",
        redis_password=args.redis_password,
        autoscaling_config=AUTOSCALING_CONFIG_PATH,
        monitor_ip=head_ip,
    ).run()
docker/kuberay-autoscaler/run_autoscaler_with_retries.py (new file, 29 lines)
@@ -0,0 +1,29 @@
import os
import subprocess
import sys
import time

here = os.path.dirname(os.path.abspath(__file__))
run_autoscaler_script = os.path.join(here, "run_autoscaler.py")

BACKOFF_S = 5

if __name__ == "__main__":
    """Keep trying to start the autoscaler until it runs.
    We need to retry until the Ray head is running.

    This script also has the effect of restarting the autoscaler if it fails.

    Autoscaler-starting attempts are run in subprocesses out of fear that a
    failed Monitor.start() attempt could leave dangling half-initialized global
    Python state.
    """
    while True:
        try:
            # We are forwarding all the command line arguments of
            # run_autoscaler_with_retries.py to run_autoscaler.py.
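            # Note: subprocess.run is called without check=True, so a
            # non-zero exit from run_autoscaler.py does not raise; the
            # while loop then simply retries right away.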
            subprocess.run(["python", f"{run_autoscaler_script}"] +
                           sys.argv[1:])  # noqa: B1
        except subprocess.SubprocessError:
            print(f"Restarting autoscaler in {BACKOFF_S} seconds.")
            time.sleep(BACKOFF_S)
python/ray/autoscaler/_private/kuberay/__init__.py (new empty file)

python/ray/autoscaler/_private/kuberay/node_provider.py (new file, 293 lines)
@@ -0,0 +1,293 @@
import json
import logging
import requests
import threading
import time
from typing import Any, Dict, List, Tuple

from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import (NODE_KIND_HEAD, NODE_KIND_WORKER,
                                 STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED,
                                 TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE)

# Terminology:

# Labels and Tags
# We call the Kuberay labels "labels" and the Ray autoscaler tags "tags".
# The labels are prefixed by "ray.io". Tags are prefixed by "ray-".
# We convert between the two but do not mix them.

# Worker Groups and Available Node Types
# In Kuberay the different node types are called "worker groups", in the
# Ray autoscaler they are called "available node types".

# Design:

# Each modification the autoscaler wants to make is posted to the API server
# as a goal state (e.g. if the autoscaler wants to scale up, it increases the
# number of replicas of the worker group it wants to scale; if it wants to
# scale down, it decreases the number of replicas and adds the exact pods that
# should be terminated to the scaleStrategy). In order to guarantee
# consistency, the NodeProvider then waits until Kuberay's reconciliation loop
# creates the pod specifications in the API server and then returns control
# back to the autoscaler. The waiting period is typically small, on the order
# of a few seconds. We make sure that only one such modification is in process
# by serializing all modification operations with a lock in the NodeProvider.
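#
# For example, a scale-up of one replica for a group at index 0 whose current
# replica count is 1 is posted as a JSON patch like (illustrative):
# [{"op": "test", "path": "/spec/workerGroupSpecs/0/replicas", "value": 1},
#  {"op": "replace", "path": "/spec/workerGroupSpecs/0/replicas", "value": 2}]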

# Note: Log handlers set up in autoscaling monitor entrypoint.
logger = logging.getLogger(__name__)

provider_exists = False


def to_label_selector(tags: Dict[str, str]) -> str:
    """Convert tags to label selector to embed in query to K8s API server."""
    label_selector = ""
    for k, v in tags.items():
        if label_selector != "":
            label_selector += ","
        label_selector += "{}={}".format(k, v)
    return label_selector
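
# Illustration of the helper above (hypothetical values):
# to_label_selector({"ray.io/cluster": "raycluster-complete",
#                    "ray.io/group": "small-group"})
# returns "ray.io/cluster=raycluster-complete,ray.io/group=small-group".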


def status_tag(pod: Dict[str, Any]) -> str:
    """Convert pod state to Ray autoscaler status tag."""
    if ("containerStatuses" not in pod["status"]
            or not pod["status"]["containerStatuses"]):
        return "pending"

    state = pod["status"]["containerStatuses"][0]["state"]

    if "pending" in state:
        return "pending"
    if "running" in state:
        return STATUS_UP_TO_DATE
    if "waiting" in state:
        return "waiting"
    if "terminated" in state:
        return STATUS_UPDATE_FAILED
    raise ValueError("Unexpected container state.")
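
# The container state returned by the Kubernetes API is a dict keyed by the
# state name, e.g. {"running": {"startedAt": "..."}}, which is why the
# membership checks above work.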


def make_node_tags(labels: Dict[str, str], status_tag: str) -> Dict[str, str]:
    """Convert Kuberay labels to Ray autoscaler tags."""
    tags = {"ray-node-status": status_tag}

    if labels["ray.io/node-type"] == "head":
        tags[TAG_RAY_NODE_KIND] = NODE_KIND_HEAD
        tags[TAG_RAY_USER_NODE_TYPE] = "head-group"
    else:
        tags[TAG_RAY_NODE_KIND] = NODE_KIND_WORKER
        tags[TAG_RAY_USER_NODE_TYPE] = labels["ray.io/group"]

    return tags
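
# For example (hypothetical labels): a worker pod labeled
# {"ray.io/node-type": "worker", "ray.io/group": "small-group"} maps to the
# tags {"ray-node-status": <status>, "ray-node-type": "worker",
# "ray-user-node-type": "small-group"}.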


class KuberayNodeProvider(NodeProvider):  # type: ignore
    def __init__(
            self,
            provider_config: Dict[str, Any],
            cluster_name: str,
            _allow_multiple: bool = False,
    ):
        logger.info("Creating KuberayNodeProvider.")
        self.namespace = provider_config["namespace"]
        self.cluster_name = cluster_name
        self._lock = threading.RLock()

        with open("/var/run/secrets/kubernetes.io/serviceaccount/token"
                  ) as secret:
            token = secret.read()

        self.headers = {
            "Authorization": "Bearer " + token,
        }
        self.verify = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"

        # Disallow multiple node providers, unless explicitly allowed for
        # testing.
        global provider_exists
        if not _allow_multiple:
            assert (not provider_exists
                    ), "Only one KuberayNodeProvider allowed per process."
        assert (provider_config.get("disable_node_updaters", False) is
                True), "Must disable node updaters to use KuberayNodeProvider."
        provider_exists = True

        super().__init__(provider_config, cluster_name)

    def _url(self, path: str) -> str:
        """Convert resource path to REST URL for Kubernetes API server."""
        if path.startswith("pods"):
            api_group = "/api/v1"
        elif path.startswith("rayclusters"):
            api_group = "/apis/ray.io/v1alpha1"
        else:
            raise NotImplementedError(
                "Tried to access unknown entity at {}".format(path))
        return ("https://kubernetes.default:443" + api_group + "/namespaces/" +
                self.namespace + "/" + path)
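
    # Illustration: with namespace "default",
    # _url("pods") returns
    # "https://kubernetes.default:443/api/v1/namespaces/default/pods", and
    # _url("rayclusters/raycluster-complete") returns
    # "https://kubernetes.default:443/apis/ray.io/v1alpha1/namespaces/default/rayclusters/raycluster-complete".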

    def _get(self, path: str) -> Dict[str, Any]:
        """Wrapper for REST GET of resource with proper headers."""
        result = requests.get(
            self._url(path), headers=self.headers, verify=self.verify)
        assert result.status_code == 200
        return result.json()

    def _patch(self, path: str,
               payload: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Wrapper for REST PATCH of resource with proper headers."""
        result = requests.patch(
            self._url(path),
            json.dumps(payload),
            headers={
                **self.headers, "Content-type": "application/json-patch+json"
            },
            verify=self.verify)
        assert result.status_code == 200
        return result.json()

    def _get_worker_group(self, raycluster: Dict[str, Any],
                          group_name: str) -> Tuple[int, Dict[str, Any]]:
        """Extract group index and group definition from RayCluster."""
        group_index = None
        group_spec = None
        worker_group_specs = raycluster["spec"]["workerGroupSpecs"]
        for index, spec in enumerate(worker_group_specs):
            if spec["groupName"] == group_name:
                group_index = index
                group_spec = spec
                break
        assert group_index is not None and group_spec is not None
        return group_index, group_spec

    def _wait_for_pods(self, group_name: str, replicas: int) -> None:
        """Wait until `replicas` pods of type group_name are posted."""
        label_filters = to_label_selector({
            "ray.io/cluster": self.cluster_name,
            "ray.io/group": group_name
        })
        while True:
            pods = self._get("pods?labelSelector=" +
                             requests.utils.quote(label_filters))
            logger.info(
                "Currently have {} replicas of group {}, requested {}.".format(
                    len(pods["items"]), group_name, replicas))
            if len(pods["items"]) == replicas:
                break
            else:
                logger.info(
                    "Waiting for reconciler, number of replicas is {}, expected {}.".
                    format(len(pods["items"]), replicas))
                time.sleep(10.0)

    def create_node(self, node_config: Dict[str, Any], tags: Dict[str, str],
                    count: int) -> Dict[str, Dict[str, str]]:
        """Creates a number of nodes within the namespace."""
        with self._lock:
            url = "rayclusters/{}".format(self.cluster_name)
            raycluster = self._get(url)
            group_name = tags["ray-user-node-type"]
            group_index, group_spec = self._get_worker_group(
                raycluster, group_name)
            path = f"/spec/workerGroupSpecs/{group_index}/replicas"
            payload = [{
                "op": "test",
                "path": path,
                "value": group_spec["replicas"]
            }, {
                "op": "replace",
                "path": path,
                "value": group_spec["replicas"] + count
            }]
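            # The "test" op gives optimistic concurrency: if the replica count
            # changed since we read it, the API server rejects the whole patch.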
            self._patch(url, payload)
            self._wait_for_pods(group_name, group_spec["replicas"] + count)
        return {}

    def internal_ip(self, node_id: str) -> str:
        """Get internal IP of a node (= Kubernetes pod)."""
        data = self._get("pods/{}".format(node_id))
        return data["status"].get("podIP", "IP not yet assigned")

    def node_tags(self, node_id: str) -> Dict[str, str]:
        """Get tags of a node (= Kubernetes pod)."""
        data = self._get("pods/{}".format(node_id))
        return make_node_tags(data["metadata"]["labels"], status_tag(data))

    def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]:
        """Return a list of node ids filtered by the specified tags dict."""
        label_filters = to_label_selector({
            "ray.io/cluster": self.cluster_name,
        })
        data = self._get("pods?labelSelector=" +
                         requests.utils.quote(label_filters))
        result = []
        for pod in data["items"]:
            labels = pod["metadata"]["labels"]
            tags = make_node_tags(labels, status_tag(pod))
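            # dict-items comparison: keep the pod iff every key/value pair in
            # tag_filters also appears in its tags.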
            if tag_filters.items() <= tags.items():
                result.append(pod["metadata"]["name"])
        return result

    def terminate_node(self, node_id: str) -> None:
        """Terminates the specified node (= Kubernetes pod)."""
        self.terminate_nodes([node_id])

    def terminate_nodes(self,
                        node_ids: List[str]) -> Dict[str, Dict[str, str]]:
        """Batch terminates the specified nodes (= Kubernetes pods)."""
        with self._lock:
            # Split node_ids into groups according to node type and terminate
            # them individually. Note that in most cases, node_ids contains
            # a single element and therefore it is most likely not worth
            # optimizing this code to batch the requests to the API server.
            groups = {}
            label_filters = to_label_selector({
                "ray.io/cluster": self.cluster_name
            })
            pods = self._get("pods?labelSelector=" +
                             requests.utils.quote(label_filters))
            for pod in pods["items"]:
                if pod["metadata"]["name"] in node_ids:
                    groups.setdefault(
                        pod["metadata"]["labels"]["ray.io/group"],
                        []).append(pod["metadata"]["name"])

            url = "rayclusters/{}".format(self.cluster_name)
            raycluster = self._get(url)
            for group_name, nodes in groups.items():
                group_index, group_spec = self._get_worker_group(
                    raycluster, group_name)
                prefix = f"/spec/workerGroupSpecs/{group_index}"
                payload = [{
                    "op": "test",
                    "path": prefix + "/replicas",
                    "value": group_spec["replicas"]
                }, {
                    "op": "replace",
                    "path": prefix + "/replicas",
                    "value": group_spec["replicas"] - len(nodes)
                }, {
                    "op": "replace",
                    "path": prefix + "/scaleStrategy",
                    "value": {
                        "workersToDelete": nodes
                    }
                }]
                self._patch(url, payload)

            for group_name, nodes in groups.items():
                group_index, group_spec = self._get_worker_group(
                    raycluster, group_name)
                prefix = f"/spec/workerGroupSpecs/{group_index}"
                self._wait_for_pods(group_name,
                                    group_spec["replicas"] - len(nodes))
                # Clean up workersToDelete
                self._patch(url, [{
                    "op": "replace",
                    "path": prefix + "/scaleStrategy",
                    "value": {
                        "workersToDelete": []
                    }
                }])
        return {}
@@ -74,6 +74,12 @@ def _import_kubernetes(provider_config):
     return KubernetesNodeProvider


+def _import_kuberay(provider_config):
+    from ray.autoscaler._private.kuberay.node_provider import \
+        KuberayNodeProvider
+    return KuberayNodeProvider
+
+
 def _import_staroid(provider_config):
     from ray.autoscaler._private.staroid.node_provider import \
         StaroidNodeProvider

@@ -143,6 +149,7 @@ _NODE_PROVIDERS = {
     "azure": _import_azure,
     "staroid": _import_staroid,
     "kubernetes": _import_kubernetes,
+    "kuberay": _import_kuberay,
     "aliyun": _import_aliyun,
     "external": _import_external  # Import an external module
 }

@@ -157,6 +164,7 @@ _PROVIDER_PRETTY_NAMES = {
     "azure": "Azure",
     "staroid": "Staroid",
     "kubernetes": "Kubernetes",
+    "kuberay": "Kuberay",
     "aliyun": "Aliyun",
     "external": "External"
 }
python/ray/autoscaler/kuberay/init-config.sh (new executable file, 31 lines)
@@ -0,0 +1,31 @@
#!/bin/bash

# Clone pinned Kuberay commit to temporary directory, copy the CRD definitions
# into the autoscaler folder.

SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

DIR=$(mktemp -d -t "kuberay-XXXXXX")

pushd "$DIR" || exit
git clone https://github.com/ray-project/kuberay/
pushd "kuberay" || exit
# If you changed the Kuberay CRD, you need to update this commit to point
# to the new CRD. The following always need to be compatible: the CRDs in use,
# the docker image of the Kuberay operator, and the KuberayNodeProvider.
# This is normally not a problem, since the KuberayNodeProvider uses a
# stable part of the CRD definition, and the Kuberay operator and the
# CRDs both live in https://github.com/ray-project/kuberay/, so they
# get updated together. It is important to keep this in mind when making
# changes. The CRD is designed to be stable so one operator can run many
# different versions of Ray.
git checkout 6f87ca64c107cd51d3ab955faf4be198e0094536
# Here is where we specify the docker image that is used for the operator.
# If you want to use your own version of Kuberay, you should change the content
# of kuberay-autoscaler.patch to point to your operator.
# This would normally better be done with kustomize, but we don't want to make
# kustomize a dependency for running this.
git apply "$SCRIPT_DIR/kuberay-autoscaler.patch"
cp -r ray-operator/config "$SCRIPT_DIR/"
popd || exit
popd || exit
python/ray/autoscaler/kuberay/kuberay-autoscaler.patch (new file, 13 lines)
@@ -0,0 +1,13 @@
diff --git a/ray-operator/config/default/kustomization.yaml b/ray-operator/config/default/kustomization.yaml
index 333106d..fdf62d6 100644
--- a/ray-operator/config/default/kustomization.yaml
+++ b/ray-operator/config/default/kustomization.yaml
@@ -21,6 +21,6 @@ bases:
 
 images:
 - name: kuberay/operator
-  newName: kuberay/operator
-  newTag: nightly
+  newName: rayproject/kuberay-operator
+  newTag: latest
python/ray/autoscaler/kuberay/kuberay-autoscaler.yaml (new file, 65 lines)
@@ -0,0 +1,65 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: autoscaler-config
data:
  ray_bootstrap_config.yaml: |
    cluster_name: raycluster-complete
    min_workers: 0
    max_workers: 10
    idle_timeout_minutes: 1
    provider:
      type: kuberay
      namespace: default
      disable_node_updaters: True
      disable_launch_config_check: True
    head_node_type: head-group
    available_node_types:
      head-group:
        resources: {"CPU": 1}
        node_config: {}
        max_workers: 1
      small-group:
        resources: {"CPU": 1}
        node_config: {}
        min_workers: 0
        max_workers: 10
    file_mounts: {}
    cluster_synced_files: []
    file_mounts_sync_continuously: False
    initialization_commands: []
    setup_commands: []
    head_setup_commands: []
    worker_setup_commands: []
    head_start_ray_commands: []
    worker_start_ray_commands: []
    auth: {}
    head_node: {}
    worker_nodes: {}
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: autoscaler-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]
- apiGroups: ["ray.io"]
  resources: ["rayclusters"]
  verbs: ["get", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: autoscaler-rolebinding
subjects:
- kind: ServiceAccount
  name: autoscaler-sa
  namespace: default
roleRef:
  kind: Role
  name: autoscaler-role
  apiGroup: rbac.authorization.k8s.io
python/ray/autoscaler/kuberay/ray-cluster.complete.yaml (new file, 220 lines)
@@ -0,0 +1,220 @@
# This is adapted from https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-cluster.complete.yaml
# It is a general RayCluster that has most fields in it for maximum flexibility in the Ray/Kuberay integration MVP.
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  labels:
    controller-tools.k8s.io: "1.0"
  # A unique identifier for the head node and workers of this cluster.
  name: raycluster-complete
spec:
  rayVersion: '1.9.2'
  enableInTreeAutoscaling: false
  ######################headGroupSpecs#################################
  # head group template and specs, (perhaps 'group' is not needed in the name)
  headGroupSpec:
    # Kubernetes Service Type, valid values are 'ClusterIP', 'NodePort' and 'LoadBalancer'
    serviceType: ClusterIP
    # the pod replicas in this group typed head (assuming there could be more than 1 in the future)
    replicas: 1
    # logical group name, for this called head-group, also can be functional
    # pod type head or worker
    # rayNodeType: head # Not needed since it is under the headgroup
    # the following params are used to complete the ray start: ray start --head --block --redis-port=6379 ...
    rayStartParams:
      port: '6379'
      object-manager-port: '9999'
      node-manager-port: '9998'
      object-store-memory: '100000000'
      redis-password: '5241590000000000'
      dashboard-host: '0.0.0.0'
      num-cpus: '1' # can be auto-completed from the limits
      node-ip-address: $MY_POD_IP # auto-completed as the head pod IP
      block: 'true'
    # pod template
    template:
      metadata:
        labels:
          # custom labels. NOTE: do not define custom labels that start with `raycluster.`; they may be used by the controller.
          # Refer to https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
          rayCluster: raycluster-sample # will be injected if missing
          rayNodeType: head # will be injected if missing, must be head or worker
          groupName: headgroup # will be injected if missing
        # annotations for pod
        annotations:
          key: value
      spec:
        # This is needed to give the autoscaler sidecar permissions to query and update
        # definitions in the Kubernetes API server (see kuberay-autoscaler.yaml)
        serviceAccountName: autoscaler-sa
        containers:
        # The Ray head pod
        - name: ray-head
          image: rayproject/ray:latest
          imagePullPolicy: Always
          env:
          - name: CPU_REQUEST
            valueFrom:
              resourceFieldRef:
                containerName: ray-head
                resource: requests.cpu
          - name: CPU_LIMITS
            valueFrom:
              resourceFieldRef:
                containerName: ray-head
                resource: limits.cpu
          - name: MEMORY_LIMITS
            valueFrom:
              resourceFieldRef:
                containerName: ray-head
                resource: limits.memory
          - name: MEMORY_REQUESTS
            valueFrom:
              resourceFieldRef:
                containerName: ray-head
                resource: requests.memory
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          ports:
          - containerPort: 6379
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          resources:
            limits:
              cpu: "1"
              memory: "1G"
            requests:
              cpu: "500m"
              memory: "512Mi"
          volumeMounts:
          - mountPath: /tmp/ray
            name: ray-logs
        # The Ray autoscaler sidecar to the head pod
        - name: autoscaler
          image: rayproject/kuberay-autoscaler
          imagePullPolicy: Always
          command: ["/bin/sh"]
          args: ["-c", "$HOME/anaconda3/bin/python /home/ray/run_autoscaler_with_retries.py --redis-password 5241590000000000"]
          resources:
            limits:
              cpu: 500m
              memory: 1024Mi
            requests:
              cpu: 250m
              memory: 512Mi
          volumeMounts:
          - mountPath: /autoscaler/
            name: autoscaler-config
          - mountPath: /tmp/ray
            name: ray-logs
        volumes:
        # You set volumes at the Pod level, then mount them into containers inside that Pod
        - name: autoscaler-config
          configMap:
            name: autoscaler-config
            items:
            - key: ray_bootstrap_config.yaml
              path: ray_bootstrap_config.yaml
        - name: ray-logs
          emptyDir: {}
  workerGroupSpecs:
  # the pod replicas in this group typed worker
  - replicas: 1
    minReplicas: 1
    maxReplicas: 300
    # logical group name, for this called small-group, also can be functional
    groupName: small-group
    # if worker pods need to be added, we can simply increment the replicas
    # if worker pods need to be removed, we decrement the replicas, and populate the podsToDelete list
    # the operator will remove pods from the list until the number of replicas is satisfied
    # when a pod is confirmed to be deleted, its name will be removed from the list below
    #scaleStrategy:
    #  workersToDelete:
    #  - raycluster-complete-worker-small-group-bdtwh
    #  - raycluster-complete-worker-small-group-hv457
    #  - raycluster-complete-worker-small-group-k8tj7
    # the following params are used to complete the ray start: ray start --block --node-ip-address= ...
    rayStartParams:
      redis-password: '5241590000000000'
      node-ip-address: $MY_POD_IP
      block: 'true'
    # pod template
    template:
      metadata:
        labels:
          key: value
        # annotations for pod
        annotations:
          key: value
      spec:
        initContainers:
        # the env var $RAY_IP is set by the operator if missing, with the value of the head service name
        - name: init-myservice
          image: busybox:1.28
          command: ['sh', '-c', "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
        containers:
        - name: machine-learning # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
          image: rayproject/ray
          # environment variables to set in the container. Optional.
          # Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
          env:
          - name: RAY_DISABLE_DOCKER_CPU_WARNING
            value: "1"
          - name: TYPE
            value: "worker"
          - name: CPU_REQUEST
            valueFrom:
              resourceFieldRef:
                containerName: machine-learning
                resource: requests.cpu
          - name: CPU_LIMITS
            valueFrom:
              resourceFieldRef:
                containerName: machine-learning
                resource: limits.cpu
          - name: MEMORY_LIMITS
            valueFrom:
              resourceFieldRef:
                containerName: machine-learning
                resource: limits.memory
          - name: MEMORY_REQUESTS
            valueFrom:
              resourceFieldRef:
                containerName: machine-learning
                resource: requests.memory
          - name: MY_POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          ports:
          - containerPort: 80
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          # use volumeMounts. Optional.
          # Refer to https://kubernetes.io/docs/concepts/storage/volumes/
          volumeMounts:
          - mountPath: /var/log
            name: log-volume
          resources:
            limits:
              cpu: "1"
              memory: "512Mi"
            requests:
              cpu: "500m"
              memory: "256Mi"
        # use volumes
        # Refer to https://kubernetes.io/docs/concepts/storage/volumes/
        volumes:
        - name: log-volume
          emptyDir: {}
  ######################status#################################
@ -94,6 +94,9 @@ class AutoscalingConfigTest(unittest.TestCase):
|
|||
if "fake_multi_node" in config_path:
|
||||
# not supported with ray up
|
||||
continue
|
||||
if "kuberay" in config_path:
|
||||
# not supported with ray up
|
||||
continue
|
||||
with open(config_path) as f:
|
||||
config = yaml.safe_load(f)
|
||||
config = prepare_config(config)
|
||||
|
|