searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

k8S的工作队列代码解析和rust的实现(基础工作队列)

2024-09-04 09:42:29
8
0

k8S的工作队列代码解析和rust的实现(基础工作队列)

基础知识

在 Kubernetes(K8S)中,workqueue 是一个用于处理异步任务的工具,它提供了一种安全、高效的方式来管理并处理工作项。下面我们详细解读这段代码的工作原理。

主要结构和接口

Interface

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

这个接口定义了一个工作队列应具有的基本功能:

  • Add(item interface{}):添加一个新的工作项。
  • Len() int:返回队列中的当前工作项数。
  • Get() (item interface{}, shutdown bool):获取一个要处理的工作项。
  • Done(item interface{}):标记一个工作项为已完成。
  • ShutDown():关闭队列,不再接受新工作项。
  • ShutDownWithDrain():在所有工作项处理完毕后关闭队列。
  • ShuttingDown() bool:检查队列是否正在关闭。

FIFO队列存储过程

正常添加流程

通过 Add 方法往 FIFO 队列中分别插入 1、2、3 三个元素,此时队列中的 queue 和 dirty 分别存有 1、2、3 元素,processing为空;然后通过 Get 方法获取最先进入的1元素,此时1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被处理;最后,当我们处理完 1 元素时通过 Done 方法标记该元素已经被处理完成,此时将1元素从 processing中移除。

高并发下如何保证一个元素哪怕其被添加了多次,但也只会被处理一次?

元素在queue和dirty中还未放入processing:

在并发场景下,假设 goroutine A 通过Add方法将1元素到queue和dirty中;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在dirty中已经存在相同的元素,会直接返回。

元素在processing中正被处理:

在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 中并从queue和dirty中移除;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,而是仅添加到dirty中;在 goroutine A 通过 Done 方法标记1元素被处理完成并从processing删除后,检测到dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。

get时候队列空了:

当goroutine进行get时候,队列空了,则会让get的goroutine进行等待、阻塞。当队列有值的时候才会继续get。

队列关闭了:

当队列关闭了之后,get、add方法将不允许添加、获取数据。但正在处理的数据除外,可以设置一种策略等待正在处理的数据处理完成。

多线程管理

在 Go 语言中,sync.Cond 是一个用于协调并发操作的条件变量。它允许一个或多个 goroutine 等待某个条件发生变化,同时确保这些等待和通知操作是安全的。sync.Cond 通常与互斥锁 (sync.Mutex) 一起使用。

sync.Cond 的主要方法

  • Wait()
  • Signal()
  • Broadcast()

sync.Cond 的用法

Wait()

Wait方法会使调用它的 goroutine 进入等待状态,直到收到SignalBroadcast通知。调用Wait前,必须持有与条件变量关联的互斥锁。调用Wait后,会自动释放该锁,当Wait` 返回时,goroutine 会重新获得该锁。

Signal()

Signal 方法会唤醒一个正在等待的 goroutine。如果没有 goroutine 在等待,Signal` 不做任何操作。

Broadcast()

Broadcast` 方法会唤醒所有正在等待的 goroutine。

sync.Cond 的应用场景

sync.Cond` 适用于需要等待某个条件的并发场景,例如生产者-消费者模型、资源池等。

Rust实现

基础技术

堆、栈与Box

在 Rust 中,Box 是一个智能指针类型,用于在堆上分配内存并拥有该内存的所有权。Box 将值从栈上移动到堆上,并且当 Box 被丢弃时,堆上的值也会被自动清理。Box 主要用于以下情况:

  1. 在堆上分配大型数据结构​:有时候我们不希望在栈上分配过大的数据结构,这时可以使用 Box 将其分配到堆上。
  2. 递归数据结构​:由于 Rust 编译器需要在编译时知道类型的大小,递归数据结构(例如树或链表)无法在栈上存储,这时可以使用 Box
  3. 动态分发(Dynamic Dispatch)​:当需要使用运行时多态(例如 trait 对象)时,可以使用 Box<dyn Trait>
    在rust中的工作队列的实现中,Box 用于存储不同类型的工作项。这是通过将工作项封装在 Box<dyn Any + Send + Sync> 中实现的,Any 是一个允许存储任意类型的特征,SendSync 则确保这些类型在线程间传递和共享时是安全的。

高并发控制设计

Go 语言中的高并发控制

​在 Go 中,​sync.Mutexsync.Cond 是高并发控制的基础工具:

  • sync.Mutex:用于保护临界区,确保同一时间只有一个 goroutine 访问共享资源。
  • sync.Cond:条件变量,用于 goroutines 等待或通知某个条件发生变化。

**Go 语言中 **sync.Cond 的主要方法:

  • Wait()
  • Signal()
  • Broadcast()
Rust语言中的高并发控制

​在 Rust 中,​std::sync::Mutexstd::sync::Condvar 提供了和golang中类似的功能:

  • std::sync::Mutex:用于保护临界区,确保同一时间只有一个线程访问共享资源。
  • std::sync::Condvar:条件变量,用于线程等待或通知某个条件发生变化。

**Rust 中 **Condvar 的主要方法:

  • wait:和golang中的wait相同
  • notify_one
  • notify_all

代码设计

结构体

struct Queue {
    queue: Mutex<VecDeque<WorkItem>>,
    dirty: Mutex<HashSet<usize>>,
    processing: Mutex<HashSet<usize>>,
    cond: Condvar,
    shutting_down: Mutex<bool>,
}
  • queue:标准的工作队列,是workqpueue的载体
  • dirty:记录添加进去队列中,但未被处理的元素​
  • p
  • cond:控制线程同步、高并发的rust组件
  • shutting_down:用于记录是否这个队列被关闭了

Add方法设计

代码对比
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if _, exists := q.dirty[item]; exists {
        return
    }
    q.dirty[item] = struct{}{}
    if _, exists := q.processing[item]; !exists {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

** **这个方法的实现如下

fn add(&self, item: WorkItem) {
        let key = Queue::get_item_key(&item);
​
        // 如果队列已经关闭,则退出
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                println!("Queue shut down");
                return;
            }
        }
​
        let mut dirty = self.dirty.lock().unwrap();
        if !dirty.contains(&key) {
            dirty.insert(key);
            drop(dirty);
​
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(item);
            drop(queue);
​
            self.cond.notify_one();
        }
    }
方法解析
获取工作项的唯一键
let key = Queue::get_item_key(&item);
  • **使用 **Queue::get_item_key(&item) 方法获取工作项的唯一键。这个键用于标识工作项并在 dirty 集合中跟踪它。
检查队列是否已经关闭
{
    let shutting_down = self.shutting_down.lock().unwrap();
    if *shutting_down {
        println!("Queue shut down");
        return;
    }
}
  • **通过获取 **shutting_down 锁来检查队列是否已经关闭。
  • **如果队列已经关闭,打印 **Queue shut down 并返回。
  • **使用一个代码块来限制锁的持有范围,确保在检查完 **shutting_down 状态后立即释放锁。
更新 dirty 集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
    dirty.insert(key);
    drop(dirty);
  • **获取 **dirty 集合的锁,检查工作项的键是否已经存在于 dirty 集合中。如果键不存在于 dirty 集合中,将键插入集合。
  • **使用 **drop(dirty) 手动释放锁,确保在插入键后立即释放锁。
更新 queue 队列并通知消费者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);

self.cond.notify_one();
  • 获取 queue 队列的锁,将工作项添加到队列的末尾。
  • 使用 drop(queue)手动释放锁,确保在添加工作项后立即释放锁。
  • 调用 self.cond.notify_one() 通知等待的消费者有新的工作项可用。

get方法设计

代码对比
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

rust中的实现代码,这里实现了主体功能,省略了metrics的获取。

fn get(&self) -> Option<WorkItem> {
    let mut item = None;
    let mut queue = self.queue.lock().unwrap();
    while queue.is_empty() {
        // 使用一个块来释放 shutting_down 的锁
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                return None;
            }
        }

        queue = self.cond.wait(queue).unwrap();
    }
    if !queue.is_empty() {
        item = queue.pop_front();
    }

    if let Some(ref itm) = item {
        let key = Queue::get_item_key(itm);
        self.processing.lock().unwrap().insert(key);
        self.dirty.lock().unwrap().remove(&key);
    }

    item
}
方法解析
定义返回值和获取队列锁
let mut item = None;
let mut queue = self.queue.lock().unwrap();
  • item:定义一个变量作为最终的返回值,初始化为 None
  • 获取队列锁​:获取 queue 的互斥锁,如果其他线程正在访问 queue,当前线程将会阻塞,直到锁可用。此时 queue 被锁定,确保对 queue 的操作是线程安全的。
等待非空队列或检查关闭状态
while queue.is_empty() {
    // 使用一个块来释放 shutting_down 的锁
    {
        let shutting_down = self.shutting_down.lock().unwrap();
        if *shutting_down {
            return None;
        }
    }

    queue = self.cond.wait(queue).unwrap();
}
  • 检查队列是否为空​:如果 queue 为空,则进入 while 循环。
  • 检查关闭状态​:使用一个代码块来获取 shutting_down 的互斥锁并检查其状态。如果 shutting_downtrue,说明队列正在关闭,返回 None 退出方法。
  • 等待条件变量​:调用 self.cond.wait(queue).unwrap(),使当前线程进入等待状态,直到条件变量收到通知。此时会释放 queue 的锁,允许其他线程操作 queue。当被唤醒后,线程会重新获取 queue 的锁。这里他会一直等待队列不为空,即有生产者入队消息。
从队列中获取工作项
if !queue.is_empty() {
    item = queue.pop_front();
}
  • 获取工作项​:如果 queue 不为空,从队列前端弹出一个工作项并赋值给 item
更新 processingdirty 集合
if let Some(ref itm) = item {
    let key = Queue::get_item_key(itm);
    self.processing.lock().unwrap().insert(key);
    self.dirty.lock().unwrap().remove(&key);
}
  • 检查 item 是否存在​:如果 item 存在,获取其引用 itm
  • 获取工作项的键​:使用 Queue::get_item_key(itm) 获取工作项的唯一键。
  • 更新 processing 集合​:获取 processing 的互斥锁,并将键插入 processing 集合中,表示该工作项正在处理。
  • 更新 dirty 集合​:获取 dirty 的互斥锁,并将键从 dirty 集合中移除,表示该工作项不再需要重新处理,因为他已经get到了。
返回工作项
item
  • 返回值​:返回 item,这是一个可选类型,表示要处理的工作项。如果在整个过程中没有获取到工作项,则返回 None

done方法

代码对比
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}

rust中的实现如下:

fn done(&self, item: WorkItem) {
    let key = Queue::get_item_key(&item);
    self.processing.lock().unwrap().remove(&key);

    let mut dirty = self.dirty.lock().unwrap();
    if dirty.contains(&key) {
        drop(dirty);
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
        drop(queue);

        self.cond.notify_one();
    } else if self.processing.lock().unwrap().is_empty() {
        self.cond.notify_all();
    }
}
方法解析
获取工作项的唯一键并从 processing 集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
  • 获取工作项的唯一键​:调用 Queue::get_item_key(&item) 方法获取工作项的唯一键。这个键用于标识工作项。
  • processing 集合中移除​:获取 processing 的互斥锁,并从 processing 集合中移除该键,表示该工作项的处理已经完成。
检查并更新 dirty 集合和 queue 队列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
    drop(dirty);
    let mut queue = self.queue.lock().unwrap();
    queue.push_back(item);
    drop(queue);

    self.cond.notify_one();
}
  • 获取 dirty 集合的锁​:获取 dirty 的互斥锁,确保对 dirty 集合的操作是线程安全的。
  • 检查 dirty 集合​:如果 dirty 集合包含该键,表示该工作项在处理过程中再次被标记为需要处理。
  • 更新 queue 队列:将工作项重新添加到 queue 队列中,以便后续处理。具体步骤如下:
    • 释放 dirty​:使用 drop(dirty) 释放 dirty 的锁。
    • 获取 queue​:获取 queue 的互斥锁,并将工作项添加到队列末尾。
    • 释放 queue​:使用 drop(queue) 释放 queue 的锁。
    • 通知等待的线程​:调用 self.cond.notify_one(),通知等待在条件变量上的消费者线程有新的工作项可用。
检查 processing 集合并通知所有线程
else if self.processing.lock().unwrap().is_empty() {
    self.cond.notify_all();
}
  • 获取 processing​:获取 processing 的互斥锁。
  • 检查 processing 集合​:如果 processing 集合为空,表示当前没有正在处理的工作项。
  • 通知所有线程​:调用 self.cond.notify_all(),通知所有等待在条件变量上的线程。这通常用于在队列即将关闭时确保所有工作项都已处理完毕。
0条评论
0 / 1000
l****n
17文章数
0粉丝数
l****n
17 文章 | 0 粉丝
原创

k8S的工作队列代码解析和rust的实现(基础工作队列)

2024-09-04 09:42:29
8
0

k8S的工作队列代码解析和rust的实现(基础工作队列)

基础知识

在 Kubernetes(K8S)中,workqueue 是一个用于处理异步任务的工具,它提供了一种安全、高效的方式来管理并处理工作项。下面我们详细解读这段代码的工作原理。

主要结构和接口

Interface

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

这个接口定义了一个工作队列应具有的基本功能:

  • Add(item interface{}):添加一个新的工作项。
  • Len() int:返回队列中的当前工作项数。
  • Get() (item interface{}, shutdown bool):获取一个要处理的工作项。
  • Done(item interface{}):标记一个工作项为已完成。
  • ShutDown():关闭队列,不再接受新工作项。
  • ShutDownWithDrain():在所有工作项处理完毕后关闭队列。
  • ShuttingDown() bool:检查队列是否正在关闭。

FIFO队列存储过程

正常添加流程

通过 Add 方法往 FIFO 队列中分别插入 1、2、3 三个元素,此时队列中的 queue 和 dirty 分别存有 1、2、3 元素,processing为空;然后通过 Get 方法获取最先进入的1元素,此时1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被处理;最后,当我们处理完 1 元素时通过 Done 方法标记该元素已经被处理完成,此时将1元素从 processing中移除。

高并发下如何保证一个元素哪怕其被添加了多次,但也只会被处理一次?

元素在queue和dirty中还未放入processing:

在并发场景下,假设 goroutine A 通过Add方法将1元素到queue和dirty中;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在dirty中已经存在相同的元素,会直接返回。

元素在processing中正被处理:

在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 中并从queue和dirty中移除;同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,而是仅添加到dirty中;在 goroutine A 通过 Done 方法标记1元素被处理完成并从processing删除后,检测到dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。

get时候队列空了:

当goroutine进行get时候,队列空了,则会让get的goroutine进行等待、阻塞。当队列有值的时候才会继续get。

队列关闭了:

当队列关闭了之后,get、add方法将不允许添加、获取数据。但正在处理的数据除外,可以设置一种策略等待正在处理的数据处理完成。

多线程管理

在 Go 语言中,sync.Cond 是一个用于协调并发操作的条件变量。它允许一个或多个 goroutine 等待某个条件发生变化,同时确保这些等待和通知操作是安全的。sync.Cond 通常与互斥锁 (sync.Mutex) 一起使用。

sync.Cond 的主要方法

  • Wait()
  • Signal()
  • Broadcast()

sync.Cond 的用法

Wait()

Wait方法会使调用它的 goroutine 进入等待状态,直到收到SignalBroadcast通知。调用Wait前,必须持有与条件变量关联的互斥锁。调用Wait后,会自动释放该锁,当Wait` 返回时,goroutine 会重新获得该锁。

Signal()

Signal 方法会唤醒一个正在等待的 goroutine。如果没有 goroutine 在等待,Signal` 不做任何操作。

Broadcast()

Broadcast` 方法会唤醒所有正在等待的 goroutine。

sync.Cond 的应用场景

sync.Cond` 适用于需要等待某个条件的并发场景,例如生产者-消费者模型、资源池等。

Rust实现

基础技术

堆、栈与Box

在 Rust 中,Box 是一个智能指针类型,用于在堆上分配内存并拥有该内存的所有权。Box 将值从栈上移动到堆上,并且当 Box 被丢弃时,堆上的值也会被自动清理。Box 主要用于以下情况:

  1. 在堆上分配大型数据结构​:有时候我们不希望在栈上分配过大的数据结构,这时可以使用 Box 将其分配到堆上。
  2. 递归数据结构​:由于 Rust 编译器需要在编译时知道类型的大小,递归数据结构(例如树或链表)无法在栈上存储,这时可以使用 Box
  3. 动态分发(Dynamic Dispatch)​:当需要使用运行时多态(例如 trait 对象)时,可以使用 Box<dyn Trait>
    在rust中的工作队列的实现中,Box 用于存储不同类型的工作项。这是通过将工作项封装在 Box<dyn Any + Send + Sync> 中实现的,Any 是一个允许存储任意类型的特征,SendSync 则确保这些类型在线程间传递和共享时是安全的。

高并发控制设计

Go 语言中的高并发控制

​在 Go 中,​sync.Mutexsync.Cond 是高并发控制的基础工具:

  • sync.Mutex:用于保护临界区,确保同一时间只有一个 goroutine 访问共享资源。
  • sync.Cond:条件变量,用于 goroutines 等待或通知某个条件发生变化。

**Go 语言中 **sync.Cond 的主要方法:

  • Wait()
  • Signal()
  • Broadcast()
Rust语言中的高并发控制

​在 Rust 中,​std::sync::Mutexstd::sync::Condvar 提供了和golang中类似的功能:

  • std::sync::Mutex:用于保护临界区,确保同一时间只有一个线程访问共享资源。
  • std::sync::Condvar:条件变量,用于线程等待或通知某个条件发生变化。

**Rust 中 **Condvar 的主要方法:

  • wait:和golang中的wait相同
  • notify_one
  • notify_all

代码设计

结构体

struct Queue {
    queue: Mutex<VecDeque<WorkItem>>,
    dirty: Mutex<HashSet<usize>>,
    processing: Mutex<HashSet<usize>>,
    cond: Condvar,
    shutting_down: Mutex<bool>,
}
  • queue:标准的工作队列,是workqpueue的载体
  • dirty:记录添加进去队列中,但未被处理的元素​
  • p
  • cond:控制线程同步、高并发的rust组件
  • shutting_down:用于记录是否这个队列被关闭了

Add方法设计

代码对比
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if _, exists := q.dirty[item]; exists {
        return
    }
    q.dirty[item] = struct{}{}
    if _, exists := q.processing[item]; !exists {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

** **这个方法的实现如下

fn add(&self, item: WorkItem) {
        let key = Queue::get_item_key(&item);
​
        // 如果队列已经关闭,则退出
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                println!("Queue shut down");
                return;
            }
        }
​
        let mut dirty = self.dirty.lock().unwrap();
        if !dirty.contains(&key) {
            dirty.insert(key);
            drop(dirty);
​
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(item);
            drop(queue);
​
            self.cond.notify_one();
        }
    }
方法解析
获取工作项的唯一键
let key = Queue::get_item_key(&item);
  • **使用 **Queue::get_item_key(&item) 方法获取工作项的唯一键。这个键用于标识工作项并在 dirty 集合中跟踪它。
检查队列是否已经关闭
{
    let shutting_down = self.shutting_down.lock().unwrap();
    if *shutting_down {
        println!("Queue shut down");
        return;
    }
}
  • **通过获取 **shutting_down 锁来检查队列是否已经关闭。
  • **如果队列已经关闭,打印 **Queue shut down 并返回。
  • **使用一个代码块来限制锁的持有范围,确保在检查完 **shutting_down 状态后立即释放锁。
更新 dirty 集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
    dirty.insert(key);
    drop(dirty);
  • **获取 **dirty 集合的锁,检查工作项的键是否已经存在于 dirty 集合中。如果键不存在于 dirty 集合中,将键插入集合。
  • **使用 **drop(dirty) 手动释放锁,确保在插入键后立即释放锁。
更新 queue 队列并通知消费者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);

self.cond.notify_one();
  • 获取 queue 队列的锁,将工作项添加到队列的末尾。
  • 使用 drop(queue)手动释放锁,确保在添加工作项后立即释放锁。
  • 调用 self.cond.notify_one() 通知等待的消费者有新的工作项可用。

get方法设计

代码对比
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

rust中的实现代码,这里实现了主体功能,省略了metrics的获取。

fn get(&self) -> Option<WorkItem> {
    let mut item = None;
    let mut queue = self.queue.lock().unwrap();
    while queue.is_empty() {
        // 使用一个块来释放 shutting_down 的锁
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                return None;
            }
        }

        queue = self.cond.wait(queue).unwrap();
    }
    if !queue.is_empty() {
        item = queue.pop_front();
    }

    if let Some(ref itm) = item {
        let key = Queue::get_item_key(itm);
        self.processing.lock().unwrap().insert(key);
        self.dirty.lock().unwrap().remove(&key);
    }

    item
}
方法解析
定义返回值和获取队列锁
let mut item = None;
let mut queue = self.queue.lock().unwrap();
  • item:定义一个变量作为最终的返回值,初始化为 None
  • 获取队列锁​:获取 queue 的互斥锁,如果其他线程正在访问 queue,当前线程将会阻塞,直到锁可用。此时 queue 被锁定,确保对 queue 的操作是线程安全的。
等待非空队列或检查关闭状态
while queue.is_empty() {
    // 使用一个块来释放 shutting_down 的锁
    {
        let shutting_down = self.shutting_down.lock().unwrap();
        if *shutting_down {
            return None;
        }
    }

    queue = self.cond.wait(queue).unwrap();
}
  • 检查队列是否为空​:如果 queue 为空,则进入 while 循环。
  • 检查关闭状态​:使用一个代码块来获取 shutting_down 的互斥锁并检查其状态。如果 shutting_downtrue,说明队列正在关闭,返回 None 退出方法。
  • 等待条件变量​:调用 self.cond.wait(queue).unwrap(),使当前线程进入等待状态,直到条件变量收到通知。此时会释放 queue 的锁,允许其他线程操作 queue。当被唤醒后,线程会重新获取 queue 的锁。这里他会一直等待队列不为空,即有生产者入队消息。
从队列中获取工作项
if !queue.is_empty() {
    item = queue.pop_front();
}
  • 获取工作项​:如果 queue 不为空,从队列前端弹出一个工作项并赋值给 item
更新 processingdirty 集合
if let Some(ref itm) = item {
    let key = Queue::get_item_key(itm);
    self.processing.lock().unwrap().insert(key);
    self.dirty.lock().unwrap().remove(&key);
}
  • 检查 item 是否存在​:如果 item 存在,获取其引用 itm
  • 获取工作项的键​:使用 Queue::get_item_key(itm) 获取工作项的唯一键。
  • 更新 processing 集合​:获取 processing 的互斥锁,并将键插入 processing 集合中,表示该工作项正在处理。
  • 更新 dirty 集合​:获取 dirty 的互斥锁,并将键从 dirty 集合中移除,表示该工作项不再需要重新处理,因为他已经get到了。
返回工作项
item
  • 返回值​:返回 item,这是一个可选类型,表示要处理的工作项。如果在整个过程中没有获取到工作项,则返回 None

done方法

代码对比
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}

rust中的实现如下:

fn done(&self, item: WorkItem) {
    let key = Queue::get_item_key(&item);
    self.processing.lock().unwrap().remove(&key);

    let mut dirty = self.dirty.lock().unwrap();
    if dirty.contains(&key) {
        drop(dirty);
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
        drop(queue);

        self.cond.notify_one();
    } else if self.processing.lock().unwrap().is_empty() {
        self.cond.notify_all();
    }
}
方法解析
获取工作项的唯一键并从 processing 集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
  • 获取工作项的唯一键​:调用 Queue::get_item_key(&item) 方法获取工作项的唯一键。这个键用于标识工作项。
  • processing 集合中移除​:获取 processing 的互斥锁,并从 processing 集合中移除该键,表示该工作项的处理已经完成。
检查并更新 dirty 集合和 queue 队列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
    drop(dirty);
    let mut queue = self.queue.lock().unwrap();
    queue.push_back(item);
    drop(queue);

    self.cond.notify_one();
}
  • 获取 dirty 集合的锁​:获取 dirty 的互斥锁,确保对 dirty 集合的操作是线程安全的。
  • 检查 dirty 集合​:如果 dirty 集合包含该键,表示该工作项在处理过程中再次被标记为需要处理。
  • 更新 queue 队列:将工作项重新添加到 queue 队列中,以便后续处理。具体步骤如下:
    • 释放 dirty​:使用 drop(dirty) 释放 dirty 的锁。
    • 获取 queue​:获取 queue 的互斥锁,并将工作项添加到队列末尾。
    • 释放 queue​:使用 drop(queue) 释放 queue 的锁。
    • 通知等待的线程​:调用 self.cond.notify_one(),通知等待在条件变量上的消费者线程有新的工作项可用。
检查 processing 集合并通知所有线程
else if self.processing.lock().unwrap().is_empty() {
    self.cond.notify_all();
}
  • 获取 processing​:获取 processing 的互斥锁。
  • 检查 processing 集合​:如果 processing 集合为空,表示当前没有正在处理的工作项。
  • 通知所有线程​:调用 self.cond.notify_all(),通知所有等待在条件变量上的线程。这通常用于在队列即将关闭时确保所有工作项都已处理完毕。
文章来自个人专栏
rust与golang等并发编程
10 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0