
CSI Specification and Development

2023-04-28 06:54:46

CSI Specification

CSI is short for Container Storage Interface. The CSI specification is at https://github.com/container-storage-interface/spec

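CSI is an industry standard: a plugin that a storage provider (SP) develops against the specification can run on any container orchestration (CO) system that supports it. In other words, CSI is not tied to any one CO; COs that support CSI include Kubernetes and Mesos.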

The architecture involves a Node Plugin and an optional Controller Plugin:

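  • Node Plugin: a gRPC endpoint serving CSI RPCs that must run on the node where an SP-provisioned volume will be published. It publishes (mounts/unmounts) the volumes provided by the SP.
  • Controller Plugin: a gRPC endpoint serving CSI RPCs. The Controller Plugin can run anywhere and handles tasks such as creating and deleting volumes.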
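The split between the two plugins can be illustrated with a minimal Go sketch of how a single driver binary might decide which services to register. The `-mode` flag, its values and the `servicesFor` helper are hypothetical assumptions for illustration and are not defined by the CSI spec. The only spec-derived rule the sketch encodes is that both plugin types serve the Identity service.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// servicesFor returns the CSI services one driver process would register for a
// deployment mode. The mode values are hypothetical, not part of the CSI spec.
func servicesFor(mode string) ([]string, error) {
	switch mode {
	case "node":
		// Node Plugin: must run on every node that will publish volumes.
		return []string{"Identity", "Node"}, nil
	case "controller":
		// Controller Plugin: may run anywhere; creates and deletes volumes.
		return []string{"Identity", "Controller"}, nil
	case "all":
		// Both roles in one process, e.g. for a single-node test cluster.
		return []string{"Identity", "Controller", "Node"}, nil
	default:
		return nil, fmt.Errorf("unknown mode %q", mode)
	}
}

func main() {
	mode := flag.String("mode", "all", "plugin role: node, controller or all")
	flag.Parse()

	svcs, err := servicesFor(*mode)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("registering:", strings.Join(svcs, ", "))
}
```

Keeping Identity in every mode is deliberate: a CO queries Identity (name, capabilities, readiness) on whichever endpoint it connects to.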

 

Kubernetes CSI Development

Kubernetes CSI developer documentation: https://kubernetes-csi.github.io/docs/introduction.html

Kubernetes has its own architecture and components, and its CSI implementation builds on kubelet and the Kubernetes API, which are used to communicate with the CSI driver:

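  • Kubelet to CSI driver communication: kubelet talks to the CSI driver (usually its Node Plugin) through kubelet's plugin mechanism.
    • Kubelet issues CSI calls (such as NodeStageVolume and NodePublishVolume) directly to the CSI driver over a UDS (Unix Domain Socket) to mount and unmount volumes.
    • Kubelet discovers CSI drivers (and the UDS used to interact with them) via the kubelet plugin registration mechanism.
    • Every CSI driver deployed on Kubernetes must register itself using the kubelet plugin registration mechanism on each supported node.
  • Master to CSI driver communication: the control plane interacts with the CSI driver (usually its Controller Plugin) through the Kubernetes API.
    • Kubernetes master components do not communicate directly with CSI drivers (via UDS or any other means).
    • Kubernetes master components interact only with the Kubernetes API.
    • CSI drivers that require operations depending on the Kubernetes API (such as volume creation, volume attach and volume snapshots) must watch the Kubernetes API and trigger the appropriate CSI operations against it.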

The following components are recommended when implementing a Kubernetes CSI driver:

  • Kubernetes CSI Sidecar Containers: sidecar containers aim to simplify developing and deploying CSI drivers. For example, external-provisioner is a controller that watches PVCs and calls the CSI driver endpoint to create volumes.
  • Kubernetes CSI objects
  • CSI Driver Testing tools
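The external-provisioner behaviour described above (watch PVCs, call the driver to create volumes) can be sketched as a single reconcile pass over in-memory objects. Everything here is a simplified assumption. The `StorageClass` and `PVC` structs are hypothetical stand-ins for the real Kubernetes types, and `createVolume` stands in for the CSI Controller.CreateVolume RPC. The real sidecar works from informers and a rate-limited work queue rather than a slice.

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for the Kubernetes StorageClass and PVC objects.
type StorageClass struct {
	Name        string
	Provisioner string // driver name, as returned by Identity.GetPluginInfo
}

type PVC struct {
	Name         string
	StorageClass string
	Bound        bool
}

// reconcile calls createVolume (standing in for Controller.CreateVolume) for each
// unbound PVC whose StorageClass names this driver, and returns PVC name -> volume ID.
func reconcile(driver string, classes map[string]StorageClass, claims []PVC,
	createVolume func(name string) (string, error)) (map[string]string, error) {
	pvs := map[string]string{}
	for _, c := range claims {
		sc, ok := classes[c.StorageClass]
		if c.Bound || !ok || sc.Provisioner != driver {
			continue // already bound, unknown class, or another driver's class
		}
		// Simplified naming; the real sidecar derives the name from the PVC UID
		// and its volume-name-prefix setting.
		id, err := createVolume("pvc-" + c.Name)
		if err != nil {
			return pvs, fmt.Errorf("provisioning %s: %w", c.Name, err)
		}
		pvs[c.Name] = id
	}
	return pvs, nil
}

func main() {
	classes := map[string]StorageClass{
		"fast":  {Name: "fast", Provisioner: "example.csi.vendor.com"},
		"other": {Name: "other", Provisioner: "other.csi.vendor.com"},
	}
	claims := []PVC{
		{Name: "data", StorageClass: "fast"},
		{Name: "logs", StorageClass: "other"},
		{Name: "old", StorageClass: "fast", Bound: true},
	}
	n := 0
	create := func(name string) (string, error) {
		n++
		return fmt.Sprintf("vol-%d", n), nil
	}
	pvs, err := reconcile("example.csi.vendor.com", classes, claims, create)
	fmt.Println(pvs, err) // map[data:vol-1] <nil>
}
```

Matching on `StorageClass.Provisioner` is what lets several CSI drivers share one cluster: each sidecar only acts on claims whose class names its own driver.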

To implement a CSI driver using this mechanism, a CSI driver developer should:

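  1. Create a containerized application that implements the Identity, Node, and optionally the Controller services described in the CSI specification (the CSI driver container). See Developing CSI Driver for more information.
  2. Unit test it using csi-sanity. See Driver - Unit Testing for more information.
  3. Define Kubernetes API YAML files that deploy the CSI driver container along with the appropriate sidecar containers. See Deploying in Kubernetes for more information.
  4. Deploy the driver on a Kubernetes cluster and run end-to-end functional tests on it. See Driver - Functional Testing for more information.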

CSI Driver Service

A Kubernetes CSI driver must implement at least the Identity and Node services; the Controller service is optional.

service Identity {
  rpc GetPluginInfo(GetPluginInfoRequest)
    returns (GetPluginInfoResponse) {} // Returns the driver name and version

  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
    returns (GetPluginCapabilitiesResponse) {}
  // Returns the supported capabilities, e.g. PluginCapability_Service_CONTROLLER_SERVICE and
  // PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS (the latter exposes topology information that influences scheduling)

  rpc Probe (ProbeRequest)
    returns (ProbeResponse) {} // Health/readiness probe
}
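As a rough sketch of what the Identity service has to answer, the Go type below implements the three RPCs as plain methods. The `identityServer` type, the string-valued `PluginCapability` constants and the driver name `example.csi.vendor.com` are hypothetical. A real driver implements the generated gRPC server interface from the spec's Go bindings instead. The sketch requires Go 1.19+ for `atomic.Bool`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// PluginCapability is a hypothetical string stand-in for the spec's
// PluginCapability_Service_* enum values mentioned above.
type PluginCapability string

const (
	ControllerService              PluginCapability = "CONTROLLER_SERVICE"
	VolumeAccessibilityConstraints PluginCapability = "VOLUME_ACCESSIBILITY_CONSTRAINTS"
)

// identityServer models the three Identity RPCs without any gRPC plumbing.
type identityServer struct {
	name, version string
	hasController bool        // advertise CONTROLLER_SERVICE
	hasTopology   bool        // advertise VOLUME_ACCESSIBILITY_CONSTRAINTS
	ready         atomic.Bool // flipped once the storage backend is usable
}

// GetPluginInfo returns the driver name and version.
func (s *identityServer) GetPluginInfo() (string, string) { return s.name, s.version }

// GetPluginCapabilities lists only what this plugin actually implements.
func (s *identityServer) GetPluginCapabilities() []PluginCapability {
	var caps []PluginCapability
	if s.hasController {
		caps = append(caps, ControllerService)
	}
	if s.hasTopology {
		caps = append(caps, VolumeAccessibilityConstraints)
	}
	return caps
}

// Probe reports readiness, mirroring the optional "ready" field of ProbeResponse.
func (s *identityServer) Probe() bool { return s.ready.Load() }

func main() {
	s := &identityServer{name: "example.csi.vendor.com", version: "v0.1.0", hasTopology: true}
	fmt.Println(s.GetPluginInfo())         // example.csi.vendor.com v0.1.0
	fmt.Println(s.GetPluginCapabilities()) // [VOLUME_ACCESSIBILITY_CONSTRAINTS]
	fmt.Println(s.Probe())                 // false
	s.ready.Store(true)
	fmt.Println(s.Probe()) // true
}
```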

 

service Node {
  rpc NodeStageVolume (NodeStageVolumeRequest)
    returns (NodeStageVolumeResponse) {}

  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
    returns (NodeUnstageVolumeResponse) {}

  rpc NodePublishVolume (NodePublishVolumeRequest)
    returns (NodePublishVolumeResponse) {}

  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
    returns (NodeUnpublishVolumeResponse) {}

  rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
    returns (NodeGetVolumeStatsResponse) {}


  rpc NodeExpandVolume(NodeExpandVolumeRequest)
    returns (NodeExpandVolumeResponse) {}


  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
    returns (NodeGetCapabilitiesResponse) {}

  rpc NodeGetInfo (NodeGetInfoRequest)
    returns (NodeGetInfoResponse) {}
}

 

service Controller {
  rpc CreateVolume (CreateVolumeRequest)
    returns (CreateVolumeResponse) {}

  rpc DeleteVolume (DeleteVolumeRequest)
    returns (DeleteVolumeResponse) {}

  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
    returns (ControllerPublishVolumeResponse) {}

  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
    returns (ControllerUnpublishVolumeResponse) {}

  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
    returns (ValidateVolumeCapabilitiesResponse) {}

  rpc ListVolumes (ListVolumesRequest)
    returns (ListVolumesResponse) {}

  rpc GetCapacity (GetCapacityRequest)
    returns (GetCapacityResponse) {} // Returns the available capacity

  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
    returns (ControllerGetCapabilitiesResponse) {} // Returns the capabilities supported by the Controller service

  rpc CreateSnapshot (CreateSnapshotRequest)
    returns (CreateSnapshotResponse) {}

  rpc DeleteSnapshot (DeleteSnapshotRequest)
    returns (DeleteSnapshotResponse) {}

  rpc ListSnapshots (ListSnapshotsRequest)
    returns (ListSnapshotsResponse) {}

  rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
    returns (ControllerExpandVolumeResponse) {}

  rpc ControllerGetVolume (ControllerGetVolumeRequest)
    returns (ControllerGetVolumeResponse) {
        option (alpha_method) = true;
    }
}

 

Kubernetes Helper Components for CSI Development

Repositories: https://github.com/kubernetes-csi

external-provisioner

main.go


func main() {
 ...

 // enableNodeDeployment: deploy external-provisioner together with the CSI driver on each node to manage node-local volumes
 node := os.Getenv("NODE_NAME")
 if *enableNodeDeployment && node == "" {
  klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
 }

 ...

 // Clientset for volume snapshots: snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1Client
 snapClient, err := snapclientset.NewForConfig(config)
 ...

 // Connect to the CSI driver's gRPC server, typically over a Unix domain socket
 grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
 ...

 // Call Identity.Probe until the driver reports ready
 err = ctrl.Probe(grpcClient, *operationTimeout)
 ...

 // Autodetect the provisioner name via Identity.GetPluginInfo
 provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
 ...

 // In-tree migration compatibility: if this driver replaces an in-tree plugin, metricsManager and grpcClient are recreated
 translator := csitrans.New()
 supportsMigrationFromInTreePluginName := ""
 if translator.IsMigratedCSIDriverByName(provisionerName) {
  ...
  // Create a new connection with the metrics manager with migrated label
  metricsManager = metrics.NewCSIMetricsManagerWithOptions(provisionerName,
   // Will be provided via default gatherer.
   metrics.WithProcessStartTime(false),
   metrics.WithMigration())
  migratedGrpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
  ...
  grpcClient = migratedGrpcClient
  ...
 }

 // Prepare http endpoint for metrics + leader election healthz
 ...

 // Identity.GetPluginCapabilities returns the plugin capabilities;
 // Controller.ControllerGetCapabilities returns the Controller capabilities
 pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
 if err != nil {
  klog.Fatalf("Error getting CSI driver capabilities: %s", err)
 }

 // Generate a unique ID for this provisioner
 timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
 identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
 // In node-deployment mode, append the node name so each per-node instance gets a distinct identity
 if *enableNodeDeployment {
  identity = identity + "-" + node
 }

 // Create the shared informer factory
 factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
 var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity

 // -------------------------------
 // Listers for StorageClasses and PVCs
 // Informer-backed listers avoid hitting the API server for every request
 scLister := factory.Storage().V1().StorageClasses().Lister()
 claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
  
 // Watch VolumeAttachments only if the Controller service supports PUBLISH_UNPUBLISH_VOLUME
 var vaLister storagelistersv1.VolumeAttachmentLister
 if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
  klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
  vaLister = factory.Storage().V1().VolumeAttachments().Lister()
 } else {
  klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
 }
  
 // In node-deployment mode, initialize nodeDeployment and fetch node info from the driver (Node.NodeGetInfo)
 var nodeDeployment *ctrl.NodeDeployment
 if *enableNodeDeployment {
  nodeDeployment = &ctrl.NodeDeployment{
   NodeName:         node,
   ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
   ImmediateBinding: *nodeDeploymentImmediateBinding,
   BaseDelay:        *nodeDeploymentBaseDelay,
   MaxDelay:         *nodeDeploymentMaxDelay,
  }
  nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
  if err != nil {
   klog.Fatalf("Failed to get node info from CSI driver: %v", err)
  }
  nodeDeployment.NodeInfo = *nodeInfo
 }

 // Build topology listers when the driver supports topology (VOLUME_ACCESSIBILITY_CONSTRAINTS)
 var nodeLister listersv1.NodeLister
 var csiNodeLister storagelistersv1.CSINodeLister
 if ctrl.SupportsTopology(pluginCapabilities) {
  if nodeDeployment != nil {
   // Avoid watching in favor of fake, static objects. This is particularly relevant for
   // Node objects, which can generate significant traffic.
   csiNode := &storagev1.CSINode{
    ObjectMeta: metav1.ObjectMeta{
     Name: nodeDeployment.NodeName,
    },
    Spec: storagev1.CSINodeSpec{
     Drivers: []storagev1.CSINodeDriver{
      {
       Name:   provisionerName,
       NodeID: nodeDeployment.NodeInfo.NodeId,
      },
     },
    },
   }
   node := &v1.Node{
    ObjectMeta: metav1.ObjectMeta{
     Name: nodeDeployment.NodeName,
    },
   }
   if nodeDeployment.NodeInfo.AccessibleTopology != nil {
    for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {
     csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)
    }
    node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments
   }
   klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)

   // We make those fake objects available to the topology code via informers which
   // never change.
   stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)
   csiNodes := stoppedFactory.Storage().V1().CSINodes()
   nodes := stoppedFactory.Core().V1().Nodes()
   csiNodes.Informer().GetStore().Add(csiNode)
   nodes.Informer().GetStore().Add(node)
   csiNodeLister = csiNodes.Lister()
   nodeLister = nodes.Lister()

  } else {
   csiNodeLister = factory.Storage().V1().CSINodes().Lister()
   nodeLister = factory.Core().V1().Nodes().Lister()
  }
 }

 // -------------------------------
 // PersistentVolumeClaims informer, work queue and rate limiter
 rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
 claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
 claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

 // Setup options
 provisionerOptions := []func(*controller.ProvisionController) error{
  controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
  controller.FailedProvisionThreshold(0),
  controller.FailedDeleteThreshold(0),
  controller.RateLimiter(rateLimiter),
  controller.Threadiness(int(*workerThreads)),
  controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
  controller.ClaimsInformer(claimInformer),
  controller.NodesLister(nodeLister),
 }

 if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
  provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))
 }

 if supportsMigrationFromInTreePluginName != "" {
  provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
 }

 // Create the provisioner: it implements the Provisioner interface expected by
 // the controller
 csiProvisioner := ctrl.NewCSIProvisioner(
  clientset,
  *operationTimeout,
  identity,
  *volumeNamePrefix,
  *volumeNameUUIDLength,
  grpcClient,
  snapClient,
  provisionerName,
  pluginCapabilities,
  controllerCapabilities,
  supportsMigrationFromInTreePluginName,
  *strictTopology,
  *immediateTopology,
  translator,
  scLister,
  csiNodeLister,
  nodeLister,
  claimLister,
  vaLister,
  *extraCreateMetadata,
  *defaultFSType,
  nodeDeployment,
  *controllerPublishReadOnly,
 )

 // enableCapacity: produce CSIStorageCapacity objects from the capacity reported by the driver's Controller.GetCapacity call
 var capacityController *capacity.Controller
 if *enableCapacity {
  ...

  // Wrap Provision and Delete to detect when it is time to refresh capacity.
  csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)
 }

 provisionController = controller.NewProvisionController(
  clientset,
  provisionerName,
  csiProvisioner,
  provisionerOptions...,
 )

 // Cloning protection for PVCs used as clone sources (worth a closer look)
 csiClaimController := ctrl.NewCloningProtectionController(
  clientset,
  claimLister,
  claimInformer,
  claimQueue,
  controllerCapabilities,
 )

 // Start the HTTP server for metrics and profiling, regardless of whether we are the leader
 if addr != "" {
  ...
 }

 // run starts the informers and then the controllers
 run := func(ctx context.Context) {
  factory.Start(ctx.Done())
  if factoryForNamespace != nil {
   // Starting is enough, the capacity controller will
   // wait for sync.
   factoryForNamespace.Start(ctx.Done())
  }
  cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
  for _, v := range cacheSyncResult {
   if !v {
    klog.Fatalf("Failed to sync Informers!")
   }
  }

  if capacityController != nil {
   go capacityController.Run(ctx, int(*capacityThreads))
  }
  if csiClaimController != nil {
   go csiClaimController.Run(ctx, int(*finalizerThreads))
  }
  provisionController.Run(ctx)
 }

 if !*enableLeaderElection {
  // Leader election disabled: run directly
  run(context.TODO())
 } else {
  ...
  // Run only while holding leadership
  le := leaderelection.NewLeaderElection(leClientset, lockName, run)
  ...

  if err := le.Run(); err != nil {
   klog.Fatalf("failed to initialize leader election: %v", err)
  }
 }

}

 

 

 

 

Author: Darren
From the personal column Darren的容器专栏 (Darren's Container Column).