最近学习了一下GRAPH LIBRARY,该特性是在20.05版本引入的,核心思想是将报文处理构成node 和 link组成的有向图。与pipe line 相比,graph 可以非常直观地呈现业务的转发路径;node 理论上可以任意组合实现灵活的业务转发;最后,报文的处理是在每个node 中批量执行的,因此减少了报文处理过程的cache miss, 提高转发性能。
下面对graph library 实现进行分析。
1.基本数据结构
1.1 node
1.1.1 node结构
struct rte_node_register {
char name[RTE_NODE_NAMESIZE]; /**< Name of the node. */
uint64_t flags; /**< Node configuration flag. */
#define RTE_NODE_SOURCE_F (1ULL << 0) /**< Node type is source. */
rte_node_process_t process; /**< Node process function. */
rte_node_init_t init; /**< Node init function. */
rte_node_fini_t fini; /**< Node fini function. */
rte_node_t id; /**< Node Identifier. */
rte_node_t parent_id; /**< Identifier of parent node. */
rte_edge_t nb_edges; /**< Number of edges from this node. */
const char *next_nodes[]; /**< Names of next nodes. */
};
node结构里flag 区分source flag和非source flag,置位source flag的称为source node,后续的graph 运行时walk都是从source node 出发。
1.1.2 node注册
node 注册的实现比较简单,name作为唯一的标志判断是否重复,如果是新的节点生成一个节点,加入全局变量node_list里。
rte_node_t
__rte_node_register(const struct rte_node_register *reg)
{
struct node *node;
rte_edge_t i;
size_t sz;
/* Limit Node specific metadata to one cacheline on 64B CL machine */
RTE_BUILD_BUG_ON((offsetof(struct rte_node, nodes) -
offsetof(struct rte_node, ctx)) !=
RTE_CACHE_LINE_MIN_SIZE);
graph_spinlock_lock();
/* Check sanity */
if (reg == NULL || reg->process == NULL) {
rte_errno = EINVAL;
goto fail;
}
/* Check for duplicate name */
if (node_has_duplicate_entry(reg->name))
goto fail;
sz = sizeof(struct node) + (reg->nb_edges * RTE_NODE_NAMESIZE);
node = calloc(1, sz);
if (node == NULL) {
rte_errno = ENOMEM;
goto fail;
}
/* Initialize the node */
if (rte_strscpy(node->name, reg->name, RTE_NODE_NAMESIZE) < 0)
goto free;
node->flags = reg->flags;
node->process = reg->process;
node->init = reg->init;
node->fini = reg->fini;
node->nb_edges = reg->nb_edges;
node->parent_id = reg->parent_id;
for (i = 0; i < reg->nb_edges; i++) {
if (rte_strscpy(node->next_nodes[i], reg->next_nodes[i],
RTE_NODE_NAMESIZE) < 0)
goto free;
}
node->id = node_id++;
/* Add the node at tail */
STAILQ_INSERT_TAIL(&node_list, node, next);
graph_spinlock_unlock();
return node->id;
free:
free(node);
fail:
graph_spinlock_unlock();
return RTE_NODE_ID_INVALID;
}
1.2 graph
1.2.1 graph结构
struct graph {
STAILQ_ENTRY(graph) next;
/**< List of graphs. */
char name[RTE_GRAPH_NAMESIZE];
/**< Name of the graph. */
const struct rte_memzone *mz;
/**< Memzone to store graph data. */
rte_graph_off_t nodes_start;
/**< Node memory start offset in graph reel. */
rte_node_t src_node_count;
/**< Number of source nodes in a graph. */
struct rte_graph *graph;
/**< Pointer to graph data. */
rte_node_t node_count;
/**< Total number of nodes. */
uint32_t cir_start;
/**< Circular buffer start offset in graph reel. */
uint32_t cir_mask;
/**< Circular buffer mask for wrap around. */
rte_graph_t id;
/**< Graph identifier. */
size_t mem_sz;
/**< Memory size of the graph. */
int socket;
/**< Socket identifier where memory is allocated. */
STAILQ_HEAD(gnode_list, graph_node) node_list;
/**< Nodes in a graph. */
};
1.2.1 graph生成
rte_graph_t
rte_graph_create(const char *name, struct rte_graph_param *prm)
{
rte_node_t src_node_count;
struct graph *graph;
const char *pattern;
uint16_t i;
graph_spinlock_lock();
/* Check arguments sanity */
if (prm == NULL)
SET_ERR_JMP(EINVAL, fail, "Param should not be NULL");
if (name == NULL)
SET_ERR_JMP(EINVAL, fail, "Graph name should not be NULL");
/* Check for existence of duplicate graph */
/* 在graph全局链表中查找看是否已有同名的graph */
STAILQ_FOREACH(graph, &graph_list, next)
if (strncmp(name, graph->name, RTE_GRAPH_NAMESIZE) == 0)
SET_ERR_JMP(EEXIST, fail, "Found duplicate graph %s",
name);
/* Create graph object */
/* 分配graph结构 */
graph = calloc(1, sizeof(*graph));
if (graph == NULL)
SET_ERR_JMP(ENOMEM, fail, "Failed to calloc graph object");
/* Initialize the graph object */
STAILQ_INIT(&graph->node_list);
if (rte_strscpy(graph->name, name, RTE_GRAPH_NAMESIZE) < 0)
SET_ERR_JMP(E2BIG, free, "Too big name=%s", name);
/* Expand node pattern and add the nodes to the graph */
for (i = 0; i < prm->nb_node_patterns; i++) {
pattern = prm->node_patterns[i];
if (expand_pattern_to_node(graph, pattern))
goto graph_cleanup;
}
/* Go over all the nodes edges and add them to the graph */
if (graph_node_edges_add(graph))
goto graph_cleanup;
/* Update adjacency list of all nodes in the graph */
if (graph_adjacency_list_update(graph))
goto graph_cleanup;
/* Make sure at least a source node present in the graph */
src_node_count = graph_src_nodes_count(graph);
if (src_node_count == 0)
goto graph_cleanup;
/* Make sure no node is pointing to source node */
if (graph_node_has_edge_to_src_node(graph))
goto graph_cleanup;
/* Don't allow node has loop to self */
if (graph_node_has_loop_edge(graph))
goto graph_cleanup;
/* Do BFS from src nodes on the graph to find isolated nodes */
if (graph_has_isolated_node(graph))
goto graph_cleanup;
/* Initialize graph object */
graph->socket = prm->socket_id;
graph->src_node_count = src_node_count;
graph->node_count = graph_nodes_count(graph);
graph->id = graph_id;
/* Allocate the Graph fast path memory and populate the data */
if (graph_fp_mem_create(graph))
goto graph_cleanup;
/* Call init() of the all the nodes in the graph */
if (graph_node_init(graph))
goto graph_mem_destroy;
/* All good, Lets add the graph to the list */
graph_id++;
STAILQ_INSERT_TAIL(&graph_list, graph, next);
graph_spinlock_unlock();
return graph->id;
graph_mem_destroy:
graph_fp_mem_destroy(graph);
graph_cleanup:
graph_cleanup(graph);
free:
free(graph);
fail:
graph_spinlock_unlock();
return RTE_GRAPH_ID_INVALID;
}
1.3 graph的内存模型
2.运行原理
实际使用中,在pmd中循环调用rte_graph_walk 完成报文的处理,所以接下来分析这个函数的实现。
static inline void
rte_graph_walk(struct rte_graph *graph)
{
const rte_graph_off_t *cir_start = graph->cir_start;
const rte_node_t mask = graph->cir_mask;
uint32_t head = graph->head;
struct rte_node *node;
uint64_t start;
uint16_t rc;
void **objs;
/*
* Walk on the source node(s) ((cir_start - head) -> cir_start) and then
* on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
* in a circular buffer fashion.
*
* +-----+ <= cir_start - head [number of source nodes]
* | |
* | ... | <= source nodes
* | |
* +-----+ <= cir_start [head = 0] [tail = 0]
* | |
* | ... | <= pending streams
* | |
* +-----+ <= cir_start + mask
*/
while (likely(head != graph->tail)) {
node = (struct rte_node *)RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
objs = node->objs;
rte_prefetch0(objs);
if (rte_graph_has_stats_feature()) {
start = rte_rdtsc();
rc = node->process(graph, node, objs, node->idx);
node->total_cycles += rte_rdtsc() - start;
node->total_calls++;
node->total_objs += rc;
} else {
node->process(graph, node, objs, node->idx);
}
node->idx = 0;
head = likely((int32_t)head > 0) ? head & mask : head;
}
graph->tail = 0;
}
结合前面的代已经函数注释中,walk 分两个阶段,第一个阶段这里head = - src_node_count,tail = 0;所以会遍历执行每个src_node的process函数,while 循环写得非常清楚。第二阶段非src_node节点的遍历和报文处理流程,会在每个node的process函数修改graph->tail,即while循环的终止条件,具体实现在后续l3fwd-graph 结合具体例子分析。
3. l3fwd-graph的运行
下面以l3fwd-graph分析代码的实现和运行
代码路径为examples\l3fwd-graph\main.c
整个流程包括eal_init,rte_eth_dev_configure对每个port进行配置,然后调用rte_eth_tx_queue_setup配置tx queue,调用rte_eth_rx_queue_setup配置每个队列的tx queue,然后调用rte_eth_dev_start对每个port进行start,这些不展开,主要看与graph有关的地方,主要有三点:
- graph的创建
通过调用rte_graph_create 创建了如下图所示的graph,这个example在ethdev_rx node初始化的时候指定了next是pkt_cls节点,所以graph 有两个next node,但是运行的时候路径是ethdev_rx ->pkt_cls。 - 表项输出是node节点
LPM route 创建和配置与原有的路由表不同,route 的next_hop 指定为node节点而不是一个ip addr。 - pmd循环运行rte_graph_walk
graph_main_loop pmd 执行的是graph_main_loop,该函数循环调用rte_graph_walk 运行该graph。
接下来分析src node的process函数,如下图所示,其他的node 的process也有类似的逻辑,实现walk的核心函数是__rte_node_enqueue_tail_update,该函数会tail ++从而修改循环终止条件,使得walk可以执行到非src node的process,从而完成整个业务流的执行。