[kuberay] Test Ray client and update autoscaler image (#24195)

This PR adds KubeRay e2e testing for Ray client and updates the suggested autoscaler image to one built from the merge commit of PR #23883.
Dmitri Gekhtman 2022-04-27 18:02:12 -07:00 committed by GitHub
parent cc864401fb
commit d68c1ecaf9
10 changed files with 258 additions and 85 deletions
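
In outline, the new coverage replaces the previous `kubectl exec` invocation of the test scripts with a Ray client connection opened over a kubectl port-forward. A minimal sketch of the pattern, using only the helper, module, and service names that appear in the diff below:

```python
from ray.tests.kuberay.scripts import gpu_actor_placement
from ray.tests.kuberay.utils import ray_client_port_forward

# Forward the head service's Ray client port and run the workload through a
# Ray client connection instead of `kubectl exec`-ing a script on the head pod.
with ray_client_port_forward(
    head_service="raycluster-complete-head-svc", ray_namespace="gpu-test"
):
    gpu_actor_placement.main()
```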

View file

@ -391,6 +391,13 @@
conditions: ["RAY_CI_LINUX_WHEELS_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- echo "--- Setting up Python 3.7 environment."
- PYTHON=3.7 ./ci/env/install-dependencies.sh
# Specifying PYTHON=3.7 above somehow messes up the Ray install.
# Uninstall and re-install Ray so that we can use Ray Client.
# (Remove thirdparty_files to sidestep an issue with psutil.)
- pip uninstall -y ray && rm -rf /ray/python/ray/thirdparty_files
- pip install -e /ray/python
- echo "--- Setting up local kind cluster."
- ./ci/k8s/prep-k8s-environment.sh
- echo "--- Building py37-cpu Ray image for the test."

View file

@ -57,6 +57,12 @@ ray.init("auto")
ray.autoscaler.sdk.request_resources(num_cpus=4)
```
> **_NOTE:_** The example config ray-cluster.complete.yaml specifies rayproject/ray:8c5fe4
> as the Ray autoscaler image. This image carries the latest improvements to KubeRay autoscaling
> support. This autoscaler image is confirmed to be compatible with Ray versions >= 1.11.0.
> Once Ray autoscaler support is stable, the recommended pattern will be to use the same
> Ray version in the autoscaler and Ray containers.
## Uninstalling the KubeRay operator
You can uninstall the KubeRay operator using
@ -83,7 +89,7 @@ Here is one procedure to test development autoscaler code.
```dockerfile
# Use the latest Ray master as base.
FROM rayproject/ray:nightly
# Invalidate cache so that fresh code is pulled in the next step.
# Invalidate the cache so that fresh code is pulled in the next step.
ARG BUILD_DATE
# Retrieve your development code.
RUN git clone -b <my-dev-branch> https://github.com/<my-git-handle>/ray

View file

@ -8,7 +8,7 @@ metadata:
# An unique identifier for the head node and workers of this cluster.
name: raycluster-complete
spec:
rayVersion: '1.11.0'
rayVersion: '1.12.0'
enableInTreeAutoscaling: false
######################headGroupSpecs#################################
# head group template and specs, (perhaps 'group' is not needed in the name)
@ -58,8 +58,18 @@ spec:
containers:
# The Ray head pod
- name: ray-head
image: rayproject/ray:1.11.0
image: rayproject/ray:1.12.0
imagePullPolicy: Always
# The KubeRay operator uses the ports specified on the ray-head container
# to configure a service targeting the ports.
# The name of the service is <ray cluster name>-head-svc.
ports:
- containerPort: 6379
name: gcs
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
env:
- name: CPU_REQUEST
valueFrom:
@ -85,8 +95,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
ports:
- containerPort: 6379
lifecycle:
preStop:
exec:
@ -103,8 +111,12 @@ spec:
name: ray-logs
# The Ray autoscaler sidecar to the head pod
- name: autoscaler
# TODO: Use released Ray version starting with Ray 1.12.0.
image: rayproject/ray:413fe0
# The autoscaler image used carries the latest improvements to KubeRay autoscaling
# support.
# It is confirmed (via kuberay/test_autoscaling_e2e.py) to be compatible with all
# Ray versions since Ray 1.11.0.
# TODO: Use released Ray version when autoscaling support is stable.
image: rayproject/ray:8c5fe4
imagePullPolicy: Always
env:
- name: RAY_CLUSTER_NAMESPACE
@ -178,7 +190,7 @@ spec:
image: busybox:1.28
command: ['sh', '-c', "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
containers:
- name: machine-learning # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
image: rayproject/ray:1.11.0
# environment variables to set in the container.Optional.
# Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
@ -190,22 +202,22 @@ spec:
- name: CPU_REQUEST
valueFrom:
resourceFieldRef:
containerName: machine-learning
containerName: ray-worker
resource: requests.cpu
- name: CPU_LIMITS
valueFrom:
resourceFieldRef:
containerName: machine-learning
containerName: ray-worker
resource: limits.cpu
- name: MEMORY_LIMITS
valueFrom:
resourceFieldRef:
containerName: machine-learning
containerName: ray-worker
resource: limits.memory
- name: MEMORY_REQUESTS
valueFrom:
resourceFieldRef:
containerName: machine-learning
containerName: ray-worker
resource: requests.memory
- name: MY_POD_NAME
valueFrom:

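With the head container's ports now named, the operator-generated `raycluster-complete-head-svc` service exposes GCS (6379), dashboard (8265), and Ray client (10001). A hedged sketch of connecting a local driver through the client port, assuming `kubectl port-forward service/raycluster-complete-head-svc 10001:10001` is running in another shell:

```python
import ray

# Assumes a port-forward of the head service's client port (10001) is active,
# e.g. `kubectl port-forward service/raycluster-complete-head-svc 10001:10001`.
ray.init("ray://127.0.0.1:10001")
print(ray.cluster_resources())
```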
View file

@ -1,12 +1,18 @@
import ray
@ray.remote(num_gpus=1, num_cpus=1)
class GPUActor:
def where_am_i(self):
assert len(ray.get_gpu_ids()) == 1
return "on-a-gpu-node"
def main():
"""Requests placement of a GPU actor."""
@ray.remote(num_gpus=1, num_cpus=1)
class GPUActor:
def where_am_i(self):
assert len(ray.get_gpu_ids()) == 1
return "on-a-gpu-node"
GPUActor.options(name="gpu_actor", lifetime="detached").remote()
ray.init("auto", namespace="gpu-test")
GPUActor.options(name="gpu_actor", lifetime="detached").remote()
if __name__ == "__main__":
ray.init("auto", namespace="gpu-test")
main()

View file

@ -1,6 +1,14 @@
import ray
ray.init("auto", namespace="gpu-test")
gpu_actor = ray.get_actor("gpu_actor")
actor_response = ray.get(gpu_actor.where_am_i.remote())
print(actor_response)
def main():
"""Confirms placement of a GPU actor."""
gpu_actor = ray.get_actor("gpu_actor")
actor_response = ray.get(gpu_actor.where_am_i.remote())
return actor_response
if __name__ == "__main__":
ray.init("auto", namespace="gpu-test")
out = main()
print(out)

View file

@ -1,6 +1,13 @@
import ray
ray.init("auto", namespace="gpu-test")
ray.autoscaler.sdk.request_resources(num_cpus=0)
gpu_actor = ray.get_actor("gpu_actor")
ray.kill(gpu_actor)
def main():
"""Removes CPU request, removes GPU actor."""
ray.autoscaler.sdk.request_resources(num_cpus=0)
gpu_actor = ray.get_actor("gpu_actor")
ray.kill(gpu_actor)
if __name__ == "__main__":
ray.init("auto", namespace="gpu-test")
main()

View file

@ -1,4 +1,11 @@
import ray
ray.init("auto")
ray.autoscaler.sdk.request_resources(num_cpus=2)
def main():
"""Submits CPU request."""
ray.autoscaler.sdk.request_resources(num_cpus=2)
if __name__ == "__main__":
ray.init("auto")
main()

View file

@ -1,9 +1,16 @@
import ray
ray.init("auto")
# Workers and head are annotated as having 5 "Custom2" capacity each,
# so this should trigger upscaling of two workers.
# (One of the bundles will be "placed" on the head.)
ray.autoscaler.sdk.request_resources(
bundles=[{"Custom2": 3}, {"Custom2": 3}, {"Custom2": 3}]
)
def main():
"""Submits custom resource request."""
# Workers and head are annotated as having 5 "Custom2" capacity each,
# so this should trigger upscaling of two workers.
# (One of the bundles will be "placed" on the head.)
ray.autoscaler.sdk.request_resources(
bundles=[{"Custom2": 3}, {"Custom2": 3}, {"Custom2": 3}]
)
if __name__ == "__main__":
ray.init("auto")
main()

View file

@ -14,6 +14,7 @@ from ray.tests.kuberay.utils import (
get_pod,
get_pod_names,
get_raycluster,
ray_client_port_forward,
kubectl_exec_python_script,
wait_for_pods,
wait_for_pod_to_start,
@ -21,6 +22,9 @@ from ray.tests.kuberay.utils import (
wait_for_crd,
)
from ray.tests.kuberay.scripts import gpu_actor_placement, gpu_actor_validation
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
@ -29,10 +33,13 @@ logging.basicConfig(
# This image will be used for both the Ray nodes and the autoscaler.
# The CI should pass an image built from the test branch.
RAY_IMAGE = os.environ.get("RAY_IMAGE", "rayproject/ray:413fe0")
RAY_IMAGE = os.environ.get("RAY_IMAGE", "rayproject/ray:8c5fe4")
# By default, use the same image for the autoscaler and Ray containers.
AUTOSCALER_IMAGE = os.environ.get("AUTOSCALER_IMAGE", RAY_IMAGE)
# Set to IfNotPresent in kind CI.
PULL_POLICY = os.environ.get("PULL_POLICY", "Always")
logger.info(f"Using image `{RAY_IMAGE}` for autoscaler and Ray nodes.")
logger.info(f"Using image `{RAY_IMAGE}` for Ray containers.")
logger.info(f"Using image `{AUTOSCALER_IMAGE}` for Autoscaler containers.")
logger.info(f"Using pull policy `{PULL_POLICY}` for all images.")
# The default "rayproject/ray:413fe0" is the currently pinned autoscaler image
# (to be replaced with rayproject/ray:1.12.0 upon 1.12.0 release).
@ -84,44 +91,21 @@ class KubeRayAutoscalingTest(unittest.TestCase):
logger.info("Making sure RayCluster CRD has been registered.")
wait_for_crd("rayclusters.ray.io")
def _get_ray_cr_config_file(self) -> str:
"""Formats a RayCluster CR based on the example in the Ray documentation.
- Replaces Ray node and autoscaler images in example CR with the test image.
- Set image pull policies to IfNotPresent.
- Writes modified CR to temp file.
- Returns temp file's name.
"""
# Set Ray and autoscaler images.
with open(EXAMPLE_CLUSTER_PATH) as example_cluster_file:
ray_cr_config_str = example_cluster_file.read()
ray_images = [
word for word in ray_cr_config_str.split() if "rayproject/ray:" in word
]
for ray_image in ray_images:
ray_cr_config_str = ray_cr_config_str.replace(ray_image, RAY_IMAGE)
# CI should set pull policies to IfNotPresent to ensure no issues using a local
# test image on kind.
ray_cr_config_str = ray_cr_config_str.replace("Always", PULL_POLICY)
raycluster_cr_file = tempfile.NamedTemporaryFile(delete=False)
raycluster_cr_file.write(ray_cr_config_str.encode())
raycluster_cr_file.close()
return raycluster_cr_file.name
def _get_ray_cr_config(
self, min_replicas=0, max_replicas=300, replicas=0
) -> Dict[str, Any]:
"""Get Ray CR config yaml.
Use configurable replica fields for a CPU workerGroup.
- Use configurable replica fields for a CPU workerGroup.
Also add a GPU-annotated group for testing GPU upscaling.
- Add a GPU-annotated group for testing GPU upscaling.
- Fill in Ray image, autoscaler image, and image pull policies from env
variables.
"""
with open(self._get_ray_cr_config_file()) as ray_config_file:
ray_config_str = ray_config_file.read()
config = yaml.safe_load(ray_config_str)
with open(EXAMPLE_CLUSTER_PATH) as ray_cr_config_file:
ray_cr_config_str = ray_cr_config_file.read()
config = yaml.safe_load(ray_cr_config_str)
cpu_group = config["spec"]["workerGroupSpecs"][0]
cpu_group["replicas"] = replicas
cpu_group["minReplicas"] = min_replicas
@ -138,6 +122,31 @@ class KubeRayAutoscalingTest(unittest.TestCase):
gpu_group["groupName"] = "fake-gpu-group"
config["spec"]["workerGroupSpecs"].append(gpu_group)
# Substitute images.
for group_spec in config["spec"]["workerGroupSpecs"] + [
config["spec"]["headGroupSpec"]
]:
containers = group_spec["template"]["spec"]["containers"]
ray_container = containers[0]
# Confirm the first container in the example config is the Ray container.
assert ray_container["name"] in ["ray-head", "ray-worker"]
ray_container["image"] = RAY_IMAGE
for container in containers:
container["imagePullPolicy"] = PULL_POLICY
head_containers = config["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
]
autoscaler_container = [
container
for container in head_containers
if container["name"] == "autoscaler"
].pop()
autoscaler_container["image"] = AUTOSCALER_IMAGE
return config
def _apply_ray_cr(
@ -254,12 +263,11 @@ class KubeRayAutoscalingTest(unittest.TestCase):
)
# 2. Trigger GPU upscaling by requesting placement of a GPU actor.
logger.info("Scheduling an Actor with GPU demands.")
kubectl_exec_python_script(
script_name="gpu_actor_placement.py",
pod=head_pod,
container="ray-head",
namespace="default",
)
# Use Ray client to validate that it works against KubeRay.
with ray_client_port_forward(
head_service="raycluster-complete-head-svc", ray_namespace="gpu-test"
):
gpu_actor_placement.main()
# 3. Confirm new pod number and presence of fake GPU worker.
logger.info("Confirming fake GPU worker up-scaling.")
wait_for_pods(goal_num_pods=4, namespace="default")
@ -272,12 +280,10 @@ class KubeRayAutoscalingTest(unittest.TestCase):
# 4. Confirm that the GPU actor is up and that Ray believes
# the node the actor is on has a GPU.
logger.info("Confirming GPU actor placement.")
out = kubectl_exec_python_script(
script_name="gpu_actor_validation.py",
pod=head_pod,
container="ray-head",
namespace="default",
)
with ray_client_port_forward(
head_service="raycluster-complete-head-svc", ray_namespace="gpu-test"
):
out = gpu_actor_validation.main()
# Confirms the actor was placed on a GPU-annotated node.
# (See gpu_actor_validation.py for details.)
assert "on-a-gpu-node" in out

View file

@ -1,13 +1,17 @@
"""Utilities for e2e tests of KubeRay/Ray integration.
For consistency, all K8s interactions use kubectl through subprocess calls.
"""
import atexit
import contextlib
import logging
from pathlib import Path
import pathlib
import subprocess
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Generator, List, Optional
import yaml
import ray
logger = logging.getLogger(__name__)
@ -78,7 +82,6 @@ def get_pod_names(namespace: str) -> List[str]:
return []
else:
return get_pods_output.split("\n")
pass
def wait_for_pod_to_start(
@ -208,7 +211,7 @@ def kubectl_exec_python_script(
Prints and return kubectl's output as a string.
"""
script_path = Path(__file__).resolve().parent / "scripts" / script_name
script_path = pathlib.Path(__file__).resolve().parent / "scripts" / script_name
with open(script_path) as script_file:
script_string = script_file.read()
return kubectl_exec(["python", "-c", script_string], pod, namespace, container)
@ -227,3 +230,107 @@ def get_raycluster(raycluster: str, namespace: str) -> Dict[str, Any]:
.strip()
)
return yaml.safe_load(get_raycluster_output)
def _get_service_port(service: str, namespace: str, target_port: int) -> int:
"""Given a K8s service and a port targetted by the service, returns the
corresponding port exposed by the service.
Args:
service: Name of a K8s service.
namespace: Namespace to which the service belongs.
target_port: Port targeted by the service.
Returns:
service_port: The port exposed by the service.
"""
service_str = (
subprocess.check_output(
["kubectl", "-n", namespace, "get", "service", service, "-o", "yaml"]
)
.decode()
.strip()
)
service_dict = yaml.safe_load(service_str)
service_ports: List = service_dict["spec"]["ports"]
matching_ports = [
port for port in service_ports if port["targetPort"] == target_port
]
assert matching_ports
service_port = matching_ports[0]["port"]
return service_port
@contextlib.contextmanager
def _kubectl_port_forward(
service: str, namespace: str, target_port: int, local_port: Optional[int] = None
) -> Generator[int, None, None]:
"""Context manager which creates a kubectl port-forward process targeting a
K8s service.
Terminates the port-forwarding process upon exit.
Args:
service: Name of a K8s service.
namespace: Namespace to which the service belongs.
target_port: The port targeted by the service.
local_port: Forward from this port. Optional. By default, uses the port exposed
by the service.
Yields:
The local port. The service can then be accessed at 127.0.0.1:<local_port>.
"""
# First, figure out which port the service exposes for the given target port.
service_port = _get_service_port(service, namespace, target_port)
if not local_port:
local_port = service_port
process = subprocess.Popen(
[
"kubectl",
"-n",
namespace,
"port-forward",
f"service/{service}",
f"{local_port}:{service_port}",
]
)
def terminate_process():
process.terminate()
# Wait 10 seconds for the process to terminate.
# This cleans up the zombie entry from the process table.
# 10 seconds is a deliberately excessive amount of time to wait.
process.wait(timeout=10)
# Ensure clean-up in case of interrupt.
atexit.register(terminate_process)
# terminate_process is ok to execute multiple times.
try:
yield local_port
finally:
terminate_process()
@contextlib.contextmanager
def ray_client_port_forward(
head_service: str,
k8s_namespace: str = "default",
ray_namespace: Optional[str] = None,
ray_client_port: int = 10001,
):
"""Context manager which manages a Ray client connection using kubectl port-forward.
Args:
head_service: The name of the Ray head K8s service.
k8s_namespace: K8s namespace the Ray cluster belongs to.
ray_namespace: The Ray namespace to connect to.
ray_client_port: The port on which the Ray head is running the Ray client
server.
"""
with _kubectl_port_forward(
service=head_service, namespace=k8s_namespace, target_port=ray_client_port
) as local_port:
with ray.init(f"ray://127.0.0.1:{local_port}", namespace=ray_namespace):
yield