WorkQueue 是 Kubernetes 的一个队列实现,属于 client-go 库的 workqueue 包,其作为 K8S Controller 的重要组成部份,在资源变化时将资源写入 WorkQueue 中,再由具体的 worker 去消费执行
上文我们讲解了 WorkQueue 基本队列的设计与实现
本文描述其另外一个队列,限流队列 rate limiting queue 的实现
限流队列
接口声明
基本队列的接口声明
type RateLimitingInterface interface {
// 延时队列的接口
DelayingInterface
// 使用限流器向队列加入元素
AddRateLimited(item interface{})
// 标记该元素已经重试完成,限流器可以停止跟踪它。但依然要执行 Done 操作将其出队
Forget(item interface{})
// 返回元素入队的次数
NumRequeues(item interface{}) int
}
限流队列的使用方式一般是在 Get 之后如果元素处理出错时,通过 AddRateLimited 方法将元素加回队列继续重试,如果失败就继续调用 AddRateLimited,直到成功或者放弃时调用 Forget 方法停止限流器的跟踪,并最终调用 Done 完成出队
接口实现
让我们看一下基本队列的实现
type rateLimitingType struct {
// 延时队列
DelayingInterface
// 限流器
rateLimiter RateLimiter
}
限流队列在创建是需要指定限流器 RateLimiter,如下面的 NewRateLimitingQueue 函数需要传入一个 RateLimiter 的实现
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
rateLimiter: rateLimiter,
}
}
限流器
限流器的接口定义如下,使用 When 方法获取元素的等待时长并跟踪元素,处理后使用 Forget 取消跟踪
type RateLimiter interface {
// 获取元素需要等待多长时间才加入队列
When(item interface{}) time.Duration
// 停止跟踪元素
Forget(item interface{})
// 获取元素的失败次数,即加入队列的次数
NumRequeues(item interface{}) int
}
WorkQueue 中也提供了几种限流器的实现,有如下:
BucketRateLimiter 使用 Golang 标准库 time/rate 来实现,time/rate 是 Token Bucket(令牌桶) 来实现,桶做为缓冲区,令牌发放来控制速率
ItemExponentialFailureRateLimiter 延时时间随着元素跟踪次数呈指数增长
ItemFastSlowRateLimiter 在特定阈值次数内使用短的时间延迟,超过后使用长的时间
MaxOfRateLimiter 指定多个限流器,选择这些限流器中最大的延时时间
WithMaxWaitRateLimiter 添加一个最大的时间限制,限制使用的限流器的时间超过指定的延迟时间