mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
220 lines
7.9 KiB
Go
220 lines
7.9 KiB
Go
package controllers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
mapset "github.com/deckarep/golang-set"
|
|
"github.com/go-logr/logr"
|
|
_ "k8s.io/api/apps/v1beta1"
|
|
rayiov1alpha1 "ray-operator/api/v1alpha1"
|
|
"ray-operator/controllers/common"
|
|
_ "ray-operator/controllers/common"
|
|
"ray-operator/controllers/utils"
|
|
"strings"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
|
|
"sigs.k8s.io/controller-runtime/pkg/source"
|
|
)
|
|
|
|
var log = logf.Log.WithName("RayCluster-Controller")
|
|
|
|
// newReconciler returns a new reconcile.Reconciler
|
|
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
|
|
return &RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}
|
|
}
|
|
|
|
var _ reconcile.Reconciler = &RayClusterReconciler{}
|
|
|
|
// RayClusterReconciler reconciles a RayCluster object
|
|
type RayClusterReconciler struct {
|
|
client.Client
|
|
Log logr.Logger
|
|
Scheme *runtime.Scheme
|
|
}
|
|
|
|
// Reconcile reads that state of the cluster for a RayCluster object and makes changes based on it
|
|
// and what is in the RayCluster.Spec
|
|
// Automatically generate RBAC rules to allow the Controller to read and write workloads
|
|
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
|
|
|
|
func (r *RayClusterReconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
|
|
_ = r.Log.WithValues("raycluster", request.NamespacedName)
|
|
log.Info("Reconciling RayCluster", "cluster name", request.Name)
|
|
|
|
// Fetch the RayCluster instance
|
|
instance := &rayiov1alpha1.RayCluster{}
|
|
err := r.Get(context.TODO(), request.NamespacedName, instance)
|
|
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
// Object not found, return. Created objects are automatically garbage collected.
|
|
// For additional cleanup logic use finalizers.
|
|
return reconcile.Result{}, nil
|
|
}
|
|
log.Error(err, "Read request instance error!")
|
|
// Error reading the object - requeue the request.
|
|
if !apierrs.IsNotFound(err) {
|
|
return reconcile.Result{}, err
|
|
}
|
|
return reconcile.Result{}, nil
|
|
}
|
|
|
|
log.Info("Print instance - ", "Instance.ToString", instance)
|
|
|
|
// Build pods for instance
|
|
expectedPods := r.buildPods(instance)
|
|
|
|
expectedPodNameList := mapset.NewSet()
|
|
expectedPodMap := make(map[string]corev1.Pod)
|
|
needServicePodMap := mapset.NewSet()
|
|
for _, pod := range expectedPods {
|
|
expectedPodNameList.Add(pod.Name)
|
|
expectedPodMap[pod.Name] = pod
|
|
if strings.EqualFold(pod.Labels[common.ClusterPodType], common.Head) {
|
|
needServicePodMap.Add(pod.Name)
|
|
}
|
|
}
|
|
|
|
log.Info("Build pods according to the ray cluster instance", "size", len(expectedPods), "podNames", expectedPodNameList)
|
|
|
|
runtimePods := corev1.PodList{}
|
|
if err = r.List(context.TODO(), &runtimePods, client.InNamespace(instance.Namespace), client.MatchingLabels{common.RayClusterOwnerKey: request.Name}); err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
|
|
runtimePodNameList := mapset.NewSet()
|
|
runtimePodMap := make(map[string]corev1.Pod)
|
|
for _, runtimePod := range runtimePods.Items {
|
|
runtimePodNameList.Add(runtimePod.Name)
|
|
runtimePodMap[runtimePod.Name] = runtimePod
|
|
}
|
|
|
|
log.Info("Runtime Pods", "size", len(runtimePods.Items), "runtime pods namelist", runtimePodNameList)
|
|
|
|
// Record that the pod needs to be deleted.
|
|
difference := runtimePodNameList.Difference(expectedPodNameList)
|
|
|
|
// fill replicas with runtime if exists or expectedPod if not exists
|
|
var replicas []corev1.Pod
|
|
for _, pod := range expectedPods {
|
|
if runtimePodNameList.Contains(pod.Name) {
|
|
replicas = append(replicas, runtimePodMap[pod.Name])
|
|
} else {
|
|
replicas = append(replicas, pod)
|
|
}
|
|
}
|
|
|
|
// Create the head node service.
|
|
if needServicePodMap.Cardinality() > 0 {
|
|
for elem := range needServicePodMap.Iterator().C {
|
|
podName := elem.(string)
|
|
svcConf := common.DefaultServiceConfig(*instance, podName)
|
|
rayPodSvc := common.ServiceForPod(svcConf)
|
|
blockOwnerDeletion := true
|
|
ownerReference := metav1.OwnerReference{
|
|
APIVersion: instance.APIVersion,
|
|
Kind: instance.Kind,
|
|
Name: instance.Name,
|
|
UID: instance.UID,
|
|
BlockOwnerDeletion: &blockOwnerDeletion,
|
|
}
|
|
rayPodSvc.OwnerReferences = append(rayPodSvc.OwnerReferences, ownerReference)
|
|
if errSvc := r.Create(context.TODO(), rayPodSvc); errSvc != nil {
|
|
if errors.IsAlreadyExists(errSvc) {
|
|
log.Info("Pod service already exist,no need to create")
|
|
} else {
|
|
log.Error(errSvc, "Pod Service create error!", "Pod.Service.Error", errSvc)
|
|
return reconcile.Result{}, errSvc
|
|
}
|
|
} else {
|
|
log.Info("Pod Service create anew successfully", "podName", rayPodSvc.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check if each pod exists and if not, create it.
|
|
for i, replica := range replicas {
|
|
if !utils.IsCreated(&replica) {
|
|
log.Info("Creating pod", "index", i, "create pod", replica.Name)
|
|
if err := r.Create(context.TODO(), &replica); err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete pods if needed.
|
|
if difference.Cardinality() > 0 {
|
|
log.Info("difference", "pods", difference)
|
|
for _, runtimePod := range runtimePods.Items {
|
|
if difference.Contains(runtimePod.Name) {
|
|
log.Info("Deleting pod", "namespace", runtimePod.Namespace, "name", runtimePod.Name)
|
|
if err := r.Delete(context.TODO(), &runtimePod); err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
if strings.EqualFold(runtimePod.Labels[common.ClusterPodType], common.Head) {
|
|
svcConf := common.DefaultServiceConfig(*instance, runtimePod.Name)
|
|
raySvcHead := common.ServiceForPod(svcConf)
|
|
log.Info("delete head service", "headName", runtimePod.Name)
|
|
if err := r.Delete(context.TODO(), raySvcHead); err != nil {
|
|
return reconcile.Result{}, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return reconcile.Result{}, nil
|
|
}
|
|
|
|
// Build cluster instance pods.
|
|
func (r *RayClusterReconciler) buildPods(instance *rayiov1alpha1.RayCluster) []corev1.Pod {
|
|
var pods []corev1.Pod
|
|
if instance.Spec.Extensions != nil && len(instance.Spec.Extensions) > 0 {
|
|
for _, extension := range instance.Spec.Extensions {
|
|
var i int32 = 0
|
|
for i = 0; i < *extension.Replicas; i++ {
|
|
podType := fmt.Sprintf("%v", extension.Type)
|
|
podName := instance.Name + common.DashSymbol + extension.GroupName + common.DashSymbol + podType + common.DashSymbol + utils.FormatInt32(i)
|
|
podConf := common.DefaultPodConfig(instance, podType, podName)
|
|
podConf.Extension = extension
|
|
pod := common.BuildPod(podConf)
|
|
// Set raycluster instance as the owner and controller
|
|
if err := controllerutil.SetControllerReference(instance, pod, r.Scheme); err != nil {
|
|
log.Error(err, "Failed to set controller reference for raycluster pod")
|
|
}
|
|
pods = append(pods, *pod)
|
|
}
|
|
}
|
|
} else {
|
|
log.Info("RayCluster extensions are nil or empty")
|
|
}
|
|
|
|
return pods
|
|
}
|
|
|
|
// SetupWithManager builds the reconciler.
|
|
func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&rayiov1alpha1.RayCluster{}).
|
|
Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
|
|
IsController: true,
|
|
OwnerType: &rayiov1alpha1.RayCluster{},
|
|
}).
|
|
Complete(r)
|
|
}
|