OVS flow table aging thread: implementation principles

2024-10-12 09:45:10
  1. Thread initialization. The revalidator threads are created with the thread name "revalidator" and entry function udpif_revalidator(); each round ages out datapath flows that have been unused for longer than max_idle (10 s by default).
    /* Condensed from ofproto-dpif-upcall.c (unrelated parameters elided). */
    static void
    udpif_start_threads(struct udpif *udpif /* , n_handlers, n_revalidators */)
    {
        for (size_t i = 0; i < udpif->n_revalidators; i++) {
            struct revalidator *revalidator = &udpif->revalidators[i];

            revalidator->thread = ovs_thread_create("revalidator",
                                                    udpif_revalidator,
                                                    revalidator);
        }
    }
  2. udpif_revalidator thread processing logic

    2.1 dump_create phase

          The first entry of the udpif->revalidators thread array acts as the leader.
          The leader creates udpif->dump via dpif_flow_dump_create() (the dump is a
          udpif-related structure instance, not a separate thread). During this phase
          it performs recirculation cleanup via recirc_run(), updates
          udpif->max_n_flows and udpif->avg_n_flows, and samples the state of the
          pause and exit latches. A recirc_id stores temporary state while a packet
          recirculates through the datapath; recirc_run() frees the recirc_ids queued
          on the expired_list and expiring_list.

    udpif_revalidator()
    {
         .....
          if (leader) {
                uint64_t reval_seq;
    
                // expiring and expired list
                recirc_run(); /* Recirculation cleanup. */
    
            // udpif->reval_seq was set earlier.
                reval_seq = seq_read(udpif->reval_seq);
                last_reval_seq = reval_seq;
    
                n_flows = udpif_get_n_flows(udpif);
                udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
                udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
    
                /* Only the leader checks the pause latch to prevent a race where
                 * some threads think it's false and proceed to block on
                 * reval_barrier and others think it's true and block indefinitely
                 * on the pause_barrier */
                udpif->pause = latch_is_set(&udpif->pause_latch);
    
                /* Only the leader checks the exit latch to prevent a race where
                 * some threads think it's true and exit and others think it's
                 * false and block indefinitely on the reval_barrier */
                udpif->reval_exit = latch_is_set(&udpif->exit_latch);
    
                start_time = time_msec();
                if (!udpif->reval_exit) {
                    bool terse_dump;
    
                    terse_dump = udpif_use_ufid(udpif);
                    udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump,
                                                        NULL);
                }
            }
            .....
    }
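
    How often a round actually runs is decided by the wait at the tail of the same function. Below is a sketch, condensed from the end of udpif_revalidator() (the 500 ms cap is the long-standing default; newer trees expose it as other_config:max-revalidator): the leader wakes as soon as reval_seq changes or the timer fires, so max_idle (10 s) bounds how long an unused flow may live, not the round period.

    if (leader) {
        /* Sketch: arm the next round.  The thread wakes when reval_seq
         * changes or when the timer fires, whichever comes first. */
        seq_wait(udpif->reval_seq, last_reval_seq);
        poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500)); /* ms */
        latch_wait(&udpif->exit_latch);
        latch_wait(&udpif->pause_latch);
        poll_block();
    }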

    2.2 validate phase

        2.2.1 Dumping the flow table
            Iterate over all PMD threads; each call returns once the batch reaches flow_limit (FLOW_DUMP_MAX_BATCH, default 50), recording dump->cur_pmd plus the dump positions (dump->flow_pos and dump->poll_thread_pos). What is pulled directly out of pmd->flow_table is struct dp_netdev_flow, which is then converted to struct dpif_flow before being returned.

     

    dpif->dpif_class->flow_dump_next(thread, flows, max_flows);

    // The function actually invoked (condensed: the cast helpers, mutex
    // locking, and the enclosing "if (!dump->status)" of the upstream
    // source are elided):
    static int
    dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
                               struct dpif_flow *flows, int max_flows)
    {
            struct dpif_netdev_flow_dump_thread *thread = ...; /* cast of 'thread_' */
            struct dpif_netdev_flow_dump *dump = thread->dump;
            struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
            struct dp_netdev *dp = ...;                        /* from dump->up.dpif */
            struct dp_netdev_pmd_thread *pmd = dump->cur_pmd;
            int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
            int n_flows = 0;
            int i;
            if (!pmd) {
                pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
                if (!pmd) {
                    ovs_mutex_unlock(&dump->mutex);
                    return n_flows;
                }
            }
            do {
                for (n_flows = 0; n_flows < flow_limit; n_flows++) {
                    struct cmap_node *node;
    
                    node = cmap_next_position(&pmd->flow_table, &dump->flow_pos);
                    if (!node) {
                        break;
                    }
                    netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow, node);
                }
                /* When finishing dumping the current pmd thread, moves to
                 * the next. */
                if (n_flows < flow_limit) {
                    memset(&dump->flow_pos, 0, sizeof dump->flow_pos);
                    dp_netdev_pmd_unref(pmd);
                    pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
                    if (!pmd) {
                        dump->status = EOF;
                        break;
                    }
                }
                /* Keeps the reference to next caller. */
                dump->cur_pmd = pmd;
    
                /* If the current dump is empty, do not exit the loop, since the
                 * remaining pmds could have flows to be dumped.  Just dumps again
                 * on the new 'pmd'. */
            } while (!n_flows);   
        
       for (i = 0; i < n_flows; i++) {
            struct odputil_keybuf *maskbuf = &thread->maskbuf[i];
            struct odputil_keybuf *keybuf = &thread->keybuf[i];
            struct dp_netdev_flow *netdev_flow = netdev_flows[i];
            struct dpif_flow *f = &flows[i];
            struct ofpbuf key, mask;
    
            ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
            ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
            dp_netdev_flow_to_dpif_flow(dp, netdev_flow, &key, &mask, f,
                                        dump->up.terse);
        }    
    }
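
    For orientation, this is how the dump API above is consumed. A condensed sketch of the caller, revalidate() in ofproto-dpif-upcall.c (details approximate): every revalidator thread attaches to the shared dump and pulls batches of up to REVALIDATE_MAX_BATCH (50) flows until the dump is exhausted.

    struct dpif_flow_dump_thread *dump_thread
        = dpif_flow_dump_thread_create(udpif->dump);

    for (;;) {
        struct dpif_flow flows[REVALIDATE_MAX_BATCH];
        int n_dumped = dpif_flow_dump_next(dump_thread, flows,
                                           ARRAY_SIZE(flows));
        if (n_dumped < 1) {
            break;
        }
        /* ... per-flow aging / revalidation decisions, see 2.2.2 below ... */
    }
    dpif_flow_dump_thread_destroy(dump_thread);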

         

        A key point is hidden inside dp_netdev_flow_to_dpif_flow():

   

static void
dp_netdev_flow_to_dpif_flow(const struct dp_netdev *dp,
                            const struct dp_netdev_flow *netdev_flow,
                            struct ofpbuf *key_buf, struct ofpbuf *mask_buf,
                            struct dpif_flow *flow, bool terse)
{
    if (terse) {
        memset(flow, 0, sizeof *flow);
    } else {
        struct flow_wildcards wc;
        struct dp_netdev_actions *actions;
        size_t offset;
        struct odp_flow_key_parms odp_parms = {
            .flow = &netdev_flow->flow,
            .mask = &wc.masks,
            .support = dp_netdev_support,
        };

        miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks);
        /* in_port is exact matched, but we have left it out from the mask for
         * optimization reasons. Add in_port back to the mask. */
        wc.masks.in_port.odp_port = ODPP_NONE;

        /* Key */
        offset = key_buf->size;
        flow->key = ofpbuf_tail(key_buf);
        odp_flow_key_from_flow(&odp_parms, key_buf);
        flow->key_len = key_buf->size - offset;

        /* Mask */
        offset = mask_buf->size;
        flow->mask = ofpbuf_tail(mask_buf);
        odp_parms.key_buf = key_buf;
        odp_flow_key_from_mask(&odp_parms, mask_buf);
        flow->mask_len = mask_buf->size - offset;

        /* Actions */
        actions = dp_netdev_flow_get_actions(netdev_flow);
        flow->actions = actions->actions;
        flow->actions_len = actions->size;
    }

    flow->ufid = netdev_flow->ufid;
    flow->ufid_present = true;
    flow->pmd_id = netdev_flow->pmd_id;

    get_dpif_flow_status(dp, netdev_flow, &flow->stats, &flow->attrs);
    flow->attrs.dp_extra_info = netdev_flow->dp_extra_info;
}
The DPDK interface ultimately invoked is netdev_offload_dpdk_flow_get:
// get_dpif_flow_status--->dpif_netdev_get_flow_offload_status--->netdev_flow_get--->flow_api->flow_get--->netdev_offload_dpdk_flow_get
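
The dispatch step in that chain is generic rather than DPDK-specific. Below is a sketch of netdev_flow_get(), paraphrased from lib/netdev-offload.c (treat the exact names and signature as approximate): the netdev's registered offload API, if any, receives the call; for DPDK ports that API is the netdev-offload-dpdk provider, whose flow_get callback is netdev_offload_dpdk_flow_get.

int
netdev_flow_get(struct netdev *netdev, struct match *match,
                struct nlattr **actions, const ovs_u128 *ufid,
                struct dpif_flow_stats *stats,
                struct dpif_flow_attrs *attrs, struct ofpbuf *buf)
{
    const struct netdev_flow_api *flow_api
        = ovsrcu_get(const struct netdev_flow_api *, &netdev->flow_api);

    /* No offload provider registered on this netdev -> not supported. */
    return (flow_api && flow_api->flow_get)
           ? flow_api->flow_get(netdev, match, actions, ufid,
                                stats, attrs, buf)
           : EOPNOTSUPP;
}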


/* Condensed: udpif_get_n_flows() reads the datapath statistics; for the
 * userspace datapath the call dispatches to dpif_netdev_get_stats(). */
static uint64_t
udpif_get_n_flows(struct udpif *udpif)
{
    struct dpif_dp_stats stats;

    dpif_get_dp_stats(udpif->dpif, &stats);   /* -> dpif_netdev_get_stats() */
    return stats.n_flows;
}

static int
dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
{
    struct dp_netdev *dp = get_dp_netdev(dpif);
    struct dp_netdev_pmd_thread *pmd;
    uint64_t pmd_stats[PMD_N_STATS];

    stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0;
    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
        stats->n_flows += cmap_count(&pmd->flow_table);
        pmd_perf_read_counters(&pmd->perf_stats, pmd_stats);
        stats->n_hit += pmd_stats[PMD_STAT_PHWOL_HIT];
        stats->n_hit += pmd_stats[PMD_STAT_SIMPLE_HIT];
        stats->n_hit += pmd_stats[PMD_STAT_EXACT_HIT];
        stats->n_hit += pmd_stats[PMD_STAT_SMC_HIT];
        stats->n_hit += pmd_stats[PMD_STAT_MASKED_HIT];
        stats->n_missed += pmd_stats[PMD_STAT_MISS];
        stats->n_lost += pmd_stats[PMD_STAT_LOST];
    }
    stats->n_masks = UINT32_MAX;
    stats->n_mask_hit = UINT64_MAX;
    stats->n_cache_hit = UINT64_MAX;

    return 0;
}

2.2.2 max_idle defaults to 10 s (10000 ms); if the number of flows actually in the datapath exceeds flow_limit (default 200,000), max_idle drops to 100 ms (0.1 s):

max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
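
For context, the neighboring lines of revalidate() (condensed from the upstream source; treat as a sketch) derive a harsher "kill them all" threshold from the same flow count. The worked values assume the defaults other_config:flow-limit=200000 and max-idle=10000 ms:

n_dp_flows = udpif_get_n_flows(udpif);
kill_them_all = n_dp_flows > flow_limit * 2;   /* > 400000: delete every flow */

/* Worked examples for the max_idle line above:
 *   n_dp_flows = 250000  ->  max_idle = 100 ms    (aggressive aging)
 *   n_dp_flows =  50000  ->  max_idle = 10000 ms  (normal 10 s aging) */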


        for (f = flows; f < &flows[n_dumped]; f++) {
            long long int used = f->stats.used;
            struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
            struct dpif_flow_stats stats = f->stats;
            enum reval_result result;
            struct udpif_key *ukey;
            bool already_dumped;
            int error;

            /* Returns 0 both when an existing ukey is found and when a new
             * one is created (ukey_acquire() indirectly calls ukey_create()). */
            if (ukey_acquire(udpif, f, &ukey, &error)) {
                /* Error path (condensed): log the error and skip this flow. */
                continue;
            }

 The freshly read flow->used timestamp (or, when the datapath did not report one, the value recovered via udpif_update_used()) is compared with now - max_idle to decide whether the flow has idled out and must be deleted:

            if (!used) {
                /* Fills in 'stats' from the ukey; ukey->stats itself is
                 * refreshed later inside revalidate_ukey(). */
                used = udpif_update_used(udpif, ukey, &stats);
            }
            if (kill_them_all || (used && used < now - max_idle)) {
                result = UKEY_DELETE;
            } else {
                result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
                                         reval_seq, &recircs,
                                         f->attrs.offloaded);
            }
            ukey->dump_seq = dump_seq;

  2.2.3 revalidate_ukey

   

/* Verifies that the datapath actions of 'ukey' are still correct, and pushes
 * 'stats' for it.
 *
 * Returns a recommended action for 'ukey', options include:
 *      UKEY_DELETE The ukey should be deleted.
 *      UKEY_KEEP   The ukey is fine as is.
 *      UKEY_MODIFY The ukey's actions should be changed but is otherwise
 *                  fine.  Callers should change the actions to those found
 *                  in the caller supplied 'odp_actions' buffer.  The
 *                  recirculation references can be found in 'recircs' and
 *                  must be handled by the caller.
 *
 * If the result is UKEY_MODIFY, then references to all recirc_ids used by the
 * new flow will be held within 'recircs' (which may be none).
 *
 * The caller is responsible for both initializing 'recircs' prior to this
 * call, and ensuring any references are eventually freed.
 */
static enum reval_result
revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
                const struct dpif_flow_stats *stats,
                struct ofpbuf *odp_actions, uint64_t reval_seq,
                struct recirc_refs *recircs, bool offloaded)
    OVS_REQUIRES(ukey->mutex)
{
    bool need_revalidate = ukey->reval_seq != reval_seq;
    enum reval_result result = UKEY_DELETE;
    struct dpif_flow_stats push;

    ofpbuf_clear(odp_actions);

    push.used = stats->used;
    push.tcp_flags = stats->tcp_flags;
    push.n_packets = (stats->n_packets > ukey->stats.n_packets
                      ? stats->n_packets - ukey->stats.n_packets
                      : 0);
    push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
                    ? stats->n_bytes - ukey->stats.n_bytes
                    : 0);

    if (need_revalidate) {
        if (should_revalidate(udpif, push.n_packets, ukey->stats.used)) {
            if (!ukey->xcache) {
                ukey->xcache = xlate_cache_new();
            } else {
                xlate_cache_clear(ukey->xcache);
            }
            result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
                                       odp_actions, recircs, ukey->xcache);
        } /* else delete; too expensive to revalidate */
    } else if (!push.n_packets || ukey->xcache
               || !populate_xcache(udpif, ukey, push.tcp_flags)) {
        result = UKEY_KEEP;
    }

    /* Stats for deleted flows will be attributed upon flow deletion. Skip. */
    if (result != UKEY_DELETE) {
        xlate_push_stats(ukey->xcache, &push, offloaded);
        ukey->stats = *stats;
        ukey->reval_seq = reval_seq;
    }

    return result;
}
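
Note that 'push' carries deltas, not cumulative readings: datapath counters only grow, ukey->stats caches the previous reading, and only the increment is pushed up (ukey->stats is then refreshed at the bottom of the function). A small illustration with hypothetical numbers:

/* Hypothetical numbers illustrating the delta computation above
 * (assumes the OVS headers for struct dpif_flow_stats). */
struct dpif_flow_stats prev = { .n_packets = 100, .n_bytes = 64000 };
struct dpif_flow_stats curr = { .n_packets = 130, .n_bytes = 80000 };

uint64_t d_pkts  = curr.n_packets > prev.n_packets
                   ? curr.n_packets - prev.n_packets : 0;   /* 30 */
uint64_t d_bytes = curr.n_bytes > prev.n_bytes
                   ? curr.n_bytes - prev.n_bytes : 0;       /* 16000 */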

  revalidate_ukey__ (the core of revalidation, called from revalidate_ukey() above):

  

static enum reval_result
revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
                  uint16_t tcp_flags, struct ofpbuf *odp_actions,
                  struct recirc_refs *recircs, struct xlate_cache *xcache)
{
    struct xlate_out *xoutp;
    struct netflow *netflow;
    struct flow_wildcards dp_mask, wc;
    enum reval_result result;
    struct reval_context ctx = {
        .odp_actions = odp_actions,
        .netflow = &netflow,
        .xcache = xcache,
        .wc = &wc,
    };

    result = UKEY_DELETE;
    xoutp = NULL;
    netflow = NULL;

    if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) {
        goto exit;
    }
    xoutp = &ctx.xout;

    if (xoutp->avoid_caching) {
        goto exit;
    }

    if (xoutp->slow) {
        struct ofproto_dpif *ofproto;
        ofp_port_t ofp_in_port;

        ofproto = xlate_lookup_ofproto(udpif->backer, &ctx.flow, &ofp_in_port,
                                       NULL);

        ofpbuf_clear(odp_actions);

        if (!ofproto) {
            goto exit;
        }

        compose_slow_path(udpif, xoutp, ctx.flow.in_port.odp_port,
                          ofp_in_port, odp_actions,
                          ofproto->up.slowpath_meter_id, &ofproto->uuid);
    }

    if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow,
                             NULL)
        == ODP_FIT_ERROR) {
        goto exit;
    }

    /* Do not modify if any bit is wildcarded by the installed datapath flow,
     * but not the newly revalidated wildcard mask (wc), i.e., if revalidation
     * tells that the datapath flow is now too generic and must be narrowed
     * down.  Note that we do not know if the datapath has ignored any of the
     * wildcarded bits, so we may be overly conservative here. */
    if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) {
        goto exit;
    }

    if (!ofpbuf_equal(odp_actions,
                      ovsrcu_get(struct ofpbuf *, &ukey->actions))) {
        /* The datapath mask was OK, but the actions seem to have changed.
         * Let's modify it in place. */
        result = UKEY_MODIFY;
        /* Transfer recirc action ID references to the caller. */
        recirc_refs_swap(recircs, &xoutp->recircs);
        goto exit;
    }

    result = UKEY_KEEP;

exit:
    if (netflow && result == UKEY_DELETE) {
        netflow_flow_clear(netflow, &ctx.flow);
    }
    xlate_out_uninit(xoutp);
    return result;
}

2.3 sweep phase

    revalidator_sweep() indirectly calls revalidator_sweep__() with purge = false:

   

static void
revalidator_sweep__(struct revalidator *revalidator, bool purge)
{
    struct udpif *udpif;
    uint64_t dump_seq, reval_seq;
    int slice;

    udpif = revalidator->udpif;
    dump_seq = seq_read(udpif->dump_seq);
    reval_seq = seq_read(udpif->reval_seq);
    // udpif->revalidators is the base address of array
    slice = revalidator - udpif->revalidators;
    ovs_assert(slice < udpif->n_revalidators);

    /* Sweep this revalidator's slice of the N_UMAPS ukey maps:
     * umap 'i' belongs to revalidator (i % n_revalidators). */
    for (int i = slice; i < N_UMAPS; i += udpif->n_revalidators) {
        uint64_t odp_actions_stub[1024 / 8];
        struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);

        struct ukey_op ops[REVALIDATE_MAX_BATCH];
        struct udpif_key *ukey;
        // one shard of udpif->ukeys[]
        struct umap *umap = &udpif->ukeys[i];
        size_t n_ops = 0;

        CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
            enum ukey_state ukey_state;

            /* Handler threads could be holding a ukey lock while it installs a
             * new flow, so don't hang around waiting for access to it. */
            if (ovs_mutex_trylock(&ukey->mutex)) {
                continue;
            }
            ukey_state = ukey->state;
            if (ukey_state == UKEY_OPERATIONAL
                || (ukey_state == UKEY_VISIBLE && purge)) { // purge is false 
                struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
                bool seq_mismatch = (ukey->dump_seq != dump_seq
                                     && ukey->reval_seq != reval_seq);
                enum reval_result result;

                if (purge) {
                    result = UKEY_DELETE;
                } else if (!seq_mismatch) {
                    result = UKEY_KEEP;
                } else {
                    struct dpif_flow_stats stats;

                    COVERAGE_INC(revalidate_missed_dp_flow);
                    memset(&stats, 0, sizeof stats);
                    result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
                                             reval_seq, &recircs, false);
                }
                if (result != UKEY_KEEP) {
                    /* Clears 'recircs' if filled by revalidate_ukey(). */
                    reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
                                  &odp_actions);
                }
            }
            ovs_mutex_unlock(&ukey->mutex);

            if (ukey_state == UKEY_EVICTED) {
                /* The common flow deletion case involves deletion of the flow
                 * during the dump phase and ukey deletion here. */
                ovs_mutex_lock(&umap->mutex);
                ukey_delete(umap, ukey);
                ovs_mutex_unlock(&umap->mutex);
            }

            if (n_ops == REVALIDATE_MAX_BATCH) {
                /* Update/delete missed flows and clean up corresponding ukeys
                 * if necessary. */
                push_ukey_ops(udpif, umap, ops, n_ops);
                n_ops = 0;
            }
        }

        if (n_ops) {
            push_ukey_ops(udpif, umap, ops, n_ops);
        }

        ofpbuf_uninit(&odp_actions);
        ovsrcu_quiesce();
    }
}
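
Finally, what becomes of the batched results: a sketch paraphrased from reval_op_init() in ofproto-dpif-upcall.c (details approximate). UKEY_DELETE turns into a datapath flow delete, UKEY_MODIFY into a flow put with DPIF_FP_MODIFY, and push_ukey_ops() then hands the whole batch to the datapath in a single dpif_operate() call.

/* Sketch (approximate): translate a reval_result into a datapath op
 * that push_ukey_ops() will flush out in a batch. */
if (result == UKEY_DELETE) {
    delete_op_init(udpif, op, ukey);           /* DPIF_OP_FLOW_DEL */
    transition_ukey(ukey, UKEY_EVICTING);
} else if (result == UKEY_MODIFY) {
    /* Keep the new recirc references, release the old ones. */
    recirc_refs_swap(&ukey->recircs, recircs);
    recirc_refs_unref(recircs);
    ukey_set_actions(ukey, odp_actions);
    put_op_init(op, ukey, DPIF_FP_MODIFY);     /* DPIF_OP_FLOW_PUT */
}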