coroutine
一个程序要真正运行起来,需要两个因素:可执行代码段、数据。体现在 CPU 中,主要包含以下几个方面:
- EIP 寄存器:用来存储 CPU 要读取指令的地址
- ESP 寄存器:指向当前线程栈的栈顶位置
- 其他通用寄存器的内容:包括代表函数参数的 rdi、rsi 等等。
- 线程栈中的内存内容。
这些数据内容,我们一般将其称为 “上下文” 或者 “现场”。有栈协程的原理,就是从线程的上下文下手,如果把线程的上下文完全改变,即:改变 EIP 寄存的内容,指向其他指令地址;改变线程栈的内存内容等等。
这样的话,当前线程运行的程序也就完全改变了,是一个全新的程序。
协程的实现分为有栈和无栈的方式,有栈的实现方式又是可以分为“独立栈” 和 "共享栈"的实现方式。
共享栈的实现
仅以云风的coroutine实现为例。它的底层是使用 Linux
下的 ucontext
函数簇实现的。
typedef struct ucontext_t {
struct ucontext_t *uc_link; // 要去的环境
sigset_t uc_sigmask; // 屏蔽的信号
stack_t uc_stack; // 当前这个环境要使用的栈内存
mcontext_t uc_mcontext;
...
} ucontext_t;
每次在进行协程切换时,都是由先将当前上下文数据保存共享栈中,当切换回来的时候再从共享栈中恢复数据。设置的共享栈的大小是 1M。调度器的结构如下:
#define STACK_SIZE (1024*1024) // 1M
/// @brief: 协程调度器
struct schedule {
char stack[STACK_SIZE]; // 运行时栈
ucontext_t main; // 主协程的上下文
int nco; // 当前存活的协程个数,用作协程 id
int cap; // 协程管理器的当前最大容量,如果不够了,则进行扩容
int running; // 正在运行的协程ID
struct coroutine** co; // 一个一维数组,用于存放协程
};
/// @brief: 协程
struct coroutine {
coroutine_func func; // 协程运行的函数
void* ud; // func 参数
ucontext_t ctx; // 协程上下文
struct schedule* sch; // 该协程所属的调度器
ptrdiff_t cap; // 已经分配的内存大小
ptrdiff_t size; // 当前协程运行时栈,保存起来后的大小
int status; // 协程当前的状态
char* stack; // 当前协程的保存起来的运行时栈
};
创建协程调度器
函数 coroutine_open
负责创建一个协程调度器,初始化协程调度器的数值。
#define DEFAULT_COROUTINE 16 // 初始化建立协程时大小
/// @brief: 创建一个协程调度器
struct schedule * coroutine_open(void)
{
struct schedule *S = malloc(sizeof(struct schedule)); // 这里做的主要就是分配内存,同时赋初值
S->nco = 0;
S->cap = DEFAULT_COROUTINE;
S->running = -1;
S->co = malloc(sizeof(struct coroutine *) * S->cap);
memset(S->co, 0, sizeof(struct coroutine *) * S->cap);
return S;
}
创建一个协程
函数 _co_new
初始化 coroutine
对象:分配内存以及设置初始化状态是 COROUTINE_READY
。coroutine_new
将 创建的协程与对应的协程调度器联系起来。如果当前的协程数已经超过协程调度器中的协程数最大值cap
就会两倍扩容。最终还是会更新协程调度器的相关参数,返回协程的 id
。
/// @brief: 创建协程
/// @param: @c S 是协程调度器
/// @param: @c func 协程运行函数
/// @param: @c ud 是函数 func 的参数
/// @param: co 新创建的对象
struct coroutine*
_co_new(struct schedule* S , coroutine_func func, void *ud) {
struct coroutine* co = malloc(sizeof(struct coroutine));
co->func = func;
co->ud = ud;
co->sch = S;
co->cap = 0;
co->size = 0;
co->status = COROUTINE_READY; // 默认的最初状态都是 COROUTINE_READY
co->stack = NULL;
return co;
}
/// @brief: 创建一个协程对象
/// @param: S 该协程所属的调度器
/// @param: func 该协程函数执行体
/// @param: ud func 的参数
/// @return: 新建的协程的 ID
int
coroutine_new(struct schedule* S, coroutine_func func, void *ud) {
struct coroutine *co = _co_new(S, func , ud);
// 如果目前协程的数量已经大于调度器的容量,那么进行扩容
if (S->nco >= S->cap)
{
int id = S->cap; // 新的协程的id直接为当前容量的大小
S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *)); // 2倍扩容
memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap); // 初始化新的内存
S->co[S->cap] = co; // 将新创建的协程放入调度器中
S->cap *= 2; // 将容量参数更新为两倍
++S->nco; // 协程数更新
return id;
}
else
{
// 如果目前协程的数量小于调度器的容量,则取一个为NULL的位置,放入新的协程
int i;
for (i=0; i<S->cap; i++) {
// 为什么不 i % S->cap, 而是要从 nco+i 开始呢,因为前nco有很大概率都非NULL的,直接跳过去更好
int id = (i + S->nco) % S->cap;
if (S->co[id] == NULL) {
S->co[id] = co;
++S->nco;
return id;
}
}
}
assert(NULL);
return -1;
}
切出
coroutine_resume
函数会切入到指定协程中执行。当前正在执行的协程的上下文会被保存起来,同时上下文替换成新的协程,该协程的状态将被置为 RUNNING
。
/// @biref: 切换到对应协程中执行
/// @param: S 协程调度器
/// @param: id 协程ID
void
coroutine_resume(struct schedule * S, int id) {
assert(S->running == -1);
assert(0 <= id && id < S->cap);
struct coroutine* C = S->co[id]; // 取出协程
if (C == NULL) return;
int status = C->status;
switch(status) {
case COROUTINE_READY:
getcontext(&C->ctx); // 初始化 ucontext_t 结构体,将当前的上下文放到 C->ctx 里面
// 将当前协程的运行时栈的栈顶设置为 S->stack
// 每个协程都这么设置,这就是所谓的共享栈。(注意,这里是栈顶)
C->ctx.uc_stack.ss_sp = S->stack;
C->ctx.uc_stack.ss_size = STACK_SIZE;
C->ctx.uc_link = &S->main; // 如果协程执行完,将切换到主协程中执行
S->running = id;
C->status = COROUTINE_RUNNING;
// 设置执行 C->ctx 函数, 并将 S 作为参数传进去
uintptr_t ptr = (uintptr_t)S;
makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
// 将当前的上下文放入S->main中,并将C->ctx的上下文替换到当前上下文
swapcontext(&S->main, &C->ctx);
break;
case COROUTINE_SUSPEND:
// 将协程所保存的栈的内容,拷贝到当前运行时栈中
// 其中C->size在yield时有保存
memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
S->running = id;
C->status = COROUTINE_RUNNING;
swapcontext(&S->main, &C->ctx);
break;
default:
assert(0);
}
}
yield
/// @brief: 将当前正在运行的协程让出,切换到主协程上
/// @param S 协程调度器
void
coroutine_yield(struct schedule * S) {
// 取出当前正在运行的协程
int id = S->running;
assert(id >= 0);
struct coroutine * C = S->co[id];
assert((char *)&C > S->stack);
// 将当前运行的协程的栈内容保存起来
_save_stack(C,S->stack + STACK_SIZE);
// 将当前栈的状态改为 挂起
C->status = COROUTINE_SUSPEND;
S->running = -1;
// 所以这里可以看到,只能从协程切换到主协程中
swapcontext(&C->ctx , &S->main);
}
static void _save_stack(struct coroutine *C, char *top) {
char dummy = 0;
assert(top - &dummy <= STACK_SIZE);
// 如果已分配内存小于当前栈的大小,则释放内存重新分配
if (C->cap < top - &dummy) {
free(C->stack);
C->cap = top-&dummy;
C->stack = malloc(C->cap);
}
C->size = top - &dummy;
// 从 dummy 拷贝 size 内存到 C->stack
memcpy(C->stack, &dummy, C->size);
}