摘要:
记录 Kubernetes OperatorHub 中 redis-operator 在创建 redis-cluster 时对 CreateRedisLeaderService 的处理流程
时序图:
核心函数:
CreateRedisLeaderService
// CreateRedisLeaderService method will create service for Redis Leader func CreateRedisLeaderService(cr *redisv1beta1.RedisCluster) error { prop := RedisClusterService{ RedisServiceRole: "leader", } return prop.CreateRedisClusterService(cr) }
RedisClusterService:CreateRedisClusterService
// CreateRedisClusterService method will create service for Redis func (service RedisClusterService) CreateRedisClusterService(cr *redisv1beta1.RedisCluster) error { serviceName := cr.ObjectMeta.Name + "-" + service.RedisServiceRole logger := serviceLogger(cr.Namespace, serviceName) labels := getRedisLabels(serviceName, "cluster", service.RedisServiceRole, cr.ObjectMeta.Labels) annotations := generateServiceAnots(cr.ObjectMeta) if cr.Spec.RedisExporter != nil && cr.Spec.RedisExporter.Enabled { enableMetrics = true } objectMetaInfo := generateObjectMetaInformation(serviceName, cr.Namespace, labels, annotations) headlessObjectMetaInfo := generateObjectMetaInformation(serviceName+"-headless", cr.Namespace, labels, annotations) err := CreateOrUpdateService(cr.Namespace, headlessObjectMetaInfo, redisClusterAsOwner(cr), false, true) if err != nil { logger.Error(err, "Cannot create headless service for Redis", "Setup.Type", service.RedisServiceRole) return err } err = CreateOrUpdateService(cr.Namespace, objectMetaInfo, redisClusterAsOwner(cr), enableMetrics, false) if err != nil { logger.Error(err, "Cannot create service for Redis", "Setup.Type", service.RedisServiceRole) return err } return nil }
CreateOrUpdateService
// CreateOrUpdateService method will create or update Redis service func CreateOrUpdateService(namespace string, serviceMeta metav1.ObjectMeta, ownerDef metav1.OwnerReference, enableMetrics, headless bool) error { logger := serviceLogger(namespace, serviceMeta.Name) serviceDef := generateServiceDef(serviceMeta, enableMetrics, ownerDef, headless) storedService, err := getService(namespace, serviceMeta.Name) if err != nil { if errors.IsNotFound(err) { if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(serviceDef); err != nil { logger.Error(err, "Unable to patch redis service with compare annotations") } return createService(namespace, serviceDef) } return err } return patchService(storedService, serviceDef, namespace) }
getService
// getService is a method to get service is Kubernetes func getService(namespace string, service string) (*corev1.Service, error) { logger := serviceLogger(namespace, service) getOpts := metav1.GetOptions{ TypeMeta: generateMetaInformation("Service", "v1"), } serviceInfo, err := generateK8sClient().CoreV1().Services(namespace).Get(context.TODO(), service, getOpts) if err != nil { logger.Info("Redis service get action is failed") return nil, err } logger.Info("Redis service get action is successful") return serviceInfo, nil }
createService
// createService is a method to create service is Kubernetes func createService(namespace string, service *corev1.Service) error { logger := serviceLogger(namespace, service.Name) _, err := generateK8sClient().CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{}) if err != nil { logger.Error(err, "Redis service creation is failed") return err } logger.Info("Redis service creation is successful") return nil }
patchService
// patchService will patch Redis Kubernetes service func patchService(storedService *corev1.Service, newService *corev1.Service, namespace string) error { logger := serviceLogger(namespace, storedService.Name) // We want to try and keep this atomic as possible. newService.ResourceVersion = storedService.ResourceVersion newService.CreationTimestamp = storedService.CreationTimestamp newService.ManagedFields = storedService.ManagedFields if newService.Spec.Type == generateServiceType("ClusterIP") { newService.Spec.ClusterIP = storedService.Spec.ClusterIP } patchResult, err := patch.DefaultPatchMaker.Calculate(storedService, newService, patch.IgnoreStatusFields(), patch.IgnoreField("kind"), patch.IgnoreField("apiVersion"), ) if err != nil { logger.Error(err, "Unable to patch redis service with comparison object") return err } if !patchResult.IsEmpty() { logger.Info("Changes in service Detected, Updating...", "patch", string(patchResult.Patch)) for key, value := range storedService.Annotations { if _, present := newService.Annotations[key]; !present { newService.Annotations[key] = value } } if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(newService); err != nil { logger.Error(err, "Unable to patch redis service with comparison object") return err } logger.Info("Syncing Redis service with defined properties") return updateService(namespace, newService) } logger.Info("Redis service is already in-sync") return nil }
updateService
// updateService is a method to update service is Kubernetes func updateService(namespace string, service *corev1.Service) error { logger := serviceLogger(namespace, service.Name) _, err := generateK8sClient().CoreV1().Services(namespace).Update(context.TODO(), service, metav1.UpdateOptions{}) if err != nil { logger.Error(err, "Redis service update failed") return err } logger.Info("Redis service updated successfully") return nil }