// main wires up and runs the provisioner sidecar for a CSI driver.
//
// This is an abridged, annotated excerpt: every bare `...` marks statements
// that were left out of the excerpt (flag parsing, error checks, client
// construction, and so on). Names such as config, clientset, metricsManager,
// addr, leClientset, lockName and provisionController are therefore set up in
// code that is not visible here.
//
// Visible sequence:
//  1. Validate NODE_NAME when node deployment is enabled.
//  2. Connect to the CSI driver, probe it, and discover its name and
//     capabilities.
//  3. Build informers/listers (or static stand-ins in node-deployment mode).
//  4. Construct the CSI provisioner, the optional capacity controller, the
//     provision controller and the cloning protection controller.
//  5. Start everything, either directly or through leader election.
func main() {
	...

	// enableNodeDeployment: Enables deploying the external-provisioner together
	// with a CSI driver on nodes to manage node-local volumes.
	// In that mode the node name is mandatory, so fail fast when NODE_NAME is
	// not set.
	node := os.Getenv("NODE_NAME")
	if *enableNodeDeployment && node == "" {
		klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
	}

	...

	// Volume snapshot clientset.
	// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1Client
	snapClient, err := snapclientset.NewForConfig(config)
	...

	// Connect to the CSI driver server at the configured endpoint (per the
	// original note, a Unix domain socket).
	grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
	...

	// Liveness probe (Identity.Probe, per the original note): wait until the
	// driver reports ready.
	err = ctrl.Probe(grpcClient, *operationTimeout)
	...

	// Identity.GetPluginInfo
	// Autodetect provisioner name
	provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
	...

	// Compatibility path for migrating in-tree storage plugins to CSI: when
	// the detected driver is a migrated one, metricsManager and grpcClient
	// are rebuilt so that metrics carry the migration label.
	translator := csitrans.New()
	supportsMigrationFromInTreePluginName := ""
	if translator.IsMigratedCSIDriverByName(provisionerName) {
		...
		// Create a new connection with the metrics manager with migrated label
		metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
			// Will be provided via default gatherer.
			metrics.WithProcessStartTime(false),
			metrics.WithMigration())
		migratedGrpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
		...
		// From here on, all driver calls go through the migrated connection.
		grpcClient = migratedGrpcClient
		...
	}

	// Prepare http endpoint for metrics + leader election healthz
	...

	// Per the original note, this calls Identity.GetPluginCapabilities for the
	// plugin capabilities and Controller.ControllerGetCapabilities for the
	// controller capabilities. Startup cannot continue without them.
	pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
	if err != nil {
		klog.Fatalf("Error getting CSI driver capabilities: %s", err)
	}

	// Generate a unique ID for this provisioner:
	// <milliseconds since the Unix epoch>-<random int below 10000>-<driver name>.
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
	// In node-deployment mode the node name is appended to the identity as well.
	if *enableNodeDeployment {
		identity = identity + "-" + node
	}

	// Create the shared informer factory that backs the listers below.
	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
	var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity

	// -------------------------------
	// Listers
	// Create the StorageClass and PersistentVolumeClaim listers.
	// Create informer to prevent hit the API server for all resource request
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

	// A VolumeAttachment lister is created only when the CSI controller
	// service supports publish/unpublish volume; otherwise vaLister stays nil.
	var vaLister storagelistersv1.VolumeAttachmentLister
	if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
		klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
		vaLister = factory.Storage().V1().VolumeAttachments().Lister()
	} else {
		klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
	}

	// In node-deployment mode, initialize nodeDeployment from the flags and
	// fetch the node info from the CSI driver. nodeDeployment stays nil
	// otherwise, which later code uses as the "not node-local" signal.
	var nodeDeployment *ctrl.NodeDeployment
	if *enableNodeDeployment {
		nodeDeployment = &ctrl.NodeDeployment{
			NodeName:         node,
			ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
			ImmediateBinding: *nodeDeploymentImmediateBinding,
			BaseDelay:        *nodeDeploymentBaseDelay,
			MaxDelay:         *nodeDeploymentMaxDelay,
		}
		nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
		if err != nil {
			klog.Fatalf("Failed to get node info from CSI driver: %v", err)
		}
		nodeDeployment.NodeInfo = *nodeInfo
	}

	// Build the topology inputs. Both listers stay nil when the plugin does
	// not support topology.
	var nodeLister listersv1.NodeLister
	var csiNodeLister storagelistersv1.CSINodeLister
	if ctrl.SupportsTopology(pluginCapabilities) {
		if nodeDeployment != nil {
			// Avoid watching in favor of fake, static objects. This is particularly relevant for
			// Node objects, which can generate significant traffic.
			csiNode := &storagev1.CSINode{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
				Spec: storagev1.CSINodeSpec{
					Drivers: []storagev1.CSINodeDriver{
						{
							Name:   provisionerName,
							NodeID: nodeDeployment.NodeInfo.NodeId,
						},
					},
				},
			}
			// NOTE: this `node` (*v1.Node) shadows the outer `node` string
			// (the NODE_NAME value) for the rest of this block.
			node := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeDeployment.NodeName,
				},
			}
			// Mirror the driver-reported topology segments onto the fake
			// objects: the segment keys become the CSINode topology keys and
			// the full key/value map becomes the Node labels.
			if nodeDeployment.NodeInfo.AccessibleTopology != nil {
				for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
					csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)
				}
				node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments
			}
			klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)

			// We make those fake objects available to the topology code via informers which
			// never change.
			// stoppedFactory is not started anywhere in the visible code; the
			// objects are inserted directly into the informer stores.
			stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)
			csiNodes := stoppedFactory.Storage().V1().CSINodes()
			nodes := stoppedFactory.Core().V1().Nodes()
			csiNodes.Informer().GetStore().Add(csiNode)
			nodes.Informer().GetStore().Add(node)
			csiNodeLister = csiNodes.Lister()
			nodeLister = nodes.Lister()

		} else {
			// Regular deployment: use listers backed by the shared factory.
			csiNodeLister = factory.Storage().V1().CSINodes().Lister()
			nodeLister = factory.Core().V1().Nodes().Lister()
		}
	}

	// -------------------------------
	// PersistentVolumeClaims informer
	// The same rate limiter is used for the claim queue here and passed to the
	// provision controller options below.
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
	claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

	// Setup options
	provisionerOptions := []func(*controller.ProvisionController) error{
		controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
		controller.FailedProvisionThreshold(0),
		controller.FailedDeleteThreshold(0),
		controller.RateLimiter(rateLimiter),
		controller.Threadiness(int(*workerThreads)),
		controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
		controller.ClaimsInformer(claimInformer),
		controller.NodesLister(nodeLister),
	}

	// Optional: request the finalizer option when the HonorPVReclaimPolicy
	// feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
		provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
	}

	// Optional: also register the in-tree plugin name as an additional
	// provisioner name. The name is assigned in code elided from this excerpt.
	if supportsMigrationFromInTreePluginName != "" {
		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
	}

	// Create the provisioner: it implements the Provisioner interface expected by
	// the controller
	csiProvisioner := ctrl.NewCSIProvisioner(
		clientset,
		*operationTimeout,
		identity,
		*volumeNamePrefix,
		*volumeNameUUIDLength,
		grpcClient,
		snapClient,
		provisionerName,
		pluginCapabilities,
		controllerCapabilities,
		supportsMigrationFromInTreePluginName,
		*strictTopology,
		*immediateTopology,
		translator,
		scLister,
		csiNodeLister,
		nodeLister,
		claimLister,
		vaLister,
		*extraCreateMetadata,
		*defaultFSType,
		nodeDeployment,
		*controllerPublishReadOnly,
	)

	// enableCapacity: This enables producing CSIStorageCapacity objects with
	// capacity information from the driver's GetCapacity call.
	// capacityController stays nil when the feature is disabled.
	var capacityController *capacity.Controller
	if *enableCapacity {
		...

		// Wrap Provision and Delete to detect when it is time to refresh capacity.
		csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)
	}

	// provisionController is assigned with `=`, so it is declared outside the
	// visible part of this function.
	provisionController = controller.NewProvisionController(
		clientset,
		provisionerName,
		csiProvisioner,
		provisionerOptions...,
	)

	// Cloning-related logic (the original annotation marks this as still to be
	// investigated). It shares claimLister, claimInformer and claimQueue with
	// the setup above.
	csiClaimController := ctrl.NewCloningProtectionController(
		clientset,
		claimLister,
		claimInformer,
		claimQueue,
		controllerCapabilities,
	)

	// Expose the metrics and profiling endpoints.
	// Start HTTP server, regardless whether we are the leader or not.
	if addr != "" {
		...
	}

	// run starts the informers and controllers. It is invoked either directly
	// or as the leader-election callback below.
	run := func(ctx context.Context) {
		factory.Start(ctx.Done())
		if factoryForNamespace != nil {
			// Starting is enough, the capacity controller will
			// wait for sync.
			factoryForNamespace.Start(ctx.Done())
		}
		// Abort the process if any informer started from factory failed to sync.
		cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
		for _, v := range cacheSyncResult {
			if !v {
				klog.Fatalf("Failed to sync Informers!")
			}
		}

		// The optional controllers run in their own goroutines; the provision
		// controller runs in the calling goroutine.
		if capacityController != nil {
			go capacityController.Run(ctx, int(*capacityThreads))
		}
		if csiClaimController != nil {
			go csiClaimController.Run(ctx, int(*finalizerThreads))
		}
		provisionController.Run(ctx)
	}

	if !*enableLeaderElection {
		// Leader election disabled: run directly.
		run(context.TODO())
	} else {
		...
		// Run under leader election: run is passed as the callback.
		le := leaderelection.NewLeaderElection(leClientset, lockName, run)
		...

		if err := le.Run(); err != nil {
			klog.Fatalf("failed to initialize leader election: %v", err)
		}
	}
}
|