WorkQueue 是 Kubernetes 的一个队列实现,属于 client-go 库的 workqueue 包,其作为 K8S Controller 的重要组成部份,在资源变化时将资源写入 WorkQueue 中,再由具体的 worker 去消费执行
WorkQueue 具备如下基础特性:
- 有序性:队列的基本性质,先进先出,处理队列元素的顺序与加入队列的顺序一致
- 去重:同一时间不会重复处理相同的元素,如果一个元素在处理前被多次加入队列,也只会被处理一次
- 支持多生成者与多消费者
- 通知机制:队列关闭时会以信号量的方式发出通知
workqueue 除了基本的队列类型之外,还扩展提供了另外两种队列:
- 延迟队列(delaying queue),支持延迟指定的时间后再入队列
- 限时队列(rate limiting queue),支持按指定的限速算法控制元素加入队列的时间
本文主要描述 workqueue 基本队列的实现
基本队列
接口声明
基本队列的接口声明
type Interface interface {
// 入队, 元素加入队尾
Add(item interface{})
// 当前队列长度
Len() int
// 获取队头元素
Get() (item interface{}, shutdown bool)
// 标记为完成
Done(item interface{})
// 关闭队列(立即停止)
ShutDown()
// 关闭队列(优雅关闭,等待元素处理完成)
ShutDownWithDrain()
// 队列是否关闭中
ShuttingDown() bool
}
生产者通过 Add 添加元素到队列中,消费者先使用 Get 获取要处理的元素,处理结束后调用 Done 标记其为完成,将元素出队。
同时还有其他的 Len 方法和关闭相关的方法
接口实现
让我们看一下基本队列的实现
type Type struct {
// 队列的主体结构 ,确保了有序
queue []t
// 需要处理的元素集合,用以去重
dirty set
// 正在处理中的元素集合
processing set
...
}
type empty struct{}
type t interface{}
type set map[t]empty
上面展示了其核心的字段,主要就是将元素加入队列 queue 中,并加入到 dirty 的集合中方便检查去重,而通过 Get 方法获取的元素则将其从 queue 提出,并加入到 processing 集合中