searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

redis事件驱动框架实现

2023-10-19 03:51:41
27
0

事件驱动框架实现

代码:ae.h、ae.c、ae_epoll.h/ae_epoll.c(对epoll相关函数的封装)

一般来说一个事件驱动框架包括一个主循环,主循环监听IO事件和超时事件,事件到来就调用相应的注册函数进行处理,处理完毕就进行下一次循环,这里也一样,redis对底层的epoll相关函数(epoll_create,epoll_wait,epoll_ctl等函数)进行了封装,可以添加事件或者删除事件到监听列表中,具体代码可以看下ae_epoll.c,就是对epoll_ctl,epoll_wait等相关函数的封装
客户端发起请求,reactor进行事件分发处理

主体流程

事件触发后,如何找到对应的回调函数?
首先添加事件监听,会将fd和对应的回调函数保存下来,通过aeCreateFileEvent函数添加事件监听

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */  // 添加的事件以及对应的回调函数会保存到aeFileEvent中
    aeFiredEvent *fired; /* Fired events */  // 触发的事件会放到这里
    aeTimeEvent *timeEventHead;  // 管理时间事件
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];  // 通过fd可以得到对应的回调处理函数

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)  // 最后会调用epoll_ctl完成添加
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;  // 保存处理函数
    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 保存处理函数
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}


事件触发时,保存触发事件的fd,通过fd查找eventLoop->events即可找到aeFileEvent,该结构体中保存了对应的回调函数

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;


时间事件怎么实现


定时事件通过双向链表管理(aeEventLoop中的timeEventHead指向了链表头),添加定时事件时,插入链表头,每次进入epoll_wait时,先遍历链表得到最早的超时事件

static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;

    aeTimeEvent *earliest = NULL;
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }

    monotime now = getMonotonicUs();
    return (now >= earliest->when) ? 0 : earliest->when - now;
}


得到需要等待的时间,传入到epoll_wait的timeout参数中,超时就触发了,触发后,processTimeEvents函数中遍历双向链表,判断节点中的超时时间和当前时间的大小,如果超过当前时间则触发该事件,调用相应的函数处理,处理完毕后标识该节点的id为AE_DELETED_EVENT_ID,表示删除,遍历时也会将删除节点从链表中删除,具体见processTimeEvents函数代码

InitServer函数中创建时间任务

AE_BARRIER 事件
一般情况下,Redis 会先处理读事件 (AE_READABLE),再处理写事件 (AE_WRITABLE)。这个顺序安排其实也算是一点小优化,先读后写 可以让一个请求的处理和回包都是在同一次循环里面,使得请求可以尽快地回包,前面讲到,网络 IO 事件注册的时候,除了正常的读写事件外,还可以注册一个 AE_BARRIER 事件,这个事件就是会影响到先读后写的处理顺序。如果某个 fd 的 mask 包含了 AE_BARRIER,那它的处理顺序会是 先写后读。针对这个场景,redis 举的例子是,如果在 beforesleep 回调中进行了 fsync 动作,然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,此时先处理写,后处理读,用于快速回复给client,因为fsync动作可能就花费了一些时间了,我们需要快速回复客户端。
aeProcessEvents函数中相应代码如下:


主循环代码

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}


int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* 若没有事件处理,则立刻返回*/
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /*如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
       …
    }
    /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    /* 返回已经处理的文件或时间*/
    return processed; 
}


读写事件处理

InitServer函数中创建连接请求的处理事件

连接到来:acceptTcpHandler -> anetTcpAccept调用accept获得fd -> acceptCommonHandler -> createClient -> connSetReadHandler -> aeCreateFileEvent  会设置fd的读处理函数为readQueryFromClient,client 发给 server 的数据,都会在这个函数处理,这个函数会解析 client 的数据,找到对应的 cmd 函数执行

readQueryFromClient -> processInputBuffer -> processCommandAndResetClient -> processCommand -> lookupCommand查找得到具体的命令和对应的处理函数 -> call(c,CMD_CALL_FULL) -> c->cmd->proc(c);调用命令相应的函数,最后会调用addReply将数据写到client的输出缓冲区中,即会先把响应数据写到对应 client 的 内存 buffer 中,在下一次处理 IO 事件之前(beforeSleep函数处理),Redis 会把每个 client 的 buffer 数据写到 client 的 socket 中,给 client 响应

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    {"getex",getexCommand,-2,
     "write fast @string",
     0,NULL,1,1,1,0,0,0},

    {"getdel",getdelCommand,2,
     "write fast @string",
     0,NULL,1,1,1,0,0,0},
     。。。
     }

启动时会将 redisCommandTable 复制给server.commands(一个hash表),然后lookupCommand会查commands得到相应的处理函数并调用
真正发送数据:上面处理完后,准备进入下一次循环,在进入下一次epoll_wait调用前会调用beforeSleep -> handleClientsWithPendingWrites -> writeToClient将输出缓冲区中的数据发送出去,如果如果输出缓冲区的数据还没有写完,会调用connSetWriteHandlerWithBarrier -> aeCreateFileEvent 创建可写事件,处理函数为sendReplyToClient,该函数会调用writeToClient继续发送,具体可以看下handleClientsWithPendingWrite函数实现

 

0条评论
0 / 1000
9****m
15文章数
1粉丝数
9****m
15 文章 | 1 粉丝
原创

redis事件驱动框架实现

2023-10-19 03:51:41
27
0

事件驱动框架实现

代码:ae.h、ae.c、ae_epoll.h/ae_epoll.c(对epoll相关函数的封装)

一般来说一个事件驱动框架包括一个主循环,主循环监听IO事件和超时事件,事件到来就调用相应的注册函数进行处理,处理完毕就进行下一次循环,这里也一样,redis对底层的epoll相关函数(epoll_create,epoll_wait,epoll_ctl等函数)进行了封装,可以添加事件或者删除事件到监听列表中,具体代码可以看下ae_epoll.c,就是对epoll_ctl,epoll_wait等相关函数的封装
客户端发起请求,reactor进行事件分发处理

主体流程

事件触发后,如何找到对应的回调函数?
首先添加事件监听,会将fd和对应的回调函数保存下来,通过aeCreateFileEvent函数添加事件监听

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */  // 添加的事件以及对应的回调函数会保存到aeFileEvent中
    aeFiredEvent *fired; /* Fired events */  // 触发的事件会放到这里
    aeTimeEvent *timeEventHead;  // 管理时间事件
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];  // 通过fd可以得到对应的回调处理函数

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)  // 最后会调用epoll_ctl完成添加
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;  // 保存处理函数
    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 保存处理函数
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}


事件触发时,保存触发事件的fd,通过fd查找eventLoop->events即可找到aeFileEvent,该结构体中保存了对应的回调函数

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;


时间事件怎么实现


定时事件通过双向链表管理(aeEventLoop中的timeEventHead指向了链表头),添加定时事件时,插入链表头,每次进入epoll_wait时,先遍历链表得到最早的超时事件

static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;

    aeTimeEvent *earliest = NULL;
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }

    monotime now = getMonotonicUs();
    return (now >= earliest->when) ? 0 : earliest->when - now;
}


得到需要等待的时间,传入到epoll_wait的timeout参数中,超时就触发了,触发后,processTimeEvents函数中遍历双向链表,判断节点中的超时时间和当前时间的大小,如果超过当前时间则触发该事件,调用相应的函数处理,处理完毕后标识该节点的id为AE_DELETED_EVENT_ID,表示删除,遍历时也会将删除节点从链表中删除,具体见processTimeEvents函数代码

InitServer函数中创建时间任务

AE_BARRIER 事件
一般情况下,Redis 会先处理读事件 (AE_READABLE),再处理写事件 (AE_WRITABLE)。这个顺序安排其实也算是一点小优化,先读后写 可以让一个请求的处理和回包都是在同一次循环里面,使得请求可以尽快地回包,前面讲到,网络 IO 事件注册的时候,除了正常的读写事件外,还可以注册一个 AE_BARRIER 事件,这个事件就是会影响到先读后写的处理顺序。如果某个 fd 的 mask 包含了 AE_BARRIER,那它的处理顺序会是 先写后读。针对这个场景,redis 举的例子是,如果在 beforesleep 回调中进行了 fsync 动作,然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,此时先处理写,后处理读,用于快速回复给client,因为fsync动作可能就花费了一些时间了,我们需要快速回复客户端。
aeProcessEvents函数中相应代码如下:


主循环代码

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}


int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* 若没有事件处理,则立刻返回*/
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /*如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
       …
    }
    /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    /* 返回已经处理的文件或时间*/
    return processed; 
}


读写事件处理

InitServer函数中创建连接请求的处理事件

连接到来:acceptTcpHandler -> anetTcpAccept调用accept获得fd -> acceptCommonHandler -> createClient -> connSetReadHandler -> aeCreateFileEvent  会设置fd的读处理函数为readQueryFromClient,client 发给 server 的数据,都会在这个函数处理,这个函数会解析 client 的数据,找到对应的 cmd 函数执行

readQueryFromClient -> processInputBuffer -> processCommandAndResetClient -> processCommand -> lookupCommand查找得到具体的命令和对应的处理函数 -> call(c,CMD_CALL_FULL) -> c->cmd->proc(c);调用命令相应的函数,最后会调用addReply将数据写到client的输出缓冲区中,即会先把响应数据写到对应 client 的 内存 buffer 中,在下一次处理 IO 事件之前(beforeSleep函数处理),Redis 会把每个 client 的 buffer 数据写到 client 的 socket 中,给 client 响应

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    {"getex",getexCommand,-2,
     "write fast @string",
     0,NULL,1,1,1,0,0,0},

    {"getdel",getdelCommand,2,
     "write fast @string",
     0,NULL,1,1,1,0,0,0},
     。。。
     }

启动时会将 redisCommandTable 复制给server.commands(一个hash表),然后lookupCommand会查commands得到相应的处理函数并调用
真正发送数据:上面处理完后,准备进入下一次循环,在进入下一次epoll_wait调用前会调用beforeSleep -> handleClientsWithPendingWrites -> writeToClient将输出缓冲区中的数据发送出去,如果如果输出缓冲区的数据还没有写完,会调用connSetWriteHandlerWithBarrier -> aeCreateFileEvent 创建可写事件,处理函数为sendReplyToClient,该函数会调用writeToClient继续发送,具体可以看下handleClientsWithPendingWrite函数实现

 

文章来自个人专栏
redis代码剖析
9 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0