[autoscaler] Support creating services in k8s backend (#8659)

This commit is contained in:
Edward Oakes 2020-05-29 15:19:21 -05:00 committed by GitHub
parent 6b04664645
commit 30ed20405a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 3 deletions

View file

@ -21,6 +21,10 @@ def using_existing_msg(resource_type, name):
return "using existing {} '{}'".format(resource_type, name)
def updating_existing_msg(resource_type, name):
return "updating existing {} '{}'".format(resource_type, name)
def not_found_msg(resource_type, name):
return "{} '{}' not found, attempting to create it".format(
resource_type, name)
@ -43,6 +47,7 @@ def bootstrap_kubernetes(config):
_configure_autoscaler_service_account(namespace, config["provider"])
_configure_autoscaler_role(namespace, config["provider"])
_configure_autoscaler_role_binding(namespace, config["provider"])
_configure_services(namespace, config["provider"])
return config
@ -151,3 +156,36 @@ def _configure_autoscaler_role_binding(namespace, provider_config):
logger.info(log_prefix + not_found_msg(binding_field, name))
auth_api().create_namespaced_role_binding(namespace, binding)
logger.info(log_prefix + created_msg(binding_field, name))
def _configure_services(namespace, provider_config):
service_field = "services"
if service_field not in provider_config:
logger.info(log_prefix + not_provided_msg(service_field))
return
services = provider_config[service_field]
for service in services:
if "namespace" not in service["metadata"]:
service["metadata"]["namespace"] = namespace
elif service["metadata"]["namespace"] != namespace:
raise InvalidNamespaceError(service_field, namespace)
name = service["metadata"]["name"]
field_selector = "metadata.name={}".format(name)
services = core_api().list_namespaced_service(
namespace, field_selector=field_selector).items
if len(services) > 0:
assert len(services) == 1
existing_service = services[0]
if service == existing_service:
logger.info(log_prefix + using_existing_msg("service", name))
return
else:
logger.info(log_prefix +
updating_existing_msg("service", name))
core_api().patch_namespaced_service(name, namespace, service)
else:
logger.info(log_prefix + not_found_msg("service", name))
core_api().create_namespaced_service(namespace, service)
logger.info(log_prefix + created_msg("service", name))

View file

@ -81,6 +81,41 @@ provider:
name: autoscaler
apiGroup: rbac.authorization.k8s.io
services:
# Service that maps to the head node of the Ray cluster.
- apiVersion: v1
kind: Service
metadata:
# NOTE: If you're running multiple Ray clusters with services
# on one Kubernetes cluster, they must have unique service
# names.
name: ray-head
spec:
# This selector must match the head node pod's selector below.
selector:
component: ray-head
ports:
- protocol: TCP
port: 8000
targetPort: 8000
# Service that maps to the worker nodes of the Ray cluster.
- apiVersion: v1
kind: Service
metadata:
# NOTE: If you're running multiple Ray clusters with services
# on one Kubernetes cluster, they must have unique service
# names.
name: ray-workers
spec:
# This selector must match the worker node pods' selector below.
selector:
component: ray-worker
ports:
- protocol: TCP
port: 8000
targetPort: 8000
# Kubernetes pod config for the head node pod.
head_node:
apiVersion: v1
@ -88,6 +123,11 @@ head_node:
metadata:
# Automatically generates a name for the pod with this prefix.
generateName: ray-head-
# Must match the head node service selector above if a head node
# service is required.
labels:
component: ray-head
spec:
# Change this if you altered the autoscaler_service_account above
# or want to provide your own.
@ -160,6 +200,11 @@ worker_nodes:
metadata:
# Automatically generates a name for the pod with this prefix.
generateName: ray-worker-
# Must match the worker node service selector above if a worker node
# service is required.
labels:
component: ray-worker
spec:
serviceAccountName: default

View file

@ -62,14 +62,18 @@ class KubernetesNodeProvider(NodeProvider):
return pod.status.pod_ip
def set_node_tags(self, node_id, tags):
body = {"metadata": {"labels": tags}}
core_api().patch_namespaced_pod(node_id, self.namespace, body)
pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
pod.metadata.labels.update(tags)
core_api().patch_namespaced_pod(node_id, self.namespace, pod)
def create_node(self, node_config, tags, count):
pod_spec = node_config.copy()
tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name
pod_spec["metadata"]["namespace"] = self.namespace
pod_spec["metadata"]["labels"] = tags
if "labels" in pod_spec["metadata"]:
pod_spec["metadata"]["labels"].update(tags)
else:
pod_spec["metadata"]["labels"] = tags
logger.info(log_prefix + "calling create_namespaced_pod "
"(count={}).".format(count))
for _ in range(count):