- 线程初始化, 线程名称为 "udpif_revalidator", 默认每10秒执行一轮
/* NOTE(review): simplified pseudo-code -- spawns one "revalidator" thread per
 * entry of udpif->revalidators, each running udpif_revalidator() with its own
 * struct revalidator as argument.  'i' and 'udpif' declarations are elided
 * here; presumably the real function also takes parameters and spawns handler
 * threads -- TODO confirm against ofproto/ofproto-dpif-upcall.c. */
void udpif_start_threads() { for(i = 0; i < udpif->n_revalidator; i++){ struct revalidator * revalidator = &udpif->revalidators[i]; revalidator->thread = ovs_thread_create("revalidator", udpif_revalidator, revalidator); } }
- udpif_revalidator线程处理逻辑
2.1 dump_create阶段
udpif->revalidators线程数组里第一个作为leader,创建udpif->dump_create的线程,实际上是一个udpif相关的结构体实例。在此期间做recirc_run的clean up(没搞清楚recirc_id的作用),计算udpif->max_n_flows和udpif->avg_n_flows的值,且判断udpif的pause、latch的状态。recirc_id是跳转(recirculation)的时候用来存放临时状态的;在recirc_run里,清理的是expired_list和expiring_list里的recirc_id
udpif_revalidator() { ..... if (leader) { uint64_t reval_seq; // expiring and expired list recirc_run(); /* Recirculation cleanup. */ // udpif->reval_sed was set at a before time. reval_seq = seq_read(udpif->reval_seq); last_reval_seq = reval_seq; n_flows = udpif_get_n_flows(udpif); udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows); udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2; /* Only the leader checks the pause latch to prevent a race where * some threads think it's false and proceed to block on * reval_barrier and others think it's true and block indefinitely * on the pause_barrier */ udpif->pause = latch_is_set(&udpif->pause_latch); /* Only the leader checks the exit latch to prevent a race where * some threads think it's true and exit and others think it's * false and block indefinitely on the reval_barrier */ udpif->reval_exit = latch_is_set(&udpif->exit_latch); start_time = time_msec(); if (!udpif->reval_exit) { bool terse_dump; terse_dump = udpif_use_ufid(udpif); udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump, NULL); } } ..... }
2.2 validate阶段
2.2.1 dump 流表遍历所有的pmd线程,累计每次达到max_limit(默认是50)就返回,并记录dpif->dump_thread->cur_pmd和 pmd->dump_position. 直接从pmd->flow_table里dump出来的是struct dp_netdev_flow, 然后转换成struct dpif_flow类型返回dpif->dpif_class->flow_dump_next(thread, flows, max_flows); // 实际上调用的函数 dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_, struct dpif_flow *flows, int max_flows) { if (!pmd) { pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); if (!pmd) { ovs_mutex_unlock(&dump->mutex); return n_flows; } } do { for (n_flows = 0; n_flows < flow_limit; n_flows++) { struct cmap_node *node; node = cmap_next_position(&pmd->flow_table, &dump->flow_pos); if (!node) { break; } netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow, node); } /* When finishing dumping the current pmd thread, moves to * the next. */ if (n_flows < flow_limit) { memset(&dump->flow_pos, 0, sizeof dump->flow_pos); dp_netdev_pmd_unref(pmd); pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); if (!pmd) { dump->status = EOF; break; } } /* Keeps the reference to next caller. */ dump->cur_pmd = pmd; /* If the current dump is empty, do not exit the loop, since the * remaining pmds could have flows to be dumped. Just dumps again * on the new 'pmd'. */ } while (!n_flows); for (i = 0; i < n_flows; i++) { struct odputil_keybuf *maskbuf = &thread->maskbuf[i]; struct odputil_keybuf *keybuf = &thread->keybuf[i]; struct dp_netdev_flow *netdev_flow = netdev_flows[i]; struct dpif_flow *f = &flows[i]; struct ofpbuf key, mask; ofpbuf_use_stack(&key, keybuf, sizeof *keybuf); ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf); dp_netdev_flow_to_dpif_flow(dp, netdev_flow, &key, &mask, f, dump->up.terse); } }
有个关键点隐含在dp_netdev_flow_to_dpif_flow
/* Converts the datapath-internal 'netdev_flow' into the generic dpif
 * representation 'flow' used by the upcall/revalidator layer.
 *
 * 'key_buf' and 'mask_buf' are caller-provided scratch buffers: the ODP
 * key and mask are appended to them and 'flow' keeps pointers into those
 * buffers, so they must outlive 'flow'.
 *
 * When 'terse' is set the key/mask/actions fields are zeroed and only the
 * UFID, pmd_id, stats and attributes are filled in. */
static void
dp_netdev_flow_to_dpif_flow(const struct dp_netdev *dp,
const struct dp_netdev_flow *netdev_flow,
struct ofpbuf *key_buf, struct ofpbuf *mask_buf,
struct dpif_flow *flow, bool terse)
{
if (terse) {
memset(flow, 0, sizeof *flow);
} else {
struct flow_wildcards wc;
struct dp_netdev_actions *actions;
size_t offset;
struct odp_flow_key_parms odp_parms = {
.flow = &netdev_flow->flow,
.mask = &wc.masks,
.support = dp_netdev_support,
};
/* Expand the classifier's compressed miniflow mask into 'wc'. */
miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks);
/* in_port is exact matched, but we have left it out from the mask for
 * optimization reasons. Add in_port back to the mask. */
/* NOTE(review): ODPP_NONE is presumably all-ones, making in_port an
 * exact match -- confirm against the ODPP_NONE definition. */
wc.masks.in_port.odp_port = ODPP_NONE;
/* Key: serialize into 'key_buf'; key_len is the number of bytes this
 * call appended (the buffer may already contain earlier flows). */
offset = key_buf->size;
flow->key = ofpbuf_tail(key_buf);
odp_flow_key_from_flow(&odp_parms, key_buf);
flow->key_len = key_buf->size - offset;
/* Mask: serialized relative to the key just written above. */
offset = mask_buf->size;
flow->mask = ofpbuf_tail(mask_buf);
odp_parms.key_buf = key_buf;
odp_flow_key_from_mask(&odp_parms, mask_buf);
flow->mask_len = mask_buf->size - offset;
/* Actions: 'flow' points directly at the flow's action list; no copy
 * is made. */
actions = dp_netdev_flow_get_actions(netdev_flow);
flow->actions = actions->actions;
flow->actions_len = actions->size;
}
flow->ufid = netdev_flow->ufid;
flow->ufid_present = true;
flow->pmd_id = netdev_flow->pmd_id;
get_dpif_flow_status(dp, netdev_flow, &flow->stats, &flow->attrs);
flow->attrs.dp_extra_info = netdev_flow->dp_extra_info;
}
最终调用dpdk的接口是 netdev_offload_dpdk_flow_get
// get_dpif_flow_status--->dpif_netdev_get_flow_offload_status--->netdev_flow_get--->flow_api->flow_get--->netdev_offload_dpdk_flow_get
/* NOTE(review): simplified sketch, not compilable as written -- the real
 * udpif_get_n_flows() takes a udpif argument, calls dpif_get_dp_stats()
 * (which dispatches to dpif_netdev_get_stats() below for the userspace
 * datapath) and returns stats.n_flows; the arguments and return are elided
 * here.  TODO confirm against ofproto/ofproto-dpif-upcall.c. */
static long udpif_get_n_flows()
{
// below ...
dpif_netdev_get_stats();
}
/* Fills '*stats' with aggregate datapath statistics for 'dpif' by summing
 * the per-PMD flow-table sizes and performance counters.  Mask statistics
 * are not tracked by the userspace datapath, so those fields are set to
 * their "unsupported" sentinel values.  Always returns 0. */
static int
dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
{
    struct dp_netdev *dp = get_dp_netdev(dpif);
    struct dp_netdev_pmd_thread *pmd;

    stats->n_flows = 0;
    stats->n_hit = 0;
    stats->n_missed = 0;
    stats->n_lost = 0;

    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
        uint64_t perf[PMD_N_STATS];

        stats->n_flows += cmap_count(&pmd->flow_table);

        pmd_perf_read_counters(&pmd->perf_stats, perf);
        /* A "hit" is a packet matched at any cache/table level. */
        stats->n_hit += perf[PMD_STAT_PHWOL_HIT]
                        + perf[PMD_STAT_SIMPLE_HIT]
                        + perf[PMD_STAT_EXACT_HIT]
                        + perf[PMD_STAT_SMC_HIT]
                        + perf[PMD_STAT_MASKED_HIT];
        stats->n_missed += perf[PMD_STAT_MISS];
        stats->n_lost += perf[PMD_STAT_LOST];
    }

    /* Not applicable to the userspace datapath. */
    stats->n_masks = UINT32_MAX;
    stats->n_mask_hit = UINT64_MAX;
    stats->n_cache_hit = UINT64_MAX;

    return 0;
}
2.2.3 max_idle值默认为10s,如果实际的dp流表大于flow limit的值(默认20w),那么max_idle=0.1s
max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
for (f = flows; f < &flows[n_dumped]; f++) {
long long int used = f->stats.used;
struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
struct dpif_flow_stats stats = f->stats;
enum reval_result result;
struct udpif_key *ukey;
bool already_dumped;
int error;
// 查询到或创建成功新的,都返回0
// ukey_acquire间接的会调用的ukey_create)
if (ukey_acquire(udpif, f, &ukey, &error)) {
// error log
}
根据新读取的flow->used时间和ukey->used旧值做差值,判断是否满足需要被删除
if (!used) {
// 对stats进行赋值,然后在revalidate_ukey里对ukey->stats赋值
used = udpif_update_used(udpif, ukey, &stats);
}
if (kill_them_all || (used && used < now - max_idle)) {
result = UKEY_DELETE;
} else {
result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
reval_seq, &recircs,
f->attrs.offloaded);
}
ukey->dump_seq = dump_seq;
2.2.4 revalidate_ukey
/* Verifies that the datapath actions of 'ukey' are still correct, and pushes
* 'stats' for it.
*
* Returns a recommended action for 'ukey', options include:
* UKEY_DELETE The ukey should be deleted.
* UKEY_KEEP The ukey is fine as is.
* UKEY_MODIFY The ukey's actions should be changed but is otherwise
* fine. Callers should change the actions to those found
* in the caller supplied 'odp_actions' buffer. The
* recirculation references can be found in 'recircs' and
* must be handled by the caller.
*
* If the result is UKEY_MODIFY, then references to all recirc_ids used by the
* new flow will be held within 'recircs' (which may be none).
*
* The caller is responsible for both initializing 'recircs' prior this call,
* and ensuring any references are eventually freed.
*/
static enum reval_result
revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
const struct dpif_flow_stats *stats,
struct ofpbuf *odp_actions, uint64_t reval_seq,
struct recirc_refs *recircs, bool offloaded)
OVS_REQUIRES(ukey->mutex)
{
/* A new revalidation generation started since this ukey was last
 * validated. */
bool need_revalidate = ukey->reval_seq != reval_seq;
enum reval_result result = UKEY_DELETE;
struct dpif_flow_stats push;
ofpbuf_clear(odp_actions);
push.used = stats->used;
push.tcp_flags = stats->tcp_flags;
/* Datapath counters are cumulative; push only the delta since stats were
 * last attributed to this ukey (clamped at 0 in case the datapath's
 * counters went backwards). */
push.n_packets = (stats->n_packets > ukey->stats.n_packets
? stats->n_packets - ukey->stats.n_packets
: 0);
push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
? stats->n_bytes - ukey->stats.n_bytes
: 0);
if (need_revalidate) {
if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
/* Reuse the ukey's translation cache if present, otherwise
 * allocate a fresh one for revalidate_ukey__() to fill. */
if (!ukey->xcache) {
ukey->xcache = xlate_cache_new();
} else {
xlate_cache_clear(ukey->xcache);
}
result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
odp_actions, recircs, ukey->xcache);
} /* else delete; too expensive to revalidate */
/* No new generation: keep the flow if there is nothing to push, or if an
 * xcache already exists / can be populated to attribute the stats.  The
 * short-circuit order matters: populate_xcache() has side effects and is
 * only called when there are packets and no cache yet. */
} else if (!push.n_packets || ukey->xcache
|| !populate_xcache(udpif, ukey, push.tcp_flags)) {
result = UKEY_KEEP;
}
/* Stats for deleted flows will be attributed upon flow deletion. Skip. */
if (result != UKEY_DELETE) {
xlate_push_stats(ukey->xcache, &push, offloaded);
ukey->stats = *stats;
ukey->reval_seq = reval_seq;
}
return result;
}
2.2.5 revalidate_ukey__
/* Core revalidation: re-translates 'ukey' through the current OpenFlow
 * tables and compares the resulting wildcard mask and actions against the
 * flow installed in the datapath.  Returns UKEY_KEEP, UKEY_MODIFY (new
 * actions in 'odp_actions', recirc-id references transferred to 'recircs')
 * or UKEY_DELETE.  Any failure along the way falls through to UKEY_DELETE
 * via the 'exit' label. */
static enum reval_result
revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
uint16_t tcp_flags, struct ofpbuf *odp_actions,
struct recirc_refs *recircs, struct xlate_cache *xcache)
{
struct xlate_out *xoutp;
struct netflow *netflow;
struct flow_wildcards dp_mask, wc;
enum reval_result result;
struct reval_context ctx = {
.odp_actions = odp_actions,
.netflow = &netflow,
.xcache = xcache,
.wc = &wc,
};
/* Default to deletion: every early exit below means the flow can no
 * longer be trusted. */
result = UKEY_DELETE;
xoutp = NULL;
netflow = NULL;
if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) {
goto exit;
}
xoutp = &ctx.xout;
if (xoutp->avoid_caching) {
goto exit;
}
/* Slow-path flows: replace the translated actions with a userspace
 * action that punts packets back to ofproto. */
if (xoutp->slow) {
struct ofproto_dpif *ofproto;
ofp_port_t ofp_in_port;
ofproto = xlate_lookup_ofproto(udpif->backer, &ctx.flow, &ofp_in_port,
NULL);
ofpbuf_clear(odp_actions);
if (!ofproto) {
goto exit;
}
compose_slow_path(udpif, xoutp, ctx.flow.in_port.odp_port,
ofp_in_port, odp_actions,
ofproto->up.slowpath_meter_id, &ofproto->uuid);
}
if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow,
NULL)
== ODP_FIT_ERROR) {
goto exit;
}
/* Do not modify if any bit is wildcarded by the installed datapath flow,
 * but not the newly revalidated wildcard mask (wc), i.e., if revalidation
 * tells that the datapath flow is now too generic and must be narrowed
 * down. Note that we do not know if the datapath has ignored any of the
 * wildcarded bits, so we may be overly conservative here. */
if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) {
goto exit;
}
/* Mask is still acceptable; if the actions differ from the installed
 * ones, ask the caller to modify the flow in place. */
if (!ofpbuf_equal(odp_actions,
ovsrcu_get(struct ofpbuf *, &ukey->actions))) {
/* The datapath mask was OK, but the actions seem to have changed.
 * Let's modify it in place. */
result = UKEY_MODIFY;
/* Transfer recirc action ID references to the caller. */
recirc_refs_swap(recircs, &xoutp->recircs);
goto exit;
}
result = UKEY_KEEP;
exit:
if (netflow && result == UKEY_DELETE) {
netflow_flow_clear(netflow, &ctx.flow);
}
xlate_out_uninit(xoutp);
return result;
}
2.3 sweep阶段
revalidator_sweep间接调用revalidator_sweep__,且purge传值为 false;
/* Walks this revalidator's slice of the ukey maps and cleans up ukeys whose
 * datapath flows were not seen during the last dump phase.
 *
 * With 'purge' == false (the normal sweep): an OPERATIONAL ukey that was
 * missed by both the dump and the reval passes of the current round is
 * revalidated directly with zeroed stats, and EVICTED ukeys are freed.
 * With 'purge' == true: every OPERATIONAL or VISIBLE ukey is deleted.
 *
 * Note: the copy of this code in the original notes was garbled -- the
 * "} else {" branch header and the 'struct dpif_flow_stats stats;'
 * declaration were missing and COVERAGE_INC was split across lines, so
 * 'stats' was used undeclared.  Restored below. */
static void
revalidator_sweep__(struct revalidator *revalidator, bool purge)
{
    struct udpif *udpif;
    uint64_t dump_seq, reval_seq;
    int slice;

    udpif = revalidator->udpif;
    dump_seq = seq_read(udpif->dump_seq);
    reval_seq = seq_read(udpif->reval_seq);

    /* 'udpif->revalidators' is the base address of the thread array; each
     * revalidator owns every n_revalidators-th umap starting at 'slice'. */
    slice = revalidator - udpif->revalidators;
    ovs_assert(slice < udpif->n_revalidators);

    for (int i = slice; i < N_UMAPS; i += udpif->n_revalidators) {
        uint64_t odp_actions_stub[1024 / 8];
        struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
        struct ukey_op ops[REVALIDATE_MAX_BATCH];
        struct udpif_key *ukey;
        struct umap *umap = &udpif->ukeys[i];
        size_t n_ops = 0;

        CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
            enum ukey_state ukey_state;

            /* Handler threads could be holding a ukey lock while it installs a
             * new flow, so don't hang around waiting for access to it. */
            if (ovs_mutex_trylock(&ukey->mutex)) {
                continue;
            }
            ukey_state = ukey->state;
            if (ukey_state == UKEY_OPERATIONAL
                || (ukey_state == UKEY_VISIBLE && purge)) {
                struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
                bool seq_mismatch = (ukey->dump_seq != dump_seq
                                     && ukey->reval_seq != reval_seq);
                enum reval_result result;

                if (purge) {
                    result = UKEY_DELETE;
                } else if (!seq_mismatch) {
                    /* Already handled during this round's dump or reval
                     * phase; nothing to do. */
                    result = UKEY_KEEP;
                } else {
                    /* Missed by the dump: the datapath flow may be gone,
                     * so revalidate with zeroed stats. */
                    struct dpif_flow_stats stats;

                    COVERAGE_INC(revalidate_missed_dp_flow);
                    memset(&stats, 0, sizeof stats);
                    result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
                                             reval_seq, &recircs, false);
                }
                if (result != UKEY_KEEP) {
                    /* Clears 'recircs' if filled by revalidate_ukey(). */
                    reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
                                  &odp_actions);
                }
            }
            ovs_mutex_unlock(&ukey->mutex);

            if (ukey_state == UKEY_EVICTED) {
                /* The common flow deletion case involves deletion of the flow
                 * during the dump phase and ukey deletion here. */
                ovs_mutex_lock(&umap->mutex);
                ukey_delete(umap, ukey);
                ovs_mutex_unlock(&umap->mutex);
            }

            if (n_ops == REVALIDATE_MAX_BATCH) {
                /* Update/delete missed flows and clean up corresponding ukeys
                 * if necessary. */
                push_ukey_ops(udpif, umap, ops, n_ops);
                n_ops = 0;
            }
        }

        if (n_ops) {
            push_ukey_ops(udpif, umap, ops, n_ops);
        }

        ofpbuf_uninit(&odp_actions);
        ovsrcu_quiesce();
    }
}