mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[operator] Minor cleanup (#7498)
This commit is contained in:
parent
b4e2d5317e
commit
08d4cb3822
7 changed files with 40 additions and 67 deletions
|
@ -21,7 +21,7 @@ spec:
|
|||
type: worker
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --block --node-ip-address=$MY_POD_IP --address=$RAYCLUSTER_SAMPLE_SERVICE_HOST:$RAYCLUSTER_SAMPLE_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# Custom labels. NOTE: do not define custom labels that start with `raycluster.`; they may be used by the controller.
|
||||
# Refer to https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
|
||||
|
@ -112,7 +112,7 @@ spec:
|
|||
key: value
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --block --node-ip-address=$MY_POD_IP --address=$RAYCLUSTER_SAMPLE_SERVICE_HOST:$RAYCLUSTER_SAMPLE_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# Use affinity to select nodes. Optional.
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
|
||||
|
@ -194,7 +194,7 @@ spec:
|
|||
key: value
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --head --block --redis-port=6379 --node-ip-address=$MY_POD_IP --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# Use affinity to select nodes. Optional.
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
|
||||
|
|
|
@ -26,7 +26,7 @@ spec:
|
|||
raycluster.group.name: small-group
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --block --node-ip-address=$MY_POD_IP --address=$RAYCLUSTER_SAMPLE_SERVICE_HOST:$RAYCLUSTER_SAMPLE_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# resource requirements
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
|
||||
|
@ -70,7 +70,7 @@ spec:
|
|||
raycluster.group.name: medium-group
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --block --node-ip-address=$MY_POD_IP --address=$RAYCLUSTER_SAMPLE_SERVICE_HOST:$RAYCLUSTER_SAMPLE_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# resource requirements
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
|
||||
|
@ -114,7 +114,7 @@ spec:
|
|||
raycluster.group.name: head-group
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --head --block --redis-port=6379 --node-ip-address=$MY_POD_IP --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# resource requirements
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
|
||||
|
|
|
@ -26,7 +26,7 @@ spec:
|
|||
raycluster.group.name: small-group
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --block --node-ip-address=$MY_POD_IP --address=$RAYCLUSTER_SAMPLE_SERVICE_HOST:$RAYCLUSTER_SAMPLE_SERVICE_PORT_REDIS --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# resource requirements
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
|
||||
|
@ -56,7 +56,7 @@ spec:
|
|||
raycluster.group.name: head-group
|
||||
|
||||
# Command to start ray
|
||||
command: ray stop; ulimit -n 65536; ray start --object-manager-port=8076
|
||||
command: ray start --head --block --redis-port=6379 --node-ip-address=$MY_POD_IP --object-manager-port=12345 --node-manager-port=12346 --object-store-memory=100000000 --num-cpus=1
|
||||
|
||||
# resource requirements
|
||||
# Refer to https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
|
||||
|
|
|
@ -22,30 +22,20 @@ func DefaultPodConfig(instance *rayiov1alpha1.RayCluster, podTypeName string, po
|
|||
}
|
||||
}
|
||||
|
||||
// Build a pod for the cluster instance.
|
||||
func BuildPod(conf *PodConfig) *corev1.Pod {
|
||||
// build label for cluster
|
||||
rayLabels := labelsForCluster(*conf.RayCluster, conf.PodName, conf.PodTypeName, conf.Extension.Labels)
|
||||
|
||||
// build container for pod, now only handle one container for each pod
|
||||
var containers []corev1.Container
|
||||
container := buildContainer(conf)
|
||||
containers = append(containers, container)
|
||||
|
||||
// create volume
|
||||
volumes := conf.Extension.Volumes
|
||||
// Build the containers for the pod (there is currently only one).
|
||||
containers := []corev1.Container{buildContainer(conf)}
|
||||
|
||||
spec := corev1.PodSpec{
|
||||
Volumes: volumes,
|
||||
Volumes: conf.Extension.Volumes,
|
||||
Containers: containers,
|
||||
Affinity: conf.Extension.Affinity,
|
||||
Tolerations: conf.Extension.Tolerations,
|
||||
ServiceAccountName: conf.RayCluster.Namespace,
|
||||
}
|
||||
|
||||
// build annotations and store podCompareHash for comparison
|
||||
annotations := conf.Extension.Annotations
|
||||
|
||||
pod := &corev1.Pod{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: "v1",
|
||||
|
@ -55,7 +45,7 @@ func BuildPod(conf *PodConfig) *corev1.Pod {
|
|||
Name: conf.PodName,
|
||||
Namespace: conf.RayCluster.Namespace,
|
||||
Labels: rayLabels,
|
||||
Annotations: annotations,
|
||||
Annotations: conf.Extension.Annotations,
|
||||
},
|
||||
Spec: spec,
|
||||
}
|
||||
|
@ -63,51 +53,38 @@ func BuildPod(conf *PodConfig) *corev1.Pod {
|
|||
return pod
|
||||
}
|
||||
|
||||
// Build container for pod.
|
||||
func buildContainer(conf *PodConfig) corev1.Container {
|
||||
|
||||
redisPort := defaultRedisPort
|
||||
httpServerPort := defaultHTTPServerPort
|
||||
jobManagerPort := defaultRedisPort
|
||||
|
||||
// assign image by typeName
|
||||
image := conf.RayCluster.Spec.Images.DefaultImage
|
||||
if conf.Extension.Image != "" {
|
||||
image = conf.Extension.Image
|
||||
}
|
||||
|
||||
volumeMounts := conf.Extension.VolumeMounts
|
||||
|
||||
// add instance name and namespace to container env to identify cluster pods
|
||||
var containerEnv []corev1.EnvVar
|
||||
containerEnv = conf.Extension.ContainerEnv
|
||||
containerEnv = append(containerEnv,
|
||||
// Add instance name and namespace to container env to identify cluster pods.
|
||||
// Add pod IP address to container env.
|
||||
containerEnv := append(conf.Extension.ContainerEnv,
|
||||
corev1.EnvVar{Name: namespace, Value: conf.RayCluster.Namespace},
|
||||
corev1.EnvVar{Name: clusterName, Value: conf.RayCluster.Name})
|
||||
corev1.EnvVar{Name: clusterName, Value: conf.RayCluster.Name},
|
||||
corev1.EnvVar{Name: "MY_POD_IP", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.podIP"}}},
|
||||
)
|
||||
|
||||
container := corev1.Container{
|
||||
return corev1.Container{
|
||||
Name: strings.ToLower(conf.PodTypeName),
|
||||
Image: image,
|
||||
Command: []string{"/bin/bash", "-c", "--"},
|
||||
Args: []string{conf.Extension.Command},
|
||||
Env: containerEnv,
|
||||
Resources: conf.Extension.Resources,
|
||||
VolumeMounts: volumeMounts,
|
||||
VolumeMounts: conf.Extension.VolumeMounts,
|
||||
ImagePullPolicy: conf.RayCluster.Spec.ImagePullPolicy,
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
ContainerPort: int32(redisPort),
|
||||
ContainerPort: int32(defaultRedisPort),
|
||||
Name: "redis",
|
||||
},
|
||||
{
|
||||
ContainerPort: int32(httpServerPort),
|
||||
ContainerPort: int32(defaultHTTPServerPort),
|
||||
Name: "http-server",
|
||||
},
|
||||
{
|
||||
ContainerPort: int32(jobManagerPort),
|
||||
Name: "job-manager",
|
||||
},
|
||||
},
|
||||
}
|
||||
return container
|
||||
}
|
||||
|
|
|
@ -20,9 +20,11 @@ func DefaultServiceConfig(instance rayiov1alpha1.RayCluster, podName string) *Se
|
|||
}
|
||||
}
|
||||
|
||||
// Build service for pod, for now only head pod will have service.
|
||||
// Build the service for a pod. Currently, there is only one service that allows
|
||||
// the worker nodes to connect to the head node.
|
||||
func ServiceForPod(conf *ServiceConfig) *corev1.Service {
|
||||
name := conf.PodName
|
||||
// Format the service name as "<cluster_name>-head."
|
||||
if strings.Contains(conf.PodName, Head) {
|
||||
name = utils.Before(conf.PodName, Head) + "head"
|
||||
}
|
||||
|
@ -34,8 +36,10 @@ func ServiceForPod(conf *ServiceConfig) *corev1.Service {
|
|||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Ports: []corev1.ServicePort{{Name: "redis", Port: int32(defaultRedisPort)}},
|
||||
ClusterIP: "None",
|
||||
// select this raycluster's component
|
||||
// TODO(edoakes): ClusterIPNone (headless service) should work but I wasn't
|
||||
// able to get the environment variables for service discovery to work.
|
||||
// ClusterIP: corev1.ClusterIPNone,
|
||||
// This selector must match the label of the head node.
|
||||
Selector: map[string]string{
|
||||
rayclusterComponent: conf.PodName,
|
||||
},
|
||||
|
|
|
@ -68,7 +68,10 @@ func (r *RayClusterReconciler) Reconcile(request reconcile.Request) (reconcile.R
|
|||
}
|
||||
log.Error(err, "Read request instance error!")
|
||||
// Error reading the object - requeue the request.
|
||||
return reconcile.Result{}, ignoreNotFound(err)
|
||||
if !apierrs.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
log.Info("Print instance - ", "Instance.ToString", instance)
|
||||
|
@ -103,7 +106,7 @@ func (r *RayClusterReconciler) Reconcile(request reconcile.Request) (reconcile.R
|
|||
|
||||
log.Info("Runtime Pods", "size", len(runtimePods.Items), "runtime pods namelist", runtimePodNameList)
|
||||
|
||||
// record pod need to be deleted
|
||||
// Record that the pod needs to be deleted.
|
||||
difference := runtimePodNameList.Difference(expectedPodNameList)
|
||||
|
||||
// Fill replicas with the runtime pod if it exists, or with expectedPod if it does not.
|
||||
|
@ -116,7 +119,7 @@ func (r *RayClusterReconciler) Reconcile(request reconcile.Request) (reconcile.R
|
|||
}
|
||||
}
|
||||
|
||||
// create service for head
|
||||
// Create the head node service.
|
||||
if needServicePodMap.Cardinality() > 0 {
|
||||
for elem := range needServicePodMap.Iterator().C {
|
||||
podName := elem.(string)
|
||||
|
@ -135,20 +138,17 @@ func (r *RayClusterReconciler) Reconcile(request reconcile.Request) (reconcile.R
|
|||
}
|
||||
}
|
||||
|
||||
// check pod and create one by one if not exist
|
||||
// Check if each pod exists and if not, create it.
|
||||
for i, replica := range replicas {
|
||||
// create pod if not exist
|
||||
if !utils.IsCreated(&replica) {
|
||||
log.Info("Creating pod", "index", i, "create pod", replica.Name)
|
||||
if err := r.Create(context.TODO(), &replica); err != nil {
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
// pod created, no more work possible for this round
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// delete pods to desired state
|
||||
// Delete pods if needed.
|
||||
if difference.Cardinality() > 0 {
|
||||
log.Info("difference", "pods", difference)
|
||||
for _, runtimePod := range runtimePods.Items {
|
||||
|
@ -208,10 +208,3 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|||
}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func ignoreNotFound(err error) error {
|
||||
if apierrs.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3,16 +3,15 @@ package utils
|
|||
import "testing"
|
||||
|
||||
func TestBefore(t *testing.T) {
|
||||
if Before("a","b") != ""{
|
||||
if Before("a", "b") != "" {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if Before("aaa","a") != ""{
|
||||
if Before("aaa", "a") != "" {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if Before("aab","b") != "aa"{
|
||||
if Before("aab", "b") != "aa" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue