searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

OVS RCU 实现分析

2023-03-30 01:36:52
202
0

RCU锁谈起来许多人云里雾里,听着也觉得高大上,竟然可以不加锁就实现加锁的效果。此外,网上关于RCU实现原理的完整闭环逻辑分析,比较少,大多停留在阻塞写者,等待读者退出临界区的程度。而对如何实现延迟调用,如何判定何时是合适的释放调用时机,由谁来执行延迟调用等实现细节没有覆盖。所以本文着重从这些方面透彻讲解RCU实现,而对于RCU基本原理如内存屏障等、一些概念性的东西在这里就不赘述了,不清楚的可以参考LWN上的经典文章:https://lwn.net/Articles/262464/

RCU锁的广泛运用,得从2002年时内核社区首先发起开始,当然相关技术论文比这更早。这篇文章将从顶层设计谈起,宏观看待实现一个RCU锁,必须实现的一些基本原理与组件,并具体细聊下OVS中用户态RCU锁实现,而内核Hierarchical RCU的实现,后续再详述。

本文主要讨论普通RCU锁,对于睡眠RCU等变体,暂不覆盖。

RCU原理

直接说结论,RCU锁要实现几个关键组件,才能完成逻辑闭环:

  1. 读取时,获取指针用rcu 专用函数。其实是为了避免指令乱序,专门用内存屏障确保读到的指针一定是当前最新指针,因为编译器优化、CPU指令优化等都可能打乱指令序列,导致读取的指针为旧版本。通常可以认为是nop操作,无指令开销。

  2. 读临界区范围内,无睡眠(否则需要用sleeping rcu),这样就可以认为某个线程只要发生停顿,就可以执行延迟调用释放该线程所属的旧版本数据

  3. postpone 机制:rcu postpone 延迟调用,将释放函数挂载到callback list 回调函数链表,等待检测到所有读者执行线程,都经历过至少一次停顿态时,可以统一执行这些回调。本质上类似高级语言的垃圾回收。

  4. 最关键的,延迟调用时机:需要确保每个执行线程,都至少经历过一次停顿,所以本质上bitmap标记即可,每个执行线程只要对应bit位为1了,表示至少发生过一次停顿,即可执行callback list中的回调释放函数。

    但实际实现时,为了使每个使用rcu锁的线程,能不停的新增新版本数据,释放旧版本数据,采用流水线模式,所以通常不会只用一个bit位,而是为每个线程维护一个version字段,小于指定version的,都可以释放掉,而大于指定version的,可能还有其他线程在使用。

  5. 谁来执行延迟调用:这个在各个实现上自行决定。

OVS 用户态 RCU 实现分析

线程本地存储

在多线程程序中,所有线程共享程序中的变量。现在有一全局变量,所有线程都可以使用它,改变它的值。而如果每个线程希望能单独拥有它,那么就需要使用线程存储了。表面上看起来这是一个全局变量,所有线程都可以使用它,而它的值在每一个线程中又是单独存储的

说白了,就是给每个线程分配一个线程独有的存储空间,用于存储线程自己的东西

pthread线程库的线程本地存储使用方法
pthread_key_t key; //定义线程本地存储的载体变量,其实就是个指针

int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));  //创建,所有线程都会有一份key,其对应的value 为NULL 

int pthread_setspecific(pthread_key_t key, const void *value);  //存储线程本地存储的特定key value
void *pthread_getspecific(pthread_key_t key); //读取线程本地存储的特定key value
void pthread_key_delete(pthread_key_t key); //删除存储及关联关系

OVS-RCU 的数据结构基础

每个线程都有自己的回调链,存储在ovsrcu_perthread.cbset字段中。

在未进入停顿态时,注册的延迟回调由线程自己管理,此时ovsrcu_cbset.cbs用于记录当前线程调用ovsrcu_postpone__延迟释放注册的各个回调函数。

而当进入停顿态时,则通过ovsrcu_flush_cbset__将当前线程的ovsrcu_cbset.cbs 全部挂载到flushed_cbsets全局链表中,并刷新flushed_cbsets_seq序号。

struct ovsrcu_cb {
    void (*function)(void *aux);
    void *aux;
};
struct ovsrcu_cbset {
    //当线程未进入停顿态时,此节点不会链入任何链表;当线程进入停顿态时链入全局 flushed_cbsets 链表
    struct ovs_list list_node;        
    struct ovsrcu_cb *cbs;   // 当
    size_t n_allocated;
    int n_cbs;
};
struct ovsrcu_perthread {
    struct ovs_list list_node;  // 链入全局 ovsrcu_threads 链表

    uint64_t seqno;
    struct ovsrcu_cbset *cbset;  // 当前线程的回调函数集合
    char name[16];              // 当前RCU归属的线程名
};

而每当全局需要flushed_cbsets_seq变化时,urcu线程垃圾回收执行函数ovsrcu_postpone_thread都会将当前链表中的函数挨个执行回调,也就是释放结构体:

static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
        //挨个执行全局链表flushed_cbsets中的回调,释放结构体
        if (!ovsrcu_call_postponed()) {
            //等待全局刷新序号flushed_cbsets_seq变化。当变化时,代表所有线程都经过了一次停顿期,所以可以执行下一轮刷新
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

OVS RCU 线程启动时,为每个线程都分配一份 perthread_key,其中存放的就是ovsrcu_perthread 结构体。而全局变量 ovsrcu_threads 则将所有的ovs 线程都管理起来:

static void
ovsrcu_init_module(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (ovsthread_once_start(&once)) {
        ......
        /*为每个线程都分配一份 `perthread_key`, 但其对应的value 还是NULL 值*/
        xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
		
        ovs_list_init(&ovsrcu_threads);
		......
        ovsthread_once_done(&once);
    }
}

static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node);
        ovs_mutex_unlock(&ovsrcu_threads_mutex);
		// 为每个线程分配对应的ovsrcu_perthread 私有数据
        pthread_setspecific(perthread_key, perthread);
    }
    return perthread;
}

OVS-RCU 的线程停顿态标记

RCU 锁里面,必须标记每个执行线程是否经历过至少一次停顿态。因为对于常规rcu锁(除开sleeping-rcu以外)而言,临界区都是没有睡眠动作的。所以只要睡眠停顿过,也就意味着其已经退出了读临界区,所以此时是可以释放旧数据的。

这类标记本质上用bit位来标记正好,只需要标记是否已经经历过。OVS 做的复杂些,用了一个结构体的指针,但是本质上也是一样的,只是判断对应指针是否位NULL。如果为NULL,就代表其经历过一次停顿了

bool
ovsrcu_is_quiescent(void)
{
    ovsrcu_init_module();
    return pthread_getspecific(perthread_key) == NULL;
}

进入停顿态时,ovsrcu_quisce_start会置位perthread_key为NULL,同时会刷新当前的已经注册的回调链表到全局回调链表flushed_cbsets中:

/* Indicates the beginning of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h. */
void
ovsrcu_quiesce_start(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();
    perthread = pthread_getspecific(perthread_key);
    if (perthread) {
        // 设置当前线程的perthread_key为NULL
        pthread_setspecific(perthread_key, NULL);
        // 刷新当前线程目前的回调链表到全局回调链表flushed_cbsets中,并释放当前perthread指向的ovsrcu_perthread结构体
        ovsrcu_unregister__(perthread);
    }

    ovsrcu_quiesced();
}

如果不为NULL,就表示其目前处在非停顿态,也就是Active活跃态

/* Indicates the end of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Quiescent states don't stack or nest, so this always ends a quiescent state
 * even if ovsrcu_quiesce_start() was called multiple times in a row. */
void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();  //该函数会设置perthread_key不为NULL
}

此外,注意OVS RCU的实现细节:

static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);   //设置当前的执行编号,只要下次
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node); //所有需要追踪停顿态的线程,都会push到该链表中
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        pthread_setspecific(perthread_key, perthread);  //初始化时,设置为非空,表示当前是非停顿态
    }
    return perthread;
}

最开始是main线程,在main线程fork出新的线程后,main线程自动转换为非停顿状态:

void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();
}
pthread_t
ovs_thread_create(const char *name, void *(*start)(void *), void *arg)
{
        if (ovsthread_once_start(&once)) {
            //首先设置main线程为非停顿态
            /* The first call to this function has to happen in the main thread.
             * Before the process becomes multithreaded we make sure that the
             * main thread is considered non quiescent.
             *
             * For other threads this is done in ovs_thread_wrapper(), but the
             * main thread has no such wrapper.
             *
             * There's no reason to call ovsrcu_quiesce_end() in subsequent
             * invocations of this function and it might introduce problems
             * for other threads. */
            ovsrcu_quiesce_end();
            ovsthread_once_done(&once);
        }
        //接下来才是真正的线程创建
        error = pthread_create(&thread, &attr, ovsthread_wrapper, aux);
}

这里ovsrcu_perthread_get会自动设置当前线程,也就是main线程的线程本地存储指针为非NULL值,从而表示当前线程为非停顿态。

综上:按照RCU理论,ovs_postpone发起的延迟滞后释放操作(这些操作通常挂载到了一个list上,ovs是挂到了flushed_cbsets链表上),原则上,只要所有线程的线程本地存储perthread_key都经历过至少一次NULL值,则就可以执行了。

何时标记停顿态

OVS在很多停顿函数中,都嵌入了ovsrcu_quiesce_start()ovs_rcu_quiesce_end()函数,用于标记进入了停顿态,如

/* High resolution sleep with thread quiesce. */
void
xnanosleep(uint64_t nanoseconds)
{
    ovsrcu_quiesce_start();
    xnanosleep__(nanoseconds);
    ovsrcu_quiesce_end();
}

ovsrcu_quiesce_start会进而调用ovsrcu_unregister__来标记进入停顿态,并刷新局部回调链表到全局链表。

static void
ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
{
    if (perthread->cbset) {
        //这里直接将当前线程的回调刷到全局链表flushed_cbsets
        ovsrcu_flush_cbset(perthread);
    }

    ovs_mutex_lock(&ovsrcu_threads_mutex);
    ovs_list_remove(&perthread->list_node);
    ovs_mutex_unlock(&ovsrcu_threads_mutex);

    free(perthread);

    seq_change(global_seqno);
}

此外,许多非PMD 线程如offload 线程、revalidate线程等,执行完自己本职任务后,在线程尾部定期调用来给RCU做gc。这些则是通过直接调用另一个接口ovsrcu_quiesce来定期将回调刷新到全局链表的:

void
ovsrcu_quiesce(void)
{
    struct ovsrcu_perthread *perthread;

    perthread = ovsrcu_perthread_get();
    perthread->seqno = seq_read(global_seqno);  //设置当前线程的停顿编号为当前的global_seqno
    if (perthread->cbset) {
        //将当前线程的回调刷到全局链表flushed_cbsets
        ovsrcu_flush_cbset(perthread);
    }
    seq_change(global_seqno); // 递增全局的下一次停顿编号

    ovsrcu_quiesced();
}

而对于PMD之类的实现,因为其压根没有sleep等停顿逻辑,则是通过PMD主动调用ovsrcu_try_quiesce来将当前线程的回调链表刷新到全局flushed_cbsets链表的:

int
ovsrcu_try_quiesce(void)
{
    struct ovsrcu_perthread *perthread;
    int ret = EBUSY;

    ovs_assert(!single_threaded());
    perthread = ovsrcu_perthread_get();
    if (!seq_try_lock()) {
        perthread->seqno = seq_read_protected(global_seqno);
        if (perthread->cbset) {
            //这里直接将当前线程的回调刷到全局链表flushed_cbsets
            ovsrcu_flush_cbset__(perthread, true);
        }
        seq_change_protected(global_seqno);
        seq_unlock();
        ovsrcu_quiesced();
        ret = 0;
    }
    return ret;
}

这些实现,其实都是做两件事情:

  1. 刷新当前线程的回调函数集合到全局链表

  2. 更新global_seqno、flushed_cbsets_seq版本号

这些都仍然是在当前线程范围内,当当前线程经历一次停顿态后,刷新到全局。但仍然无法保证其他持有旧版本数据指针的线程,已经经过了一次停顿态。所以问题来了。

谁,以及何时来执行延迟调用

前面已经看到,ovsrcu_flush_cbset将当前线程在这一次周期内发起的所有postpone延迟调用函数(在perthread->cbset中管理),提交到全局延迟调用列表flushed_cbsets中。而当前线程自己,则刷新当前局部调用后,直接无阻塞进入下一轮的rcu周期,通过递增全局global_seqno代表全局rcu周期

static void
ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
{
    struct ovsrcu_cbset *cbset = perthread->cbset;

    if (cbset) {
        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
        perthread->cbset = NULL;

        if (protected) {
            seq_change_protected(flushed_cbsets_seq);
        } else {
            seq_change(flushed_cbsets_seq);
        }
    }
}

那么,如最初RCU原理一节提到,显然,必须要等待经过至少一次全局停顿后,才能执行全局回调flushed_cbsets中的回调函数。到底如何保证,在所有线程都经过全局停顿呢?全局停顿后,谁来执行全局回调链表中的回调函数呢?答案是urcu线程,通过global_seqno确保所有线程都经历过全局停顿;并由该线程来负责执行全局回调。

OVS实现了专门的urcu线程ovsrcu_postpone_thread来负责清理:

static void ovsrcu_quiesced(void)
{
    if (single_threaded()) {
        ovsrcu_call_postponed();
    } else {
        static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
        if (ovsthread_once_start(&once)) {
            latch_init(&postpone_exit);
            ovs_barrier_init(&postpone_barrier, 2);
            ovs_thread_create("urcu", ovsrcu_postpone_thread, NULL);
            ovsthread_once_done(&once);
        }
    }
}

如果是单线程,那么当前线程执行ovsrcu_quiesced()时,就认为是其已经经历过停顿期(可以看该函数的调用点,都是在停顿期完成后调用),不用再看其他不存在的线程的引用问题,直接执行释放。我们着重来看多线程模式下,ovs urcu 线程实现:

static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
         //不停摘下全局回调链表,并在等待合适时间执行回调以释放旧版本数据,注意,该函数会阻塞等待
        if (!ovsrcu_call_postponed()) {
            //当 flushed_cbsets_seq变化时,代表flushed_cbsets链表有变化,有新入队的回调需要处理
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

主要实现还是在ovsrcu_call_postponed:

static bool
ovsrcu_call_postponed(void)
{
    struct ovsrcu_cbset *cbset;
    struct ovs_list cbsets;

    //摘下全局回调链表,等待合适时间处理
    guarded_list_pop_all(&flushed_cbsets, &cbsets);
    if (ovs_list_is_empty(&cbsets)) {
        return false;
    }

    ovsrcu_synchronize();   //这里阻塞等待所有线程seqno大于最开始的global_seqno

    //挨个处理全局回调链表,
    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
        struct ovsrcu_cb *cb;

        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
            cb->function(cb->aux);
        }
        free(cbset->cbs);
        free(cbset);
    }

    return true;
}

在这里可以看到,urcu 线程就是负责实际执行延迟调用的实体!他将每个使用rcu机制的线程,在每个线程各自停顿期内将自己延迟调用挂载到全局flushed_cbsets链表的所有回调,摘取下来,挨个执行。

Ok,现在我们明白了谁来执行延迟调用。那么,如前文所述,urcu 线程需要确定全局每个使用rcu机制的线程,都经历过至少一次停顿。如何实现的呢?答案就是在ovsrcu_synchronize中等待global_seqno实现的。

但在具体看实现前,我们先搞清楚,我们为什么需要seqno。

不是只需要bit位标志是否经历过停顿期就可以了吗?为什么还需要seqno?这是静态思想,我们要动态看待这个问题。因为当前线程经历了停顿期,刷新回调函数到全局链表后,继续进入v2.0数据版本的线程,它还需要继续执行,继续在非停顿态执行,甚至会再次进入rcu 临界区,并再次将当前v2.0数据标记为v3.0数据版本,v4.0版本等等。所以,可能存在当前线程已经有两三个旧数据版本刷新挂在到全局回调列表flushed_cbsets,当前线程已经进入v10.0时代,而其他线程还在第一个rcu周期,处于v1.0状态,还未经过停顿态,进入v2.0版本。

所以,这里perthread->seqno其实是当前线程的数据版本号,取自线程经历停顿期时当时的global_seqno。而global_seqno代表的是全局版本号,且每个线程经历一次停顿期时,全局版本号都要被递增1。

接下来再看ovs rcu的停顿等待机制,在ovsrcu_synchronize()内:

void ovsrcu_synchronize(void)
{
    unsigned int warning_threshold = 1000;
    uint64_t target_seqno;
    long long int start;

    if (single_threaded()) {
        return;
    }

    //先定个小目标,确定当前要释放小于哪个版本的旧数据,假设比如是v4.0版本旧数据
    target_seqno = seq_read(global_seqno);
    ovsrcu_quiesce_start();
    start = time_msec();

    for (;;) {
        //不停循环,可能当前cur_seqno远大于target_seq,语义上等效于
        //全局版本进入v10.0时代,而当前urcu线程,需要释放v4.0时代的旧版本数据
        uint64_t cur_seqno = seq_read(global_seqno);  
        struct ovsrcu_perthread *perthread;
        char stalled_thread[16];
        unsigned int elapsed;
        bool done = true;

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
            //这里是关键,循环检测所有ovs线程,比对线程的数据版本号和v4.0版本,看是否还有线程还在用v3.0版本
            if (perthread->seqno <= target_seqno) {
                ovs_strlcpy_arrays(stalled_thread, perthread->name);
                //如果还有线程在用v3.0版本,很显然,我们现在还不能执行v4.0版本的释放
                done = false;
                break;
            }
        }
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        if (done) {
            //如果所有线程的perthread->seqno都大于target_seqno,
            // 即所有线程都没有用v4.0版本以前的数据了,
            // 则我们不必再等待了,可以释放旧版本数据
            break;
        }

        elapsed = time_msec() - start;
        if (elapsed >= warning_threshold) {
            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
                      elapsed, stalled_thread);
            warning_threshold *= 2;
        }
        poll_timer_wait_until(start + warning_threshold);

        //还有线程在用旧版本v3.0数据,所以等待global_seqno变化我们再做循环检测。
        //而只有当有线程进入停顿态,才会导致global_seq变化
        seq_wait(global_seqno, cur_seqno);
        poll_block();
    }
    ovsrcu_quiesce_end();
}

进函数时,读取当前周期编号global_seq,定义为目标序号target_seqno,随后for循环内持续等待,直到每个线程的seqno 都大于目标序号。其余代码实现,都是为了如果当前有线程的seqno 小于目标序号,则显然还有线程还可能在使用旧版本数据,则继续循环检测 + poll_block等待。

所以,ovs urcu线程就是不停记录global_seqno为target_seqno并开始等待,等到所有线程的版本号都大于了刚进入时记录的target_seqno 后,就删除一波,如此不停删除做垃圾回收。因此,如果urcu线程执行过长,就会导致全局回调链表中挂载许多回调释放函数,系统垃圾较多。

同样,这也理解为何如果某个ovs线程没有睡眠,ovs日志中会出现urcu线程报错:"blocked %u ms waiting for %s to quiesce"。其实这是因为这个线程一直死循环,没有经过停顿态,其perthread->seqno没有增长,导致urcu线程认为该线程一直在持有老的引用。初看似乎没什么,但其实这会导致urcu 线程一直在循环等待旧版本数据的持有者释放引用,并执行垃圾回收。但如果那个线程因为没有调用过rcu的停顿态函数,导致perthread->seqno一直没有增长,那么整个ovs的rcu机制都会空转等待,所有垃圾回收都无法执行!

Ok,至此,OVS 的 RCU 实现机制厘清。

参考文档:

  1. 线程本地存储:https://www.jianshu.com/p/d52c1ebf808a

  2. lwn.net 的 RCU 介绍:https://lwn.net/Articles/262464/

  3. OVS RCU 文档:https://github.com/openvswitch/ovs/blob/master/lib/ovs-rcu.h

0条评论
0 / 1000
李****易
4文章数
1粉丝数
李****易
4 文章 | 1 粉丝
李****易
4文章数
1粉丝数
李****易
4 文章 | 1 粉丝
原创

OVS RCU 实现分析

2023-03-30 01:36:52
202
0

RCU锁谈起来许多人云里雾里,听着也觉得高大上,竟然可以不加锁就实现加锁的效果。此外,网上关于RCU实现原理的完整闭环逻辑分析,比较少,大多停留在阻塞写者,等待读者退出临界区的程度。而对如何实现延迟调用,如何判定何时是合适的释放调用时机,由谁来执行延迟调用等实现细节没有覆盖。所以本文着重从这些方面透彻讲解RCU实现,而对于RCU基本原理如内存屏障等、一些概念性的东西在这里就不赘述了,不清楚的可以参考LWN上的经典文章:https://lwn.net/Articles/262464/

RCU锁的广泛运用,得从2002年时内核社区首先发起开始,当然相关技术论文比这更早。这篇文章将从顶层设计谈起,宏观看待实现一个RCU锁,必须实现的一些基本原理与组件,并具体细聊下OVS中用户态RCU锁实现,而内核Hierarchical RCU的实现,后续再详述。

本文主要讨论普通RCU锁,对于睡眠RCU等变体,暂不覆盖。

RCU原理

直接说结论,RCU锁要实现几个关键组件,才能完成逻辑闭环:

  1. 读取时,获取指针用rcu 专用函数。其实是为了避免指令乱序,专门用内存屏障确保读到的指针一定是当前最新指针,因为编译器优化、CPU指令优化等都可能打乱指令序列,导致读取的指针为旧版本。通常可以认为是nop操作,无指令开销。

  2. 读临界区范围内,无睡眠(否则需要用sleeping rcu),这样就可以认为某个线程只要发生停顿,就可以执行延迟调用释放该线程所属的旧版本数据

  3. postpone 机制:rcu postpone 延迟调用,将释放函数挂载到callback list 回调函数链表,等待检测到所有读者执行线程,都经历过至少一次停顿态时,可以统一执行这些回调。本质上类似高级语言的垃圾回收。

  4. 最关键的,延迟调用时机:需要确保每个执行线程,都至少经历过一次停顿,所以本质上bitmap标记即可,每个执行线程只要对应bit位为1了,表示至少发生过一次停顿,即可执行callback list中的回调释放函数。

    但实际实现时,为了使每个使用rcu锁的线程,能不停的新增新版本数据,释放旧版本数据,采用流水线模式,所以通常不会只用一个bit位,而是为每个线程维护一个version字段,小于指定version的,都可以释放掉,而大于指定version的,可能还有其他线程在使用。

  5. 谁来执行延迟调用:这个在各个实现上自行决定。

OVS 用户态 RCU 实现分析

线程本地存储

在多线程程序中,所有线程共享程序中的变量。现在有一全局变量,所有线程都可以使用它,改变它的值。而如果每个线程希望能单独拥有它,那么就需要使用线程存储了。表面上看起来这是一个全局变量,所有线程都可以使用它,而它的值在每一个线程中又是单独存储的

说白了,就是给每个线程分配一个线程独有的存储空间,用于存储线程自己的东西

pthread线程库的线程本地存储使用方法
pthread_key_t key; //定义线程本地存储的载体变量,其实就是个指针

int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));  //创建,所有线程都会有一份key,其对应的value 为NULL 

int pthread_setspecific(pthread_key_t key, const void *value);  //存储线程本地存储的特定key value
void *pthread_getspecific(pthread_key_t key); //读取线程本地存储的特定key value
void pthread_key_delete(pthread_key_t key); //删除存储及关联关系

OVS-RCU 的数据结构基础

每个线程都有自己的回调链,存储在ovsrcu_perthread.cbset字段中。

在未进入停顿态时,注册的延迟回调由线程自己管理,此时ovsrcu_cbset.cbs用于记录当前线程调用ovsrcu_postpone__延迟释放注册的各个回调函数。

而当进入停顿态时,则通过ovsrcu_flush_cbset__将当前线程的ovsrcu_cbset.cbs 全部挂载到flushed_cbsets全局链表中,并刷新flushed_cbsets_seq序号。

struct ovsrcu_cb {
    void (*function)(void *aux);
    void *aux;
};
struct ovsrcu_cbset {
    //当线程未进入停顿态时,此节点不会链入任何链表;当线程进入停顿态时链入全局 flushed_cbsets 链表
    struct ovs_list list_node;        
    struct ovsrcu_cb *cbs;   // 当
    size_t n_allocated;
    int n_cbs;
};
struct ovsrcu_perthread {
    struct ovs_list list_node;  // 链入全局 ovsrcu_threads 链表

    uint64_t seqno;
    struct ovsrcu_cbset *cbset;  // 当前线程的回调函数集合
    char name[16];              // 当前RCU归属的线程名
};

而每当全局需要flushed_cbsets_seq变化时,urcu线程垃圾回收执行函数ovsrcu_postpone_thread都会将当前链表中的函数挨个执行回调,也就是释放结构体:

static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
        //挨个执行全局链表flushed_cbsets中的回调,释放结构体
        if (!ovsrcu_call_postponed()) {
            //等待全局刷新序号flushed_cbsets_seq变化。当变化时,代表所有线程都经过了一次停顿期,所以可以执行下一轮刷新
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

OVS RCU 线程启动时,为每个线程都分配一份 perthread_key,其中存放的就是ovsrcu_perthread 结构体。而全局变量 ovsrcu_threads 则将所有的ovs 线程都管理起来:

static void
ovsrcu_init_module(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (ovsthread_once_start(&once)) {
        ......
        /*为每个线程都分配一份 `perthread_key`, 但其对应的value 还是NULL 值*/
        xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
		
        ovs_list_init(&ovsrcu_threads);
		......
        ovsthread_once_done(&once);
    }
}

static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node);
        ovs_mutex_unlock(&ovsrcu_threads_mutex);
		// 为每个线程分配对应的ovsrcu_perthread 私有数据
        pthread_setspecific(perthread_key, perthread);
    }
    return perthread;
}

OVS-RCU 的线程停顿态标记

RCU 锁里面,必须标记每个执行线程是否经历过至少一次停顿态。因为对于常规rcu锁(除开sleeping-rcu以外)而言,临界区都是没有睡眠动作的。所以只要睡眠停顿过,也就意味着其已经退出了读临界区,所以此时是可以释放旧数据的。

这类标记本质上用bit位来标记正好,只需要标记是否已经经历过。OVS 做的复杂些,用了一个结构体的指针,但是本质上也是一样的,只是判断对应指针是否位NULL。如果为NULL,就代表其经历过一次停顿了

bool
ovsrcu_is_quiescent(void)
{
    ovsrcu_init_module();
    return pthread_getspecific(perthread_key) == NULL;
}

进入停顿态时,ovsrcu_quisce_start会置位perthread_key为NULL,同时会刷新当前的已经注册的回调链表到全局回调链表flushed_cbsets中:

/* Indicates the beginning of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h. */
void
ovsrcu_quiesce_start(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();
    perthread = pthread_getspecific(perthread_key);
    if (perthread) {
        // 设置当前线程的perthread_key为NULL
        pthread_setspecific(perthread_key, NULL);
        // 刷新当前线程目前的回调链表到全局回调链表flushed_cbsets中,并释放当前perthread指向的ovsrcu_perthread结构体
        ovsrcu_unregister__(perthread);
    }

    ovsrcu_quiesced();
}

如果不为NULL,就表示其目前处在非停顿态,也就是Active活跃态

/* Indicates the end of a quiescent state.  See "Details" near the top of
 * ovs-rcu.h.
 *
 * Quiescent states don't stack or nest, so this always ends a quiescent state
 * even if ovsrcu_quiesce_start() was called multiple times in a row. */
void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();  //该函数会设置perthread_key不为NULL
}

此外,注意OVS RCU的实现细节:

static struct ovsrcu_perthread *
ovsrcu_perthread_get(void)
{
    struct ovsrcu_perthread *perthread;

    ovsrcu_init_module();

    perthread = pthread_getspecific(perthread_key);
    if (!perthread) {
        const char *name = get_subprogram_name();

        perthread = xmalloc(sizeof *perthread);
        perthread->seqno = seq_read(global_seqno);   //设置当前的执行编号,只要下次
        perthread->cbset = NULL;
        ovs_strlcpy(perthread->name, name[0] ? name : "main",
                    sizeof perthread->name);

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        ovs_list_push_back(&ovsrcu_threads, &perthread->list_node); //所有需要追踪停顿态的线程,都会push到该链表中
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        pthread_setspecific(perthread_key, perthread);  //初始化时,设置为非空,表示当前是非停顿态
    }
    return perthread;
}

最开始是main线程,在main线程fork出新的线程后,main线程自动转换为非停顿状态:

void
ovsrcu_quiesce_end(void)
{
    ovsrcu_perthread_get();
}
pthread_t
ovs_thread_create(const char *name, void *(*start)(void *), void *arg)
{
        if (ovsthread_once_start(&once)) {
            //首先设置main线程为非停顿态
            /* The first call to this function has to happen in the main thread.
             * Before the process becomes multithreaded we make sure that the
             * main thread is considered non quiescent.
             *
             * For other threads this is done in ovs_thread_wrapper(), but the
             * main thread has no such wrapper.
             *
             * There's no reason to call ovsrcu_quiesce_end() in subsequent
             * invocations of this function and it might introduce problems
             * for other threads. */
            ovsrcu_quiesce_end();
            ovsthread_once_done(&once);
        }
        //接下来才是真正的线程创建
        error = pthread_create(&thread, &attr, ovsthread_wrapper, aux);
}

这里ovsrcu_perthread_get会自动设置当前线程,也就是main线程的线程本地存储指针为非NULL值,从而表示当前线程为非停顿态。

综上:按照RCU理论,ovs_postpone发起的延迟滞后释放操作(这些操作通常挂载到了一个list上,ovs是挂到了flushed_cbsets链表上),原则上,只要所有线程的线程本地存储perthread_key都经历过至少一次NULL值,则就可以执行了。

何时标记停顿态

OVS在很多停顿函数中,都嵌入了ovsrcu_quiesce_start()ovs_rcu_quiesce_end()函数,用于标记进入了停顿态,如

/* High resolution sleep with thread quiesce. */
void
xnanosleep(uint64_t nanoseconds)
{
    ovsrcu_quiesce_start();
    xnanosleep__(nanoseconds);
    ovsrcu_quiesce_end();
}

ovsrcu_quiesce_start会进而调用ovsrcu_unregister__来标记进入停顿态,并刷新局部回调链表到全局链表。

static void
ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
{
    if (perthread->cbset) {
        //这里直接将当前线程的回调刷到全局链表flushed_cbsets
        ovsrcu_flush_cbset(perthread);
    }

    ovs_mutex_lock(&ovsrcu_threads_mutex);
    ovs_list_remove(&perthread->list_node);
    ovs_mutex_unlock(&ovsrcu_threads_mutex);

    free(perthread);

    seq_change(global_seqno);
}

此外,许多非PMD 线程如offload 线程、revalidate线程等,执行完自己本职任务后,在线程尾部定期调用来给RCU做gc。这些则是通过直接调用另一个接口ovsrcu_quiesce来定期将回调刷新到全局链表的:

void
ovsrcu_quiesce(void)
{
    struct ovsrcu_perthread *perthread;

    perthread = ovsrcu_perthread_get();
    perthread->seqno = seq_read(global_seqno);  //设置当前线程的停顿编号为当前的global_seqno
    if (perthread->cbset) {
        //将当前线程的回调刷到全局链表flushed_cbsets
        ovsrcu_flush_cbset(perthread);
    }
    seq_change(global_seqno); // 递增全局的下一次停顿编号

    ovsrcu_quiesced();
}

而对于PMD之类的实现,因为其压根没有sleep等停顿逻辑,则是通过PMD主动调用ovsrcu_try_quiesce来将当前线程的回调链表刷新到全局flushed_cbsets链表的:

int
ovsrcu_try_quiesce(void)
{
    struct ovsrcu_perthread *perthread;
    int ret = EBUSY;

    ovs_assert(!single_threaded());
    perthread = ovsrcu_perthread_get();
    if (!seq_try_lock()) {
        perthread->seqno = seq_read_protected(global_seqno);
        if (perthread->cbset) {
            //这里直接将当前线程的回调刷到全局链表flushed_cbsets
            ovsrcu_flush_cbset__(perthread, true);
        }
        seq_change_protected(global_seqno);
        seq_unlock();
        ovsrcu_quiesced();
        ret = 0;
    }
    return ret;
}

这些实现,其实都是做两件事情:

  1. 刷新当前线程的回调函数集合到全局链表

  2. 更新global_seqno、flushed_cbsets_seq版本号

这些都仍然是在当前线程范围内,当当前线程经历一次停顿态后,刷新到全局。但仍然无法保证其他持有旧版本数据指针的线程,已经经过了一次停顿态。所以问题来了。

谁,以及何时来执行延迟调用

前面已经看到,ovsrcu_flush_cbset将当前线程在这一次周期内发起的所有postpone延迟调用函数(在perthread->cbset中管理),提交到全局延迟调用列表flushed_cbsets中。而当前线程自己,则刷新当前局部调用后,直接无阻塞进入下一轮的rcu周期,通过递增全局global_seqno代表全局rcu周期

static void
ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
{
    struct ovsrcu_cbset *cbset = perthread->cbset;

    if (cbset) {
        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
        perthread->cbset = NULL;

        if (protected) {
            seq_change_protected(flushed_cbsets_seq);
        } else {
            seq_change(flushed_cbsets_seq);
        }
    }
}

那么,如最初RCU原理一节提到,显然,必须要等待经过至少一次全局停顿后,才能执行全局回调flushed_cbsets中的回调函数。到底如何保证,在所有线程都经过全局停顿呢?全局停顿后,谁来执行全局回调链表中的回调函数呢?答案是urcu线程,通过global_seqno确保所有线程都经历过全局停顿;并由该线程来负责执行全局回调。

OVS实现了专门的urcu线程ovsrcu_postpone_thread来负责清理:

static void ovsrcu_quiesced(void)
{
    if (single_threaded()) {
        ovsrcu_call_postponed();
    } else {
        static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
        if (ovsthread_once_start(&once)) {
            latch_init(&postpone_exit);
            ovs_barrier_init(&postpone_barrier, 2);
            ovs_thread_create("urcu", ovsrcu_postpone_thread, NULL);
            ovsthread_once_done(&once);
        }
    }
}

如果是单线程,那么当前线程执行ovsrcu_quiesced()时,就认为是其已经经历过停顿期(可以看该函数的调用点,都是在停顿期完成后调用),不用再看其他不存在的线程的引用问题,直接执行释放。我们着重来看多线程模式下,ovs urcu 线程实现:

static void *
ovsrcu_postpone_thread(void *arg OVS_UNUSED)
{
    pthread_detach(pthread_self());

    while (!latch_is_set(&postpone_exit)) {
        uint64_t seqno = seq_read(flushed_cbsets_seq);
         //不停摘下全局回调链表,并在等待合适时间执行回调以释放旧版本数据,注意,该函数会阻塞等待
        if (!ovsrcu_call_postponed()) {
            //当 flushed_cbsets_seq变化时,代表flushed_cbsets链表有变化,有新入队的回调需要处理
            seq_wait(flushed_cbsets_seq, seqno);
            latch_wait(&postpone_exit);
            poll_block();
        }
    }

    ovs_barrier_block(&postpone_barrier);
    return NULL;
}

主要实现还是在ovsrcu_call_postponed:

static bool
ovsrcu_call_postponed(void)
{
    struct ovsrcu_cbset *cbset;
    struct ovs_list cbsets;

    //摘下全局回调链表,等待合适时间处理
    guarded_list_pop_all(&flushed_cbsets, &cbsets);
    if (ovs_list_is_empty(&cbsets)) {
        return false;
    }

    ovsrcu_synchronize();   //这里阻塞等待所有线程seqno大于最开始的global_seqno

    //挨个处理全局回调链表,
    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
        struct ovsrcu_cb *cb;

        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
            cb->function(cb->aux);
        }
        free(cbset->cbs);
        free(cbset);
    }

    return true;
}

在这里可以看到,urcu 线程就是负责实际执行延迟调用的实体!他将每个使用rcu机制的线程,在每个线程各自停顿期内将自己延迟调用挂载到全局flushed_cbsets链表的所有回调,摘取下来,挨个执行。

Ok,现在我们明白了谁来执行延迟调用。那么,如前文所述,urcu 线程需要确定全局每个使用rcu机制的线程,都经历过至少一次停顿。如何实现的呢?答案就是在ovsrcu_synchronize中等待global_seqno实现的。

但在具体看实现前,我们先搞清楚,我们为什么需要seqno。

不是只需要bit位标志是否经历过停顿期就可以了吗?为什么还需要seqno?这是静态思想,我们要动态看待这个问题。因为当前线程经历了停顿期,刷新回调函数到全局链表后,继续进入v2.0数据版本的线程,它还需要继续执行,继续在非停顿态执行,甚至会再次进入rcu 临界区,并再次将当前v2.0数据标记为v3.0数据版本,v4.0版本等等。所以,可能存在当前线程已经有两三个旧数据版本刷新挂在到全局回调列表flushed_cbsets,当前线程已经进入v10.0时代,而其他线程还在第一个rcu周期,处于v1.0状态,还未经过停顿态,进入v2.0版本。

所以,这里perthread->seqno其实是当前线程的数据版本号,取自线程经历停顿期时当时的global_seqno。而global_seqno代表的是全局版本号,且每个线程经历一次停顿期时,全局版本号都要被递增1。

接下来再看ovs rcu的停顿等待机制,在ovsrcu_synchronize()内:

void ovsrcu_synchronize(void)
{
    unsigned int warning_threshold = 1000;
    uint64_t target_seqno;
    long long int start;

    if (single_threaded()) {
        return;
    }

    //先定个小目标,确定当前要释放小于哪个版本的旧数据,假设比如是v4.0版本旧数据
    target_seqno = seq_read(global_seqno);
    ovsrcu_quiesce_start();
    start = time_msec();

    for (;;) {
        //不停循环,可能当前cur_seqno远大于target_seq,语义上等效于
        //全局版本进入v10.0时代,而当前urcu线程,需要释放v4.0时代的旧版本数据
        uint64_t cur_seqno = seq_read(global_seqno);  
        struct ovsrcu_perthread *perthread;
        char stalled_thread[16];
        unsigned int elapsed;
        bool done = true;

        ovs_mutex_lock(&ovsrcu_threads_mutex);
        LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
            //这里是关键,循环检测所有ovs线程,比对线程的数据版本号和v4.0版本,看是否还有线程还在用v3.0版本
            if (perthread->seqno <= target_seqno) {
                ovs_strlcpy_arrays(stalled_thread, perthread->name);
                //如果还有线程在用v3.0版本,很显然,我们现在还不能执行v4.0版本的释放
                done = false;
                break;
            }
        }
        ovs_mutex_unlock(&ovsrcu_threads_mutex);

        if (done) {
            //如果所有线程的perthread->seqno都大于target_seqno,
            // 即所有线程都没有用v4.0版本以前的数据了,
            // 则我们不必再等待了,可以释放旧版本数据
            break;
        }

        elapsed = time_msec() - start;
        if (elapsed >= warning_threshold) {
            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
                      elapsed, stalled_thread);
            warning_threshold *= 2;
        }
        poll_timer_wait_until(start + warning_threshold);

        //还有线程在用旧版本v3.0数据,所以等待global_seqno变化我们再做循环检测。
        //而只有当有线程进入停顿态,才会导致global_seq变化
        seq_wait(global_seqno, cur_seqno);
        poll_block();
    }
    ovsrcu_quiesce_end();
}

进函数时,读取当前周期编号global_seq,定义为目标序号target_seqno,随后for循环内持续等待,直到每个线程的seqno 都大于目标序号。其余代码实现,都是为了如果当前有线程的seqno 小于目标序号,则显然还有线程还可能在使用旧版本数据,则继续循环检测 + poll_block等待。

所以,ovs urcu线程就是不停记录global_seqno为target_seqno并开始等待,等到所有线程的版本号都大于了刚进入时记录的target_seqno 后,就删除一波,如此不停删除做垃圾回收。因此,如果urcu线程执行过长,就会导致全局回调链表中挂载许多回调释放函数,系统垃圾较多。

同样,这也理解为何如果某个ovs线程没有睡眠,ovs日志中会出现urcu线程报错:"blocked %u ms waiting for %s to quiesce"。其实这是因为这个线程一直死循环,没有经过停顿态,其perthread->seqno没有增长,导致urcu线程认为该线程一直在持有老的引用。初看似乎没什么,但其实这会导致urcu 线程一直在循环等待旧版本数据的持有者释放引用,并执行垃圾回收。但如果那个线程因为没有调用过rcu的停顿态函数,导致perthread->seqno一直没有增长,那么整个ovs的rcu机制都会空转等待,所有垃圾回收都无法执行!

Ok,至此,OVS 的 RCU 实现机制厘清。

参考文档:

  1. 线程本地存储:https://www.jianshu.com/p/d52c1ebf808a

  2. lwn.net 的 RCU 介绍:https://lwn.net/Articles/262464/

  3. OVS RCU 文档:https://github.com/openvswitch/ovs/blob/master/lib/ovs-rcu.h

文章来自个人专栏
OVS
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0