1、主要结构
1.1、rte_ring
struct rte_ring {
char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
/* 标记,用来描述队列是单/多生产者还是单/多消费者安全 */
int flags;
/* 所属的memzone,memzone是dpdk内存管理底层的数据结构 */
const struct rte_memzone *memzone;
/* 队列长度,初始化时必须为 2^N幂,如果创建时 flag=RING_F_EXACT_SZ,size 向上取 2^N,否则 size必须是 2^N */
uint32_t size;
/* 掩码,为队列长 - 1,用来计算位置的时候取余用 */
uint32_t mask;
/* 队列容量 */
uint32_t capacity;
char pad0 __rte_cache_aligned; /**< empty cache line */
/* 包含 prod.head 和 prod.tail,
* prod.head 代表着下一次生产时的起始生产位置,
* prod.tail 代表着消费者可以消费的位置界限,到达 prod.tail 就无法消费,
* 通常生成完成后, prod.tail=prod.head,意味着刚生产的数据皆可被消费。
*/
struct rte_ring_headtail prod __rte_cache_aligned;
char pad1 __rte_cache_aligned; /**< empty cache line */
/* 包含 cons.head 和 cons.tail,
* cons.head 代表着下一次消费时的起始消费位置,
* cons.tail 代表着生产者可以生产位置的界限,到达 cons.tail 后就无法继续生产,
* 通常情况消费完成后,cons.tail=cons.head,表示刚消费的位置 皆可用于生产。
*/
struct rte_ring_headtail cons __rte_cache_aligned;
char pad2 __rte_cache_aligned; /**< empty cache line */
};
2、实现分析
2.1、入队
简称 入队三步骤
,主要操作包含:
-
移动
head
以预先占用空间, -
填充数据,
-
移动
tail
完成 入队。
/*
* @n :一次填充 n 个数据,bulk 或 burst 场景可能一次性填充多个。
* @behavior : RTE_RING_QUEUE_FIXED 表示 如果剩余可用空间 m < n,则返回失败
* : RTE_RING_QUEUE_VARIABLE 表示 如果剩余可用空间 m < n,则仅入队 m 个
* @is_sp :是否为单消费者
* @free_space:在入队之前 的 m 大小
* 返回给上层使用,用来判断是否达到 高水平线,到达后执行一个 pause 操作,参考 receive_stage 中 send_pause_frame。
*/
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned int n, enum rte_ring_queue_behavior behavior,
unsigned int is_sp, unsigned int *free_space)
{
uint32_t prod_head, prod_next;
uint32_t free_entries;
/* @prod_head :最初的 r.prod.head,针对多生产者 更新尾部时需要做比对用
* @prod_next :新的 r.prod.head,即当前生产者 占坑后 head位置
*/
n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
&prod_head, &prod_next, &free_entries);
if (n == 0)
goto end;
/* 填充数据,这里有几个有意思的地方:
* r[1] 表示 偏移掉 rte_ring 头部,即 rte_ring 存储数据开始的位置
* head 为 原始 r.prod.head,而 r.prod.head 代表下一个用于 生产的空间,所以这里正好用以本次存储,新的 r.prod.head 已经更新了
* 其中 环 的 实现参考 内核的 kfifo,见后面分析。
*/
ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
if (free_space != NULL)
*free_space = free_entries - n;
return n;
}
2.1.1、移动 head
这里是 普通版本,c11 版本会获取变量时 采用 atomic 函数。
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
const uint32_t capacity = r->capacity;
unsigned int max = n;
int success;
do {
n = max;
*old_head = r->prod.head; // 记录原始 head
// 内存屏障,目前还不是很理解,只知道 用来避免优化编译代码乱序问题,即优先保证 先获取 old_head,后面理解了再写一篇
rte_smp_rmb();
// 暂时理解相当于 capacity - (*old_head - r->cons.tail)
*free_entries = (capacity + r->cons.tail - *old_head);
if (unlikely(n > *free_entries))
n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries;
if (n == 0)
return 0;
*new_head = *old_head + n;
if (is_sp)
r->prod.head = *new_head, success = 1;
else
/* 比对 r->prod.head 和 old_head 指向的值是否相等,
* 相等 则 r->prod.head = *new_head,并返回 1,
* 不相等 则 r->prod.head 保持不变,并返回 0,
*
* 注意:这里 r->prod.head 做入参时类型为 volatile 类型,每次强制从内存读取。
*/
success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);
} while (unlikely(success == 0));
return n;
}
经过 head 移动结果如下图
2.1.2、多生产者竞争
当出现 多生产者 竞争时 会出现什么情况呢?
这里假设 prod1
先执行 new_head
偏移 并 先更新 r.prod.head
,
-
prod1 和 prod2 已经获取到
old_head
,prod1 和 prod2 更新 new_head1 和 new_head2,而-
prod1 先 通过
CAS
判断并修改r.prod.head
的值, -
此时 prod2 经CAS 校验必然不通过,success 值为1,循环继续(这也是循环体内(n=max赋值的意义所在,n在循环体内会被更改)。
-
-
prod2 经历第二次循环时,先获取 prod1 修改后的
r.prod.head
,并赋值给 old_head,在经过 CAS 经验时就可以通过了(假设没有更多生产者竞争)。
经过如上两步,出现结果:
-
prod1 和 prod2 均获取到 各自的 old_head1 和 old_head2(两个值不同),后面更新 tail 时会用到。
-
同时
r.prod.head
被偏移了两次,到达最终位置。
如果这里 prod2 在 prod1 已经完成 head 占坑 才进入 该入队函数,也就不存在更新 r.prod.head 时的竞争了。。。所以上面没说这种情况。
2.1.3、移动 tail
/* 这里要加注释说明一下 参数
* @old_val:初始的 r.prod.head 或 r.cons.head
* @new_val:更新后的 r.prod.head 或 r.cons.head
*/
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue)
{
if (enqueue)
rte_smp_wmb();
else
rte_smp_rmb();
if (!single)
while (unlikely(ht->tail != old_val))
rte_pause(); // 就是等待呗。。
ht->tail = new_val;
}
看这里比较简单,主要是 while 条件判断,针对 多生产者 模式,还是假设 prod1 先于 prod2 更新完成 r.prod.head,
-
此时
__rte_ring_move_prod_head
针对 prod1 和 prod2 有两个出参old_head1
和old_head2
。 -
old_head1 是满足 while 条件的,因为 在
入队的三步骤
开始之前r.prod.head
是等于r.prod.tail
的,然后其更新r.prod.tail = new_head
。 -
在 prod1 更新完 tail 后,prod2 的 while 条件也就满足了,也更新 tail 指针。
最终结果:r.prod.tail
和 r.prod.head
一致,且都指向 下一个用于存储数据的 空间。
还有一点需要注意: 在 调整 prod.tail 之前,消费者最多 消费到 prod.tail 的前一个 对象。
2.1.4、填充数据
几个有意思的地方:
-
因为
prod_head
或cons_head
是一直递增的 u32 的最大值,所以使用idx = prod_head & (r)->mask
执行 取余操作。 -
n & ((~(unsigned)0x3)))
相当于n / 4
。 -
不满足
idx + n < size
时,即到达队列尾部,需要分别存储到队列尾部
和队列头部
。 -
新版本 通过改变 obj_table 类型 减少copy次数 以优化性能,如:
const rte_int128_t *obj = (const rte_int128_t *)obj_table
。
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
unsigned int i; \
const uint32_t size = (r)->size; \
uint32_t idx = prod_head & (r)->mask; \
obj_type *ring = (obj_type *)ring_start; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
ring[idx] = obj_table[i]; \
ring[idx+1] = obj_table[i+1]; \
ring[idx+2] = obj_table[i+2]; \
ring[idx+3] = obj_table[i+3]; \
} \
switch (n & 0x3) { \
case 3: \
ring[idx++] = obj_table[i++]; /* fallthrough */ \
case 2: \
ring[idx++] = obj_table[i++]; /* fallthrough */ \
case 1: \
ring[idx++] = obj_table[i++]; \
} \
} else { \ // 看这里,出现反转
for (i = 0; idx < size; i++, idx++)\ // copy 到 ring 的后半段
ring[idx] = obj_table[i]; \
for (idx = 0; i < n; i++, idx++) \ // copy 到 ring 的前半段
ring[idx] = obj_table[i]; \
} \
} while (0)
出队和入队类似,这里不再赘述。
3、相关链接
https://blog.csdn.net/linyt/article/details/53355355