事件驱动框架实现
代码:ae.h、ae.c、ae_epoll.h/ae_epoll.c(对epoll相关函数的封装)
一般来说一个事件驱动框架包括一个主循环,主循环监听IO事件和超时事件,事件到来就调用相应的注册函数进行处理,处理完毕就进行下一次循环,这里也一样,redis对底层的epoll相关函数(epoll_create,epoll_wait,epoll_ctl等函数)进行了封装,可以添加事件或者删除事件到监听列表中,具体代码可以看下ae_epoll.c,就是对epoll_ctl,epoll_wait等相关函数的封装
客户端发起请求,reactor进行事件分发处理
主体流程
事件触发后,如何找到对应的回调函数?
首先添加事件监听,会将fd和对应的回调函数保存下来,通过aeCreateFileEvent函数添加事件监听
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */ // 添加的事件以及对应的回调函数会保存到aeFileEvent中
aeFiredEvent *fired; /* Fired events */ // 触发的事件会放到这里
aeTimeEvent *timeEventHead; // 管理时间事件
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd]; // 通过fd可以得到对应的回调处理函数
if (aeApiAddEvent(eventLoop, fd, mask) == -1) // 最后会调用epoll_ctl完成添加
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc; // 保存处理函数
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 保存处理函数
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
事件触发时,保存触发事件的fd,通过fd查找eventLoop->events即可找到aeFileEvent,该结构体中保存了对应的回调函数
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
时间事件怎么实现
定时事件通过双向链表管理(aeEventLoop中的timeEventHead指向了链表头),添加定时事件时,插入链表头,每次进入epoll_wait时,先遍历链表得到最早的超时事件
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
if (te == NULL) return -1;
aeTimeEvent *earliest = NULL;
while (te) {
if (!earliest || te->when < earliest->when)
earliest = te;
te = te->next;
}
monotime now = getMonotonicUs();
return (now >= earliest->when) ? 0 : earliest->when - now;
}
得到需要等待的时间,传入到epoll_wait的timeout参数中,超时就触发了,触发后,processTimeEvents函数中遍历双向链表,判断节点中的超时时间和当前时间的大小,如果超过当前时间则触发该事件,调用相应的函数处理,处理完毕后标识该节点的id为AE_DELETED_EVENT_ID,表示删除,遍历时也会将删除节点从链表中删除,具体见processTimeEvents函数代码
InitServer函数中创建时间任务
AE_BARRIER 事件
一般情况下,Redis 会先处理读事件 (AE_READABLE),再处理写事件 (AE_WRITABLE)。这个顺序安排其实也算是一点小优化,先读后写 可以让一个请求的处理和回包都是在同一次循环里面,使得请求可以尽快地回包,前面讲到,网络 IO 事件注册的时候,除了正常的读写事件外,还可以注册一个 AE_BARRIER 事件,这个事件就是会影响到先读后写的处理顺序。如果某个 fd 的 mask 包含了 AE_BARRIER,那它的处理顺序会是 先写后读。针对这个场景,redis 举的例子是,如果在 beforesleep 回调中进行了 fsync 动作,然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,此时先处理写,后处理读,用于快速回复给client,因为fsync动作可能就花费了一些时间了,我们需要快速回复客户端。
aeProcessEvents函数中相应代码如下:
主循环代码
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* 若没有事件处理,则立刻返回*/
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/*如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
…
}
/* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
/* 返回已经处理的文件或时间*/
return processed;
}
读写事件处理
InitServer函数中创建连接请求的处理事件
连接到来:acceptTcpHandler -> anetTcpAccept调用accept获得fd -> acceptCommonHandler -> createClient -> connSetReadHandler -> aeCreateFileEvent 会设置fd的读处理函数为readQueryFromClient,client 发给 server 的数据,都会在这个函数处理,这个函数会解析 client 的数据,找到对应的 cmd 函数执行
readQueryFromClient -> processInputBuffer -> processCommandAndResetClient -> processCommand -> lookupCommand查找得到具体的命令和对应的处理函数 -> call(c,CMD_CALL_FULL) -> c->cmd->proc(c);调用命令相应的函数,最后会调用addReply将数据写到client的输出缓冲区中,即会先把响应数据写到对应 client 的 内存 buffer 中,在下一次处理 IO 事件之前(beforeSleep函数处理),Redis 会把每个 client 的 buffer 数据写到 client 的 socket 中,给 client 响应
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,
"admin no-script",
0,NULL,0,0,0,0,0,0},
{"get",getCommand,2,
"read-only fast @string",
0,NULL,1,1,1,0,0,0},
{"getex",getexCommand,-2,
"write fast @string",
0,NULL,1,1,1,0,0,0},
{"getdel",getdelCommand,2,
"write fast @string",
0,NULL,1,1,1,0,0,0},
。。。
}
启动时会将 redisCommandTable 复制给server.commands(一个hash表),然后lookupCommand会查commands得到相应的处理函数并调用
真正发送数据:上面处理完后,准备进入下一次循环,在进入下一次epoll_wait调用前会调用beforeSleep -> handleClientsWithPendingWrites -> writeToClient将输出缓冲区中的数据发送出去,如果如果输出缓冲区的数据还没有写完,会调用connSetWriteHandlerWithBarrier -> aeCreateFileEvent 创建可写事件,处理函数为sendReplyToClient,该函数会调用writeToClient继续发送,具体可以看下handleClientsWithPendingWrite函数实现