k8S的工作队列代码解析和rust的实现(基础工作队列)
基础知识
在 Kubernetes(K8S)中,workqueue
是一个用于处理异步任务的工具,它提供了一种安全、高效的方式来管理并处理工作项。下面我们详细解读这段代码的工作原理。
主要结构和接口
Interface
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
这个接口定义了一个工作队列应具有的基本功能:
Add(item interface{})
:添加一个新的工作项。Len() int
:返回队列中的当前工作项数。Get() (item interface{}, shutdown bool)
:获取一个要处理的工作项。Done(item interface{})
:标记一个工作项为已完成。ShutDown()
:关闭队列,不再接受新工作项。ShutDownWithDrain()
:在所有工作项处理完毕后关闭队列。ShuttingDown() bool
:检查队列是否正在关闭。
FIFO队列存储过程
正常添加流程
通过 Add 方法往 FIFO 队列中分别插入 1、2、3 三个元素,此时队列中的 queue 和 dirty 分别存有 1、2、3 元素,processing为空;然后通过 Get 方法获取最先进入的1元素,此时1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被处理;最后,当我们处理完 1 元素时通过 Done 方法标记该元素已经被处理完成,此时将1元素从 processing中移除。
高并发下如何保证一个元素哪怕其被添加了多次,但也只会被处理一次?
元素在queue和dirty中还未放入processing:
在并发场景下,假设 goroutine A 通过Add方法将1元素到queue和dirty中;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在dirty中已经存在相同的元素,会直接返回。
元素在processing中正被处理:
在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 中并从queue和dirty中移除;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,而是仅添加到dirty中;在 goroutine A 通过 Done 方法标记1元素被处理完成并从processing删除后,检测到dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。
get时候队列空了:
当goroutine进行get时候,队列空了,则会让get的goroutine进行等待、阻塞。当队列有值的时候才会继续get。
队列关闭了:
当队列关闭了之后,get、add方法将不允许添加、获取数据。但正在处理的数据除外,可以设置一种策略等待正在处理的数据处理完成。
多线程管理
在 Go 语言中,sync.Cond
是一个用于协调并发操作的条件变量。它允许一个或多个 goroutine 等待某个条件发生变化,同时确保这些等待和通知操作是安全的。sync.Cond
通常与互斥锁 (sync.Mutex
) 一起使用。
sync.Cond
的主要方法
Wait()
Signal()
Broadcast()
sync.Cond
的用法
Wait()
Wait方法会使调用它的 goroutine 进入等待状态,直到收到
Signal或
Broadcast通知。调用
Wait前,必须持有与条件变量关联的互斥锁。调用
Wait后,会自动释放该锁,当
Wait` 返回时,goroutine 会重新获得该锁。
Signal()
Signal 方法会唤醒一个正在等待的 goroutine。如果没有 goroutine 在等待,
Signal` 不做任何操作。
Broadcast()
Broadcast` 方法会唤醒所有正在等待的 goroutine。
sync.Cond
的应用场景
sync.Cond` 适用于需要等待某个条件的并发场景,例如生产者-消费者模型、资源池等。
Rust实现
基础技术
堆、栈与Box
在 Rust 中,Box
是一个智能指针类型,用于在堆上分配内存并拥有该内存的所有权。Box
将值从栈上移动到堆上,并且当 Box
被丢弃时,堆上的值也会被自动清理。Box
主要用于以下情况:
- 在堆上分配大型数据结构:有时候我们不希望在栈上分配过大的数据结构,这时可以使用
Box
将其分配到堆上。 - 递归数据结构:由于 Rust 编译器需要在编译时知道类型的大小,递归数据结构(例如树或链表)无法在栈上存储,这时可以使用
Box
。 - 动态分发(Dynamic Dispatch):当需要使用运行时多态(例如
trait
对象)时,可以使用Box<dyn Trait>
。
在rust中的工作队列的实现中,Box
用于存储不同类型的工作项。这是通过将工作项封装在Box<dyn Any + Send + Sync>
中实现的,Any
是一个允许存储任意类型的特征,Send
和Sync
则确保这些类型在线程间传递和共享时是安全的。
高并发控制设计
Go 语言中的高并发控制
在 Go 中,sync.Mutex
和 sync.Cond
是高并发控制的基础工具:
sync.Mutex
:用于保护临界区,确保同一时间只有一个 goroutine 访问共享资源。sync.Cond
:条件变量,用于 goroutines 等待或通知某个条件发生变化。
**Go 语言中 **sync.Cond
的主要方法:
Wait()
Signal()
Broadcast()
Rust语言中的高并发控制
在 Rust 中,std::sync::Mutex
和 std::sync::Condvar
提供了和golang中类似的功能:
std::sync::Mutex
:用于保护临界区,确保同一时间只有一个线程访问共享资源。std::sync::Condvar
:条件变量,用于线程等待或通知某个条件发生变化。
**Rust 中 **Condvar
的主要方法:
wait
:和golang中的wait相同notify_one
notify_all
代码设计
结构体
struct Queue {
queue: Mutex<VecDeque<WorkItem>>,
dirty: Mutex<HashSet<usize>>,
processing: Mutex<HashSet<usize>>,
cond: Condvar,
shutting_down: Mutex<bool>,
}
- queue:标准的工作队列,是workqpueue的载体
- dirty:记录添加进去队列中,但未被处理的元素
- p
- cond:控制线程同步、高并发的rust组件
- shutting_down:用于记录是否这个队列被关闭了
Add方法设计
代码对比
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if _, exists := q.dirty[item]; exists {
return
}
q.dirty[item] = struct{}{}
if _, exists := q.processing[item]; !exists {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
** **这个方法的实现如下
fn add(&self, item: WorkItem) {
let key = Queue::get_item_key(&item);
// 如果队列已经关闭,则退出
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
println!("Queue shut down");
return;
}
}
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
dirty.insert(key);
drop(dirty);
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
}
}
方法解析
获取工作项的唯一键
let key = Queue::get_item_key(&item);
- **使用 **
Queue::get_item_key(&item)
方法获取工作项的唯一键。这个键用于标识工作项并在dirty
集合中跟踪它。
检查队列是否已经关闭
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
println!("Queue shut down");
return;
}
}
- **通过获取 **
shutting_down
锁来检查队列是否已经关闭。 - **如果队列已经关闭,打印 **
Queue shut down
并返回。 - **使用一个代码块来限制锁的持有范围,确保在检查完 **
shutting_down
状态后立即释放锁。
更新 dirty
集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
dirty.insert(key);
drop(dirty);
- **获取 **
dirty
集合的锁,检查工作项的键是否已经存在于dirty
集合中。如果键不存在于dirty
集合中,将键插入集合。 - **使用 **
drop(dirty)
手动释放锁,确保在插入键后立即释放锁。
更新 queue
队列并通知消费者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
- 获取
queue
队列的锁,将工作项添加到队列的末尾。 - 使用 drop(queue)手动释放锁,确保在添加工作项后立即释放锁。
- 调用
self.cond.notify_one()
通知等待的消费者有新的工作项可用。
get方法设计
代码对比
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
rust中的实现代码,这里实现了主体功能,省略了metrics的获取。
fn get(&self) -> Option<WorkItem> {
let mut item = None;
let mut queue = self.queue.lock().unwrap();
while queue.is_empty() {
// 使用一个块来释放 shutting_down 的锁
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
return None;
}
}
queue = self.cond.wait(queue).unwrap();
}
if !queue.is_empty() {
item = queue.pop_front();
}
if let Some(ref itm) = item {
let key = Queue::get_item_key(itm);
self.processing.lock().unwrap().insert(key);
self.dirty.lock().unwrap().remove(&key);
}
item
}
方法解析
定义返回值和获取队列锁
let mut item = None;
let mut queue = self.queue.lock().unwrap();
-
item
:定义一个变量作为最终的返回值,初始化为None
。 - 获取队列锁:获取
queue
的互斥锁,如果其他线程正在访问queue
,当前线程将会阻塞,直到锁可用。此时queue
被锁定,确保对queue
的操作是线程安全的。
等待非空队列或检查关闭状态
while queue.is_empty() {
// 使用一个块来释放 shutting_down 的锁
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
return None;
}
}
queue = self.cond.wait(queue).unwrap();
}
- 检查队列是否为空:如果
queue
为空,则进入while
循环。 - 检查关闭状态:使用一个代码块来获取
shutting_down
的互斥锁并检查其状态。如果shutting_down
为true
,说明队列正在关闭,返回None
退出方法。 - 等待条件变量:调用
self.cond.wait(queue).unwrap()
,使当前线程进入等待状态,直到条件变量收到通知。此时会释放queue
的锁,允许其他线程操作queue
。当被唤醒后,线程会重新获取queue
的锁。这里他会一直等待队列不为空,即有生产者入队消息。
从队列中获取工作项
if !queue.is_empty() {
item = queue.pop_front();
}
- 获取工作项:如果
queue
不为空,从队列前端弹出一个工作项并赋值给item
。
更新 processing
和 dirty
集合
if let Some(ref itm) = item {
let key = Queue::get_item_key(itm);
self.processing.lock().unwrap().insert(key);
self.dirty.lock().unwrap().remove(&key);
}
- 检查
item
是否存在:如果item
存在,获取其引用itm
。 - 获取工作项的键:使用
Queue::get_item_key(itm)
获取工作项的唯一键。 - 更新
processing
集合:获取processing
的互斥锁,并将键插入processing
集合中,表示该工作项正在处理。 - 更新
dirty
集合:获取dirty
的互斥锁,并将键从dirty
集合中移除,表示该工作项不再需要重新处理,因为他已经get到了。
返回工作项
item
- 返回值:返回
item
,这是一个可选类型,表示要处理的工作项。如果在整个过程中没有获取到工作项,则返回None
。
done方法
代码对比
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
}
}
rust中的实现如下:
fn done(&self, item: WorkItem) {
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
drop(dirty);
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
} else if self.processing.lock().unwrap().is_empty() {
self.cond.notify_all();
}
}
方法解析
获取工作项的唯一键并从 processing
集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
- 获取工作项的唯一键:调用
Queue::get_item_key(&item)
方法获取工作项的唯一键。这个键用于标识工作项。 - 从
processing
集合中移除:获取processing
的互斥锁,并从processing
集合中移除该键,表示该工作项的处理已经完成。
检查并更新 dirty
集合和 queue
队列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
drop(dirty);
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
}
- 获取
dirty
集合的锁:获取dirty
的互斥锁,确保对dirty
集合的操作是线程安全的。 - 检查
dirty
集合:如果dirty
集合包含该键,表示该工作项在处理过程中再次被标记为需要处理。 - 更新
queue
队列:将工作项重新添加到queue
队列中,以便后续处理。具体步骤如下:- 释放
dirty
锁:使用drop(dirty)
释放dirty
的锁。 - 获取
queue
锁:获取queue
的互斥锁,并将工作项添加到队列末尾。 - 释放
queue
锁:使用drop(queue)
释放queue
的锁。 - 通知等待的线程:调用
self.cond.notify_one()
,通知等待在条件变量上的消费者线程有新的工作项可用。
- 释放
检查 processing
集合并通知所有线程
else if self.processing.lock().unwrap().is_empty() {
self.cond.notify_all();
}
- 获取
processing
锁:获取processing
的互斥锁。 - 检查
processing
集合:如果processing
集合为空,表示当前没有正在处理的工作项。 - 通知所有线程:调用
self.cond.notify_all()
,通知所有等待在条件变量上的线程。这通常用于在队列即将关闭时确保所有工作项都已处理完毕。