searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

K8S WorkQueue 设计与实现(三)

2023-10-17 01:27:57
29
0

WorkQueue 是 Kubernetes 的一个队列实现,属于 client-go 库的 workqueue 包,其作为 K8S Controller 的重要组成部份,在资源变化时将资源写入 WorkQueue 中,再由具体的 worker 去消费执行

上文我们讲解了 WorkQueue 基本队列的设计与实现

本文描述其另外一个队列,限流队列 rate limiting queue 的实现

 

限流队列

接口声明

基本队列的接口声明

type RateLimitingInterface interface {
	// 延时队列的接口
	DelayingInterface
	// 使用限流器向队列加入元素
	AddRateLimited(item interface{})
	// 标记该元素已经重试完成,限流器可以停止跟踪它。但依然要执行 Done 操作将其出队
	Forget(item interface{})
	// 返回元素入队的次数
	NumRequeues(item interface{}) int
}

限流队列的使用方式一般是在 Get 之后如果元素处理出错时,通过 AddRateLimited 方法将元素加回队列继续重试,如果失败就继续调用 AddRateLimited,直到成功或者放弃时调用 Forget 方法停止限流器的跟踪,并最终调用 Done 完成出队

接口实现

让我们看一下基本队列的实现

type rateLimitingType struct {
	// 延时队列
	DelayingInterface
	// 限流器
	rateLimiter RateLimiter
}

限流队列在创建是需要指定限流器 RateLimiter,如下面的 NewRateLimitingQueue 函数需要传入一个 RateLimiter 的实现

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {  
   return &rateLimitingType{  
      DelayingInterface: NewDelayingQueue(),  
      rateLimiter:       rateLimiter,  
   }  
}

限流器

限流器的接口定义如下,使用 When 方法获取元素的等待时长并跟踪元素,处理后使用 Forget 取消跟踪 

type RateLimiter interface {  
   // 获取元素需要等待多长时间才加入队列
   When(item interface{}) time.Duration  
   // 停止跟踪元素
   Forget(item interface{})  
   // 获取元素的失败次数,即加入队列的次数
   NumRequeues(item interface{}) int  
}

WorkQueue 中也提供了几种限流器的实现,有如下:

BucketRateLimiter 使用 Golang 标准库 time/rate 来实现,time/rate 是 Token Bucket(令牌桶) 来实现,桶做为缓冲区,令牌发放来控制速率

ItemExponentialFailureRateLimiter 延时时间随着元素跟踪次数呈指数增长

ItemFastSlowRateLimiter 在特定阈值次数内使用短的时间延迟,超过后使用长的时间

MaxOfRateLimiter 指定多个限流器,选择这些限流器中最大的延时时间

WithMaxWaitRateLimiter 添加一个最大的时间限制,限制使用的限流器的时间超过指定的延迟时间

 

0条评论
0 / 1000
LinHeng
3文章数
0粉丝数
LinHeng
3 文章 | 0 粉丝
LinHeng
3文章数
0粉丝数
LinHeng
3 文章 | 0 粉丝
原创

K8S WorkQueue 设计与实现(三)

2023-10-17 01:27:57
29
0

WorkQueue 是 Kubernetes 的一个队列实现,属于 client-go 库的 workqueue 包,其作为 K8S Controller 的重要组成部份,在资源变化时将资源写入 WorkQueue 中,再由具体的 worker 去消费执行

上文我们讲解了 WorkQueue 基本队列的设计与实现

本文描述其另外一个队列,限流队列 rate limiting queue 的实现

 

限流队列

接口声明

基本队列的接口声明

type RateLimitingInterface interface {
	// 延时队列的接口
	DelayingInterface
	// 使用限流器向队列加入元素
	AddRateLimited(item interface{})
	// 标记该元素已经重试完成,限流器可以停止跟踪它。但依然要执行 Done 操作将其出队
	Forget(item interface{})
	// 返回元素入队的次数
	NumRequeues(item interface{}) int
}

限流队列的使用方式一般是在 Get 之后如果元素处理出错时,通过 AddRateLimited 方法将元素加回队列继续重试,如果失败就继续调用 AddRateLimited,直到成功或者放弃时调用 Forget 方法停止限流器的跟踪,并最终调用 Done 完成出队

接口实现

让我们看一下基本队列的实现

type rateLimitingType struct {
	// 延时队列
	DelayingInterface
	// 限流器
	rateLimiter RateLimiter
}

限流队列在创建是需要指定限流器 RateLimiter,如下面的 NewRateLimitingQueue 函数需要传入一个 RateLimiter 的实现

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {  
   return &rateLimitingType{  
      DelayingInterface: NewDelayingQueue(),  
      rateLimiter:       rateLimiter,  
   }  
}

限流器

限流器的接口定义如下,使用 When 方法获取元素的等待时长并跟踪元素,处理后使用 Forget 取消跟踪 

type RateLimiter interface {  
   // 获取元素需要等待多长时间才加入队列
   When(item interface{}) time.Duration  
   // 停止跟踪元素
   Forget(item interface{})  
   // 获取元素的失败次数,即加入队列的次数
   NumRequeues(item interface{}) int  
}

WorkQueue 中也提供了几种限流器的实现,有如下:

BucketRateLimiter 使用 Golang 标准库 time/rate 来实现,time/rate 是 Token Bucket(令牌桶) 来实现,桶做为缓冲区,令牌发放来控制速率

ItemExponentialFailureRateLimiter 延时时间随着元素跟踪次数呈指数增长

ItemFastSlowRateLimiter 在特定阈值次数内使用短的时间延迟,超过后使用长的时间

MaxOfRateLimiter 指定多个限流器,选择这些限流器中最大的延时时间

WithMaxWaitRateLimiter 添加一个最大的时间限制,限制使用的限流器的时间超过指定的延迟时间

 

文章来自个人专栏
K8S 设计与实现
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0