searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

redis AOF实现原理剖析

2023-10-19 07:23:21
15
0

代码:aof.c

因为RDB不是很可靠,所以有了AOF(append only file),每当 Redis 接受到会修改数据集的命令时,就会把命令追加到 AOF 文件里,当你重启 Redis 时,AOF 文件里的命令会被重新执行一次,重建数据。

AOF因为是保存的修改的命令,随着时间推移,文件会越来越大,可以通过AOF重写来降低文件大小

AOF重写主要流程为:用户执行bgrewriteaof命令(也有其他触发时机,见下文),触发AOF,fork子进程进行AOF重写,子进程继承了父进程的进程空间,遍历 Redis server 上的所有数据库,把每个键值对以插入操作的形式写入日志文件,在这期间父进程继续提供服务,父进程的写操作会先缓存到 buf 中,之后父进程把 buf 中的数据,通过管道发给子进程,子进程写完AOF后,从管道中读取修改命令并写入,之后发送ack告诉父进程(父进程收到子进程ack后设置server.aof_stop_sending_diff为1,此时父进程不会再发送数据到管道,父进程保存修改命令数据,即累计的修改数据),然后父进程回复ack,子进程收到ack后会再尝试把管道中的数据读出来写入AOF文件,之后重命名AOF文件,退出,父进程感知这一状态(serverCron -> checkChildrenDone -> backgroundRewriteDoneHandler -> waitpid),读取子进程重写的aof文件,在文件末尾再次写入父进程积累的数据,然后将文件名重命名成最终文件,重写结束,最后该文件被重新打开作为新的AOF文件,父进程继续写入修改命令到该文件中。

 

AOF文件内容


*2 #当前命令有两个参数
$6 #第一个参数长度是6字节,即SELECT
SELECT #当前命令的第一个参数值
$1 #当前命令的第二个参数,长度是1,即0
0 #当前命令的第二个参数值
*3
$3
SET
$4
name
$4
jack
*2
$6
SELECT
$1
0
*3
$3
set
$3
sex
$4
male

 

AOF 重写的触发时机

实现 AOF 重写的函数是 rewriteAppendOnlyFileBackground

rewriteAppendOnlyFileBackground 函数一共会在三个函数中被调用

bgrewriteaofCommand 函数,执行 bgrewriteaof 命令,也就是说,我们手动触发了 AOF rewrite 的执行

void bgrewriteaofCommand(client *c) {
 if (server.child_type == CHILD_TYPE_AOF) { // 已经在执行了就不再执行
 addReplyError(c,"Background append only file rewriting already in progress");
 } else if (hasActiveChildProcess()) { // 有子进程,即rdb
 server.aof_rewrite_scheduled = 1; // 会在ServerCron中判断并调用rewriteAppendOnlyFileBackground进行AOF重写
 addReplyStatus(c,"Background append only file rewriting scheduled");
 } else if (rewriteAppendOnlyFileBackground() == C_OK) { // 进行AOF重写
 addReplyStatus(c,"Background append only file rewriting started");
 } else {
 addReplyError(c,"Can't execute an AOF background rewriting. "
 "Please check the server logs for more information.");
 }
}

即只有当前既没有 AOF 重写子进程也没有 RDB 子进程,bgrewriteaofCommand 函数才会立即调用 rewriteAppendOnlyFileBackground 函数,实际执行 AOF 重写。

startAppendOnly函数,该函数会被 configSetCommand 函数(在config.c文件中,在 Redis 中执行 config 命令启用 AOF 功能会调用,即执行"config set appendonly yes"命令)和 restartAOFAfterSYNC 函数(在replication.c文件中,会在主从节点的复制过程中被调用,就是当主从节点在进行复制时,如果从节点的 AOF 选项被打开,那么在加载解析 RDB 文件时,AOF 选项就会被关闭。然后,无论从节点是否成功加载了 RDB 文件,restartAOFAfterSYNC 函数都会被调用,用来恢复被关闭的 AOF 功能)调用

createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly)

configSetCommand调用流程:configSetCommand -> interface.set -> boolConfigSet -> data.yesno.update_fn -> updateAppendonly ->startAppendOnly

serverCron 函数

serverCron 函数会检测当前是否没有 RDB 子进程和 AOF 重写子进程在执行,并检测是否有 AOF 重写操作被设置为了待调度执行,也就aof_rewrite_scheduled 变量值为 1。如果这三个条件都满足,那么 serverCron 函数就会调用rewriteAppendOnlyFileBackground 函数来执行 AOF 重写

判断AOF 功能已启用、AOF 文件大小比例超出阈值,以及 AOF 文件大小绝对值超出阈值,都满足则进行AOF重写

总结来说下面几点会进行重写:

时机一:bgrewriteaof 命令被执行。

时机二:主从复制完成 RDB 文件解析和加载(无论是否成功)。

时机三:AOF 重写被设置为待调度执行。

时机四:AOF 被启用,同时 AOF 文件的大小比例超出阈值,以及 AOF 文件的大小绝对值超出阈值。

 

重写过程

在rewriteAppendOnlyFileBackground函数中实现的

/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    if (hasActiveChildProcess()) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);

        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

 

管道通信

在子进程AOF期间,父进程继续提供服务,产生的修改命令需要传输给子进程,子进程将这部分修改命令写入到AOF文件中,通信方式是管道信息


aofCreatePipes 函数创建了包含 6 个文件描述符元素的数组 fds

/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes used by the children to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
 int fds[6] = {-1, -1, -1, -1, -1, -1};
 int j;

 if (pipe(fds) == -1) goto error; /* parent -> children data. */
 if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
 if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
 /* Parent -> children data is non blocking. */
 if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
 if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
 if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

 server.aof_pipe_write_data_to_child = fds[1];
 server.aof_pipe_read_data_from_parent = fds[0];
 server.aof_pipe_write_ack_to_parent = fds[3];
 server.aof_pipe_read_ack_from_child = fds[2];
 server.aof_pipe_write_ack_to_child = fds[5];
 server.aof_pipe_read_ack_from_parent = fds[4];
 server.aof_stop_sending_diff = 0;
 return C_OK;

error:
 serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
 strerror(errno));
 for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
 return C_ERR;
}


创建的管道用途如下:

fds[0]和 fds[1]:对应了主进程和重写子进程间用于传递操作命令的管道,它们分别对应读描述符和写描述符。

fds[2]和 fds[3]:对应了重写子进程向父进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。

fds[4]和 fds[5]:对应了父进程向重写子进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。

 

操作命令传输管道的使用

当 AOF 重写子进程在执行时,主进程还会继续接收和处理客户端写请求。这些写操作会被主进程正常写入 AOF 日志文件,这个过程是由 feedAppendOnlyFile 函数(在 aof.c 文件中)来完成

feedAppendOnlyFile -> aofRewriteBufferAppend,该函数会将修改命令写入到下面结构体中,然后挂到server.aof_rewrite_buf_blocks链表中

typedef struct aofrwblock {

unsigned long used, free;

char buf[AOF_RW_BUF_BLOCK_SIZE];

} aofrwblock;

接着调用aeCreateFileEvent创建可写事件,处理函数为aofChildWriteDiffData,该函数就循环从server.aof_rewrite_buf_blocks取节点数据,写入到管道中

子进程会调用aofReadDiffFromParent 函数读取管道中的数据,读取的数据放到 server.aof_child_diff(sds结构),它会将读取的操作命令追加到全局变量 server 的 aof_child_diff 字符串中。而在 AOF 重写函数 rewriteAppendOnlyFile 的执行过程最后,aof_child_diff 字符串会被写入 AOF 重写日志文件,以便我们在使用 AOF 重写日志时,能尽可能地恢复重写期间收到的操作

其实,aofReadDiffFromParent 函数一共会被以下三个函数调用。

rewriteAppendOnlyFileRio 函数:这个函数是由重写子进程执行的,它负责遍历 Redis 每个数据库,生成 AOF 重写日志,在这个过程中,它会不时地调用 aofReadDiffFromParent 函数。

rewriteAppendOnlyFile 函数:这个函数是重写日志的主体函数,也是由重写子进程执行的,它本身会调用 rewriteAppendOnlyFileRio 函数。此外,它在调用完 rewriteAppendOnlyFileRio 函数后,还会多次调用 aofReadDiffFromParent 函数,以尽可能多地读取主进程在重写日志期间收到的操作命令。

rdbSaveRio 函数:这个函数是创建 RDB 文件的主体函数。当我们使用 AOF 和 RDB 混合持久化机制时,这个函数也会调用 aofReadDiffFromParent 函数

从这里,我们可以看到,Redis 源码在实现 AOF 重写过程中,其实会多次让重写子进程向主进程读取新收到的操作命令,这也是为了让重写日志尽可能多地记录最新的操作,提供更加完整的操作记录。

ACK 管道的使用

子进程完成AOF重写后, rewriteAppendOnlyFile 这个函数在完成日志重写后,就会调用 write 函数,向 aof_pipe_write_ack_to_parent 描述符对应的管道中写入“!”,这就是重写子进程向主进程发送 ACK 信号,让主进程停止发送收到的新写操作。即告诉父进程,不要写修改命令道管道中了。

注册了回调函数aofChildPipeReadable,这个函数会判断从 aof_pipe_read_ack_from_child 管道描述符读取的数据是否是“!”,如果是的话,那它就会调用 write 函数,往 aof_pipe_write_ack_to_child 管道描述符上写入“!”,表示主进程已经收到重写子进程发送的 ACK 信息,同时它会给重写子进程回复一个 ACK 信息。

最后,重写子进程执行的 rewriteAppendOnlyFile 函数,会调用 syncRead 函数,从 aof_pipe_read_ack_from_parent 管道描述符上,读取主进程发送给它的 ACK 信息。

这样一来,重写子进程和主进程之间就通过两个 ACK 管道,相互确认重写过程结束了。

AOF 重写子进程和主进程是使用了一个操作命令传输管道和两个 ACK 信息发送管道。操作命令传输管道是用于主进程写入收到的新操作命令,以及用于重写子进程读取操作命令,而 ACK 信息发送管道是在重写结束时,重写子进程和主进程用来相互确认重写过程的结束。最后,重写子进程会进一步将收到的操作命令记录到重写日志文件中

这样一来,AOF 重写过程中主进程收到的新写操作,就不会被遗漏了。因为一方面,这些新写操作会被记录在正常的 AOF 日志中,另一方面,主进程会将新写操作缓存在 aof_rewrite_buf_blocks 数据块列表中,并通过管道发送给重写子进程。这样,就能尽可能地保证重写日志具有最新、最完整的写操作了。

子进程什么时候发送ack

子进程在正常的重写完成后至多再等一秒,在这一秒内如果有连续20ms没有可读事件发生,那么直接发送ack

 

主进程执行写操作写AOF的流程

前面说了执行命令会调用到call(client *c, int flags),该命令会调用propagate函数将命令传播到AOF文件,具体是调用feedAppendOnlyFile函数中会将命令写入server.aof_buf缓存中,然后serverCron定时函数会调用flushAppendOnlyFile考虑是否需要将aof_buf缓冲区中的内容写入和保存到AOF 文件里面

call(client *c, int flags) -> propagate -> feedAppendOnlyFile -> flushAppendOnlyFile -> 写入server.aof_buf

serverCron -> flushAppendOnlyFile -> 写AOF文件

 

RDB-AOF混合持久化

即AOF文件的前半段是RDB格式的全量数据后半段是redis命令格式的增量数据,server.aof_use_rdb_preamble大于0(混合持久化开关打开)时,就会进入rdbSaveRio函数先以RDB格式来保存全量数据

前半段是RDB格式的全量数据后半段是redis命令格式的增量数据。

 

加载RDB/AOF

main -> loadDataFromDisk -> rdbLoad -> rdbLoadRio 完成rdb文件的加载

main -> loadDataFromDisk -> loadAppendOnlyFile 完成AOF文件的加载,如果AOF文件是混合持久化的文件,则会首先调用rdbLoadRio加载RDB,加载完后该函数会新建一个fake的client来一行一行执行AOF文件中的命令,完成AOF文件的加载

因为AOF文件更完整,所以代码中会优先加载AOF文件

0条评论
0 / 1000
9****m
15文章数
1粉丝数
9****m
15 文章 | 1 粉丝
原创

redis AOF实现原理剖析

2023-10-19 07:23:21
15
0

代码:aof.c

因为RDB不是很可靠,所以有了AOF(append only file),每当 Redis 接受到会修改数据集的命令时,就会把命令追加到 AOF 文件里,当你重启 Redis 时,AOF 文件里的命令会被重新执行一次,重建数据。

AOF因为是保存的修改的命令,随着时间推移,文件会越来越大,可以通过AOF重写来降低文件大小

AOF重写主要流程为:用户执行bgrewriteaof命令(也有其他触发时机,见下文),触发AOF,fork子进程进行AOF重写,子进程继承了父进程的进程空间,遍历 Redis server 上的所有数据库,把每个键值对以插入操作的形式写入日志文件,在这期间父进程继续提供服务,父进程的写操作会先缓存到 buf 中,之后父进程把 buf 中的数据,通过管道发给子进程,子进程写完AOF后,从管道中读取修改命令并写入,之后发送ack告诉父进程(父进程收到子进程ack后设置server.aof_stop_sending_diff为1,此时父进程不会再发送数据到管道,父进程保存修改命令数据,即累计的修改数据),然后父进程回复ack,子进程收到ack后会再尝试把管道中的数据读出来写入AOF文件,之后重命名AOF文件,退出,父进程感知这一状态(serverCron -> checkChildrenDone -> backgroundRewriteDoneHandler -> waitpid),读取子进程重写的aof文件,在文件末尾再次写入父进程积累的数据,然后将文件名重命名成最终文件,重写结束,最后该文件被重新打开作为新的AOF文件,父进程继续写入修改命令到该文件中。

 

AOF文件内容


*2 #当前命令有两个参数
$6 #第一个参数长度是6字节,即SELECT
SELECT #当前命令的第一个参数值
$1 #当前命令的第二个参数,长度是1,即0
0 #当前命令的第二个参数值
*3
$3
SET
$4
name
$4
jack
*2
$6
SELECT
$1
0
*3
$3
set
$3
sex
$4
male

 

AOF 重写的触发时机

实现 AOF 重写的函数是 rewriteAppendOnlyFileBackground

rewriteAppendOnlyFileBackground 函数一共会在三个函数中被调用

bgrewriteaofCommand 函数,执行 bgrewriteaof 命令,也就是说,我们手动触发了 AOF rewrite 的执行

void bgrewriteaofCommand(client *c) {
 if (server.child_type == CHILD_TYPE_AOF) { // 已经在执行了就不再执行
 addReplyError(c,"Background append only file rewriting already in progress");
 } else if (hasActiveChildProcess()) { // 有子进程,即rdb
 server.aof_rewrite_scheduled = 1; // 会在ServerCron中判断并调用rewriteAppendOnlyFileBackground进行AOF重写
 addReplyStatus(c,"Background append only file rewriting scheduled");
 } else if (rewriteAppendOnlyFileBackground() == C_OK) { // 进行AOF重写
 addReplyStatus(c,"Background append only file rewriting started");
 } else {
 addReplyError(c,"Can't execute an AOF background rewriting. "
 "Please check the server logs for more information.");
 }
}

即只有当前既没有 AOF 重写子进程也没有 RDB 子进程,bgrewriteaofCommand 函数才会立即调用 rewriteAppendOnlyFileBackground 函数,实际执行 AOF 重写。

startAppendOnly函数,该函数会被 configSetCommand 函数(在config.c文件中,在 Redis 中执行 config 命令启用 AOF 功能会调用,即执行"config set appendonly yes"命令)和 restartAOFAfterSYNC 函数(在replication.c文件中,会在主从节点的复制过程中被调用,就是当主从节点在进行复制时,如果从节点的 AOF 选项被打开,那么在加载解析 RDB 文件时,AOF 选项就会被关闭。然后,无论从节点是否成功加载了 RDB 文件,restartAOFAfterSYNC 函数都会被调用,用来恢复被关闭的 AOF 功能)调用

createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly)

configSetCommand调用流程:configSetCommand -> interface.set -> boolConfigSet -> data.yesno.update_fn -> updateAppendonly ->startAppendOnly

serverCron 函数

serverCron 函数会检测当前是否没有 RDB 子进程和 AOF 重写子进程在执行,并检测是否有 AOF 重写操作被设置为了待调度执行,也就aof_rewrite_scheduled 变量值为 1。如果这三个条件都满足,那么 serverCron 函数就会调用rewriteAppendOnlyFileBackground 函数来执行 AOF 重写

判断AOF 功能已启用、AOF 文件大小比例超出阈值,以及 AOF 文件大小绝对值超出阈值,都满足则进行AOF重写

总结来说下面几点会进行重写:

时机一:bgrewriteaof 命令被执行。

时机二:主从复制完成 RDB 文件解析和加载(无论是否成功)。

时机三:AOF 重写被设置为待调度执行。

时机四:AOF 被启用,同时 AOF 文件的大小比例超出阈值,以及 AOF 文件的大小绝对值超出阈值。

 

重写过程

在rewriteAppendOnlyFileBackground函数中实现的

/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    if (hasActiveChildProcess()) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        if (childpid == -1) {
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            aofClosePipes();
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);

        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    }
    return C_OK; /* unreached */
}

 

管道通信

在子进程AOF期间,父进程继续提供服务,产生的修改命令需要传输给子进程,子进程将这部分修改命令写入到AOF文件中,通信方式是管道信息


aofCreatePipes 函数创建了包含 6 个文件描述符元素的数组 fds

/* Create the pipes used for parent - child process IPC during rewrite.
 * We have a data pipe used to send AOF incremental diffs to the child,
 * and two other pipes used by the children to signal it finished with
 * the rewrite so no more data should be written, and another for the
 * parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
 int fds[6] = {-1, -1, -1, -1, -1, -1};
 int j;

 if (pipe(fds) == -1) goto error; /* parent -> children data. */
 if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
 if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
 /* Parent -> children data is non blocking. */
 if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
 if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
 if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

 server.aof_pipe_write_data_to_child = fds[1];
 server.aof_pipe_read_data_from_parent = fds[0];
 server.aof_pipe_write_ack_to_parent = fds[3];
 server.aof_pipe_read_ack_from_child = fds[2];
 server.aof_pipe_write_ack_to_child = fds[5];
 server.aof_pipe_read_ack_from_parent = fds[4];
 server.aof_stop_sending_diff = 0;
 return C_OK;

error:
 serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
 strerror(errno));
 for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
 return C_ERR;
}


创建的管道用途如下:

fds[0]和 fds[1]:对应了主进程和重写子进程间用于传递操作命令的管道,它们分别对应读描述符和写描述符。

fds[2]和 fds[3]:对应了重写子进程向父进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。

fds[4]和 fds[5]:对应了父进程向重写子进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。

 

操作命令传输管道的使用

当 AOF 重写子进程在执行时,主进程还会继续接收和处理客户端写请求。这些写操作会被主进程正常写入 AOF 日志文件,这个过程是由 feedAppendOnlyFile 函数(在 aof.c 文件中)来完成

feedAppendOnlyFile -> aofRewriteBufferAppend,该函数会将修改命令写入到下面结构体中,然后挂到server.aof_rewrite_buf_blocks链表中

typedef struct aofrwblock {

unsigned long used, free;

char buf[AOF_RW_BUF_BLOCK_SIZE];

} aofrwblock;

接着调用aeCreateFileEvent创建可写事件,处理函数为aofChildWriteDiffData,该函数就循环从server.aof_rewrite_buf_blocks取节点数据,写入到管道中

子进程会调用aofReadDiffFromParent 函数读取管道中的数据,读取的数据放到 server.aof_child_diff(sds结构),它会将读取的操作命令追加到全局变量 server 的 aof_child_diff 字符串中。而在 AOF 重写函数 rewriteAppendOnlyFile 的执行过程最后,aof_child_diff 字符串会被写入 AOF 重写日志文件,以便我们在使用 AOF 重写日志时,能尽可能地恢复重写期间收到的操作

其实,aofReadDiffFromParent 函数一共会被以下三个函数调用。

rewriteAppendOnlyFileRio 函数:这个函数是由重写子进程执行的,它负责遍历 Redis 每个数据库,生成 AOF 重写日志,在这个过程中,它会不时地调用 aofReadDiffFromParent 函数。

rewriteAppendOnlyFile 函数:这个函数是重写日志的主体函数,也是由重写子进程执行的,它本身会调用 rewriteAppendOnlyFileRio 函数。此外,它在调用完 rewriteAppendOnlyFileRio 函数后,还会多次调用 aofReadDiffFromParent 函数,以尽可能多地读取主进程在重写日志期间收到的操作命令。

rdbSaveRio 函数:这个函数是创建 RDB 文件的主体函数。当我们使用 AOF 和 RDB 混合持久化机制时,这个函数也会调用 aofReadDiffFromParent 函数

从这里,我们可以看到,Redis 源码在实现 AOF 重写过程中,其实会多次让重写子进程向主进程读取新收到的操作命令,这也是为了让重写日志尽可能多地记录最新的操作,提供更加完整的操作记录。

ACK 管道的使用

子进程完成AOF重写后, rewriteAppendOnlyFile 这个函数在完成日志重写后,就会调用 write 函数,向 aof_pipe_write_ack_to_parent 描述符对应的管道中写入“!”,这就是重写子进程向主进程发送 ACK 信号,让主进程停止发送收到的新写操作。即告诉父进程,不要写修改命令道管道中了。

注册了回调函数aofChildPipeReadable,这个函数会判断从 aof_pipe_read_ack_from_child 管道描述符读取的数据是否是“!”,如果是的话,那它就会调用 write 函数,往 aof_pipe_write_ack_to_child 管道描述符上写入“!”,表示主进程已经收到重写子进程发送的 ACK 信息,同时它会给重写子进程回复一个 ACK 信息。

最后,重写子进程执行的 rewriteAppendOnlyFile 函数,会调用 syncRead 函数,从 aof_pipe_read_ack_from_parent 管道描述符上,读取主进程发送给它的 ACK 信息。

这样一来,重写子进程和主进程之间就通过两个 ACK 管道,相互确认重写过程结束了。

AOF 重写子进程和主进程是使用了一个操作命令传输管道和两个 ACK 信息发送管道。操作命令传输管道是用于主进程写入收到的新操作命令,以及用于重写子进程读取操作命令,而 ACK 信息发送管道是在重写结束时,重写子进程和主进程用来相互确认重写过程的结束。最后,重写子进程会进一步将收到的操作命令记录到重写日志文件中

这样一来,AOF 重写过程中主进程收到的新写操作,就不会被遗漏了。因为一方面,这些新写操作会被记录在正常的 AOF 日志中,另一方面,主进程会将新写操作缓存在 aof_rewrite_buf_blocks 数据块列表中,并通过管道发送给重写子进程。这样,就能尽可能地保证重写日志具有最新、最完整的写操作了。

子进程什么时候发送ack

子进程在正常的重写完成后至多再等一秒,在这一秒内如果有连续20ms没有可读事件发生,那么直接发送ack

 

主进程执行写操作写AOF的流程

前面说了执行命令会调用到call(client *c, int flags),该命令会调用propagate函数将命令传播到AOF文件,具体是调用feedAppendOnlyFile函数中会将命令写入server.aof_buf缓存中,然后serverCron定时函数会调用flushAppendOnlyFile考虑是否需要将aof_buf缓冲区中的内容写入和保存到AOF 文件里面

call(client *c, int flags) -> propagate -> feedAppendOnlyFile -> flushAppendOnlyFile -> 写入server.aof_buf

serverCron -> flushAppendOnlyFile -> 写AOF文件

 

RDB-AOF混合持久化

即AOF文件的前半段是RDB格式的全量数据后半段是redis命令格式的增量数据,server.aof_use_rdb_preamble大于0(混合持久化开关打开)时,就会进入rdbSaveRio函数先以RDB格式来保存全量数据

前半段是RDB格式的全量数据后半段是redis命令格式的增量数据。

 

加载RDB/AOF

main -> loadDataFromDisk -> rdbLoad -> rdbLoadRio 完成rdb文件的加载

main -> loadDataFromDisk -> loadAppendOnlyFile 完成AOF文件的加载,如果AOF文件是混合持久化的文件,则会首先调用rdbLoadRio加载RDB,加载完后该函数会新建一个fake的client来一行一行执行AOF文件中的命令,完成AOF文件的加载

因为AOF文件更完整,所以代码中会优先加载AOF文件

文章来自个人专栏
redis代码剖析
9 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0