searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

rte_ring结构分析

2023-05-26 03:02:29
95
0

1、主要结构

1.1、rte_ring

struct rte_ring {
	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
	
    /* 标记,用来描述队列是单/多生产者还是单/多消费者安全 */
    int flags;
    
    /* 所属的memzone,memzone是dpdk内存管理底层的数据结构 */
	const struct rte_memzone *memzone;
    
    /* 队列长度,初始化时必须为 2^N幂,如果创建时 flag=RING_F_EXACT_SZ,size 向上取 2^N,否则 size必须是 2^N */
	uint32_t size;
    
    /*  掩码,为队列长 - 1,用来计算位置的时候取余用 */
	uint32_t mask;
    
    /* 队列容量 */
	uint32_t capacity;
	char pad0 __rte_cache_aligned; /**< empty cache line */

	/* 包含 prod.head 和 prod.tail,
	 * prod.head 代表着下一次生产时的起始生产位置,
	 * prod.tail 代表着消费者可以消费的位置界限,到达 prod.tail 就无法消费,
	 * 通常生成完成后, prod.tail=prod.head,意味着刚生产的数据皆可被消费。
	*/
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/* 包含 cons.head 和 cons.tail,
	 * cons.head 代表着下一次消费时的起始消费位置,
	 * cons.tail 代表着生产者可以生产位置的界限,到达 cons.tail 后就无法继续生产,
	 * 通常情况消费完成后,cons.tail=cons.head,表示刚消费的位置 皆可用于生产。
	*/
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};

2、实现分析

2.1、入队

简称 入队三步骤,主要操作包含:

  • 移动 head 以预先占用空间,

  • 填充数据,

  • 移动 tail 完成 入队。

/* 
 * @n        :一次填充 n 个数据,bulk 或 burst 场景可能一次性填充多个。
 * @behavior : RTE_RING_QUEUE_FIXED    表示 如果剩余可用空间 m < n,则返回失败
 *           : RTE_RING_QUEUE_VARIABLE 表示 如果剩余可用空间 m < n,则仅入队 m 个
 * @is_sp    :是否为单消费者
 * @free_space:在入队之前 的 m 大小
 *              返回给上层使用,用来判断是否达到 高水平线,到达后执行一个 pause 操作,参考 receive_stage 中 send_pause_frame。
*/
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
		 unsigned int n, enum rte_ring_queue_behavior behavior,
		 unsigned int is_sp, unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

    /* @prod_head :最初的 r.prod.head,针对多生产者 更新尾部时需要做比对用
     * @prod_next :新的   r.prod.head,即当前生产者 占坑后 head位置
    */
	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

    /* 填充数据,这里有几个有意思的地方:
     * r[1] 表示 偏移掉 rte_ring 头部,即 rte_ring 存储数据开始的位置
     * head 为 原始 r.prod.head,而 r.prod.head 代表下一个用于 生产的空间,所以这里正好用以本次存储,新的 r.prod.head 已经更新了
     * 其中 环 的 实现参考 内核的 kfifo,见后面分析。
    */
	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

2.1.1、移动 head

这里是 普通版本,c11 版本会获取变量时 采用 atomic 函数。

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	do {
		n = max;
		*old_head = r->prod.head; // 记录原始 head

		// 内存屏障,目前还不是很理解,只知道 用来避免优化编译代码乱序问题,即优先保证 先获取 old_head,后面理解了再写一篇
		rte_smp_rmb(); 

		// 暂时理解相当于 capacity - (*old_head - r->cons.tail)
		*free_entries = (capacity + r->cons.tail - *old_head);

		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
            /* 比对 r->prod.head 和 old_head 指向的值是否相等,
             * 相等   则 r->prod.head = *new_head,并返回 1,
             * 不相等 则 r->prod.head 保持不变,并返回 0,
             *
             * 注意:这里 r->prod.head 做入参时类型为 volatile 类型,每次强制从内存读取。
            */
			success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}

经过 head 移动结果如下图

2.1.2、多生产者竞

当出现 多生产者 竞争时 会出现什么情况呢?

这里假设 prod1 先执行 new_head 偏移 并 先更新 r.prod.head

  • prod1 和 prod2 已经获取到 old_head,prod1 和 prod2 更新 new_head1 和 new_head2,而

    • prod1 先 通过 CAS 判断并修改 r.prod.head 的值

    • 此时 prod2 经CAS 校验必然不通过,success 值为1,循环继续(这也是循环体内(n=max赋值的意义所在,n在循环体内会被更改)。

  • prod2 经历第二次循环时,先获取 prod1 修改后的 r.prod.head,并赋值给 old_head,在经过 CAS 经验时就可以通过了(假设没有更多生产者竞争)。

经过如上两步,出现结果:

  •  prod1 和 prod2 均获取到 各自的 old_head1 和 old_head2(两个值不同),后面更新 tail 时会用到。

  • 同时 r.prod.head 被偏移了两次,到达最终位置。

如果这里 prod2 在 prod1 已经完成 head 占坑 才进入 该入队函数,也就不存在更新 r.prod.head 时的竞争了。。。所以上面没说这种情况。

2.1.3、移动 tail

/* 这里要加注释说明一下 参数
 * @old_val:初始的   r.prod.head 或 r.cons.head
 * @new_val:更新后的 r.prod.head 或 r.cons.head
*/
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue)
{
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();
	
    
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause(); // 就是等待呗。。

	ht->tail = new_val;
}

看这里比较简单,主要是 while 条件判断,针对 多生产者 模式,还是假设 prod1 先于 prod2 更新完成 r.prod.head,

  • 此时 __rte_ring_move_prod_head 针对 prod1 和 prod2 有两个出参 old_head1old_head2

  • old_head1 是满足 while 条件的,因为 在 入队的三步骤 开始之前 r.prod.head 是等于 r.prod.tail 的,然后其更新 r.prod.tail = new_head

  • 在 prod1 更新完 tail 后,prod2 的 while 条件也就满足了,也更新 tail 指针。

最终结果:r.prod.tailr.prod.head 一致,且都指向 下一个用于存储数据的 空间

 

还有一点需要注意: 在 调整 prod.tail 之前,消费者最多 消费到 prod.tail 的前一个 对象。

2.1.4、填充数据

几个有意思的地方:

  • 因为 prod_headcons_head 是一直递增的 u32 的最大值,所以使用 idx = prod_head & (r)->mask 执行 取余操作。

  • n & ((~(unsigned)0x3))) 相当于 n / 4

  • 不满足 idx + n < size 时,即到达队列尾部,需要分别存储到 队列尾部队列头部

  • 新版本 通过改变 obj_table 类型 减少copy次数 以优化性能,如:const rte_int128_t *obj = (const rte_int128_t *)obj_table

#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	const uint32_t size = (r)->size; \
	uint32_t idx = prod_head & (r)->mask; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
			ring[idx] = obj_table[i]; \
			ring[idx+1] = obj_table[i+1]; \
			ring[idx+2] = obj_table[i+2]; \
			ring[idx+3] = obj_table[i+3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 2: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 1: \
			ring[idx++] = obj_table[i++]; \
		} \
	} else { \ // 看这里,出现反转
		for (i = 0; idx < size; i++, idx++)\ // copy 到 ring 的后半段
			ring[idx] = obj_table[i]; \
		for (idx = 0; i < n; i++, idx++) \   // copy 到 ring 的前半段
			ring[idx] = obj_table[i]; \
	} \
} while (0)

出队和入队类似,这里不再赘述。

3、相关链接

https://blog.csdn.net/linyt/article/details/53355355

 

0条评论
0 / 1000
saikor
1文章数
1粉丝数
saikor
1 文章 | 1 粉丝
saikor
1文章数
1粉丝数
saikor
1 文章 | 1 粉丝
原创

rte_ring结构分析

2023-05-26 03:02:29
95
0

1、主要结构

1.1、rte_ring

struct rte_ring {
	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
	
    /* 标记,用来描述队列是单/多生产者还是单/多消费者安全 */
    int flags;
    
    /* 所属的memzone,memzone是dpdk内存管理底层的数据结构 */
	const struct rte_memzone *memzone;
    
    /* 队列长度,初始化时必须为 2^N幂,如果创建时 flag=RING_F_EXACT_SZ,size 向上取 2^N,否则 size必须是 2^N */
	uint32_t size;
    
    /*  掩码,为队列长 - 1,用来计算位置的时候取余用 */
	uint32_t mask;
    
    /* 队列容量 */
	uint32_t capacity;
	char pad0 __rte_cache_aligned; /**< empty cache line */

	/* 包含 prod.head 和 prod.tail,
	 * prod.head 代表着下一次生产时的起始生产位置,
	 * prod.tail 代表着消费者可以消费的位置界限,到达 prod.tail 就无法消费,
	 * 通常生成完成后, prod.tail=prod.head,意味着刚生产的数据皆可被消费。
	*/
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/* 包含 cons.head 和 cons.tail,
	 * cons.head 代表着下一次消费时的起始消费位置,
	 * cons.tail 代表着生产者可以生产位置的界限,到达 cons.tail 后就无法继续生产,
	 * 通常情况消费完成后,cons.tail=cons.head,表示刚消费的位置 皆可用于生产。
	*/
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};

2、实现分析

2.1、入队

简称 入队三步骤,主要操作包含:

  • 移动 head 以预先占用空间,

  • 填充数据,

  • 移动 tail 完成 入队。

/* 
 * @n        :一次填充 n 个数据,bulk 或 burst 场景可能一次性填充多个。
 * @behavior : RTE_RING_QUEUE_FIXED    表示 如果剩余可用空间 m < n,则返回失败
 *           : RTE_RING_QUEUE_VARIABLE 表示 如果剩余可用空间 m < n,则仅入队 m 个
 * @is_sp    :是否为单消费者
 * @free_space:在入队之前 的 m 大小
 *              返回给上层使用,用来判断是否达到 高水平线,到达后执行一个 pause 操作,参考 receive_stage 中 send_pause_frame。
*/
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
		 unsigned int n, enum rte_ring_queue_behavior behavior,
		 unsigned int is_sp, unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

    /* @prod_head :最初的 r.prod.head,针对多生产者 更新尾部时需要做比对用
     * @prod_next :新的   r.prod.head,即当前生产者 占坑后 head位置
    */
	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

    /* 填充数据,这里有几个有意思的地方:
     * r[1] 表示 偏移掉 rte_ring 头部,即 rte_ring 存储数据开始的位置
     * head 为 原始 r.prod.head,而 r.prod.head 代表下一个用于 生产的空间,所以这里正好用以本次存储,新的 r.prod.head 已经更新了
     * 其中 环 的 实现参考 内核的 kfifo,见后面分析。
    */
	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

2.1.1、移动 head

这里是 普通版本,c11 版本会获取变量时 采用 atomic 函数。

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	do {
		n = max;
		*old_head = r->prod.head; // 记录原始 head

		// 内存屏障,目前还不是很理解,只知道 用来避免优化编译代码乱序问题,即优先保证 先获取 old_head,后面理解了再写一篇
		rte_smp_rmb(); 

		// 暂时理解相当于 capacity - (*old_head - r->cons.tail)
		*free_entries = (capacity + r->cons.tail - *old_head);

		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			r->prod.head = *new_head, success = 1;
		else
            /* 比对 r->prod.head 和 old_head 指向的值是否相等,
             * 相等   则 r->prod.head = *new_head,并返回 1,
             * 不相等 则 r->prod.head 保持不变,并返回 0,
             *
             * 注意:这里 r->prod.head 做入参时类型为 volatile 类型,每次强制从内存读取。
            */
			success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}

经过 head 移动结果如下图

2.1.2、多生产者竞

当出现 多生产者 竞争时 会出现什么情况呢?

这里假设 prod1 先执行 new_head 偏移 并 先更新 r.prod.head

  • prod1 和 prod2 已经获取到 old_head,prod1 和 prod2 更新 new_head1 和 new_head2,而

    • prod1 先 通过 CAS 判断并修改 r.prod.head 的值

    • 此时 prod2 经CAS 校验必然不通过,success 值为1,循环继续(这也是循环体内(n=max赋值的意义所在,n在循环体内会被更改)。

  • prod2 经历第二次循环时,先获取 prod1 修改后的 r.prod.head,并赋值给 old_head,在经过 CAS 经验时就可以通过了(假设没有更多生产者竞争)。

经过如上两步,出现结果:

  •  prod1 和 prod2 均获取到 各自的 old_head1 和 old_head2(两个值不同),后面更新 tail 时会用到。

  • 同时 r.prod.head 被偏移了两次,到达最终位置。

如果这里 prod2 在 prod1 已经完成 head 占坑 才进入 该入队函数,也就不存在更新 r.prod.head 时的竞争了。。。所以上面没说这种情况。

2.1.3、移动 tail

/* 这里要加注释说明一下 参数
 * @old_val:初始的   r.prod.head 或 r.cons.head
 * @new_val:更新后的 r.prod.head 或 r.cons.head
*/
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue)
{
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();
	
    
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause(); // 就是等待呗。。

	ht->tail = new_val;
}

看这里比较简单,主要是 while 条件判断,针对 多生产者 模式,还是假设 prod1 先于 prod2 更新完成 r.prod.head,

  • 此时 __rte_ring_move_prod_head 针对 prod1 和 prod2 有两个出参 old_head1old_head2

  • old_head1 是满足 while 条件的,因为 在 入队的三步骤 开始之前 r.prod.head 是等于 r.prod.tail 的,然后其更新 r.prod.tail = new_head

  • 在 prod1 更新完 tail 后,prod2 的 while 条件也就满足了,也更新 tail 指针。

最终结果:r.prod.tailr.prod.head 一致,且都指向 下一个用于存储数据的 空间

 

还有一点需要注意: 在 调整 prod.tail 之前,消费者最多 消费到 prod.tail 的前一个 对象。

2.1.4、填充数据

几个有意思的地方:

  • 因为 prod_headcons_head 是一直递增的 u32 的最大值,所以使用 idx = prod_head & (r)->mask 执行 取余操作。

  • n & ((~(unsigned)0x3))) 相当于 n / 4

  • 不满足 idx + n < size 时,即到达队列尾部,需要分别存储到 队列尾部队列头部

  • 新版本 通过改变 obj_table 类型 减少copy次数 以优化性能,如:const rte_int128_t *obj = (const rte_int128_t *)obj_table

#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	const uint32_t size = (r)->size; \
	uint32_t idx = prod_head & (r)->mask; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
			ring[idx] = obj_table[i]; \
			ring[idx+1] = obj_table[i+1]; \
			ring[idx+2] = obj_table[i+2]; \
			ring[idx+3] = obj_table[i+3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 2: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 1: \
			ring[idx++] = obj_table[i++]; \
		} \
	} else { \ // 看这里,出现反转
		for (i = 0; idx < size; i++, idx++)\ // copy 到 ring 的后半段
			ring[idx] = obj_table[i]; \
		for (idx = 0; i < n; i++, idx++) \   // copy 到 ring 的前半段
			ring[idx] = obj_table[i]; \
	} \
} while (0)

出队和入队类似,这里不再赘述。

3、相关链接

https://blog.csdn.net/linyt/article/details/53355355

 

文章来自个人专栏
dpdk-vpp
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0