searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

主从复制(流复制)源码解析

2023-09-22 06:28:45
70
0

对于建立了主从关系的两台pg,其流复制的过程如下:

一、步骤

(1)启动主、备;

(2)备启动startup进程;

(3)备启动walreceiver进程;

(4)walreceiver向主发送连接请求;

(5)当主收到连接请求,启动walsender进程,建立walsender与walreceiver之间的TCP连接;

(6)walreceiver发送备最新LSN(握手机制);

(7)walsender将主库最新LSN到备库最新的LSN之间的wal 传递给walreceiver 。这个阶段就是备库追赶主库的阶段。

(8)流复制开始工作。

二、调用顺序

Startup进程 ---》 Walreceiver进程 ---》Walsender进程。

  • Startup进程通常是负责apply日志的。因此需要Wal日志,它会唤醒Walreceiver从Walsender获取Wal日志。
  • Walreceiver收到Startup的唤醒信号后,会通过握手机制向Walsender发起连接请求,并且Walreceiver会告诉主机备机的最新LSN
  • Walsender进程根据Walreceiver发来的备机最新的LSN,发送主库的最新WAL日志给Walreceiver进程。并且根据Walsender进程同步级别,选择是否等待Walreceiver发来的feedback。
  • Walreceiver进程就像是Startup和Walsender通信桥梁。Startup进程需要的Wal日志都是通过Walreceiver获得的。

因此在主从复制中,Startup和Walreceiver之间的交互就显得很重要。Walsender进程主要就是发送主库的最新WAL日志给Walreceiver进程。由于时间关系,该部分的源码分析就不细说。下面主要学习Startup进程Walreceiver进程之间函数调用的逻辑关系。

Startup进程

根据WAL机制,应该先WAL日志落盘,然后再数据落盘。当缓存中的脏页还未被刷入磁盘时,如果数据库宕机了,则数据库的数据处于不一致的状态。在数据库重新启动时,需要借助WAL日志将数据库恢复到一致的状态。

基于上述:Startup进程的作用如下:

  • 崩溃后的恢复工作:Startup是崩溃恢复最核心的进程。它在pg启动时就会被调用,读取配置信息,应用WAL日志;
  • 主从结构中从库应用WAL日志重放;
  • PITR(基于时间点的恢复);

对于一个在主从结构Startup进程,其调用过程如下所示:

一、StartupXLOG() 源码分析

src\backend\access\transam\xlog.c

获取恢复起点

从哪里开始恢复数据,这是Startup恢复数据需要做的第一步。通常,在pg中有两种获取恢复起点的方式,他们的优先级:backup_label File >> controlFile。

  • ControlFile:Backup start location: 会有wal日志备份恢复的redo点。
  • backup_label File:START WAL LOCATION:例如pg_basebackup,会生成该文件,能够确定备份恢复的redo点。
  • backup_label file的记录如下:
CHECKPOINT LOCATION 创建检查点的LSN号
START WAL LOCATION WAL恢复的起始位置
BACKUP METHOD 备份方法
BACKUP FROM 备份源来自主库还是备库
START TIME pg_start_backup 执行时间
LABEL 备份标识
START TIMELINE 时间线
void
StartupXLOG(void)
{
    XLogCtlInsert *Insert; //指向当前正在进行 WAL 插入的 XLogCtlInsert 结构体的指针。
    CheckPoint    checkPoint; //用于存储检查点的信息,包括位置、时间等。
    bool        wasShutdown;  //标志位,指示数据库是否在上次关闭时发生了崩溃。
    bool        reachedStopPoint = false; //标志位,指示是否已经达到了停止点(stop point)。
    bool        haveBackupLabel = false; //标志位,指示是否存在备份标签文件。
    bool        haveTblspcMap = false; //标志位,指示是否存在表空间映射文件。
    XLogRecPtr    RecPtr,  //当前的 WAL 记录位置。
                checkPointLoc, //检查点记录的位置。
                EndOfLog;  //WAL 日志的结束位置。
    TimeLineID    EndOfLogTLI;  //WAL 日志的结束时间线。
    TimeLineID    PrevTimeLineID;  //上一个时间线的标识。
    XLogRecord *record;  //指向当前读取的 WAL 记录的指针。
    TransactionId oldestActiveXID;  //最旧的活跃事务的事务ID。
    bool        backupEndRequired = false;  //标志位,指示是否需要备份结束点。
    bool        backupFromStandby = false; //标志位,指示是否从备机进行备份。
    DBState        dbstate_at_startup;  //数据库启动时的状态。
    XLogReaderState *xlogreader;  //XLOG 读取器的状态。
    XLogPageReadPrivate private;  
    bool        fast_promoted = false;//标志位,指示是否进行了快速提升(fast promotion)
    struct stat st;//用于存储文件状态的结构体。

  //...前期准备工作。
  //1、确定redo位点,如果有backup_label文件
    if (read_backup_label(&checkPointLoc, &backupEndRequired,
                          &backupFromStandby))
    {
        List       *tablespaces = NIL;

        /*
         * Archive recovery was requested, and thanks to the backup label
         * file, we know how far we need to replay to reach consistency. Enter
         * archive recovery directly.
         */
        InArchiveRecovery = true;
        if (StandbyModeRequested)
            StandbyMode = true;    //设置为startup从库模式

        /*
      如果有backup_label file,优先使用该文件获取检查点的信息,而不是pg_control。
         */
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
        //...
        /* set flag to delete it later */
        haveBackupLabel = true;
    }
    else
    //没有backup_label文件,则看control文件
    {
        //...
        if (ArchiveRecoveryRequested &&
            (ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
             ControlFile->backupEndRequired ||
             ControlFile->backupEndPoint != InvalidXLogRecPtr ||
             ControlFile->state == DB_SHUTDOWNED))
        {
            InArchiveRecovery = true;
            if (StandbyModeRequested)
                StandbyMode = true;  //设置为startup从库模式
        }

        /* Get the last valid checkpoint record. */
        checkPointLoc = ControlFile->checkPoint;
        RedoStartLSN = ControlFile->checkPointCopy.redo;
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
        if (record != NULL)
        {
            ereport(DEBUG1,
                    (errmsg("checkpoint record is at %X/%X",
                            (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
        }
        else
        {
            ereport(PANIC,
                    (errmsg("could not locate a valid checkpoint record")));
        }
        memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
        wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);
    }
    //...
    //2、启动replay日志记录。InRecovery = true;
    /*
    InRecovery:如果为true,应该理解为进程正在replay日志记录,而不是系统正处于恢复模式,后者应该通                 过RecoveryInProgress() 确定。
    ArchiveRecoveryRequested:请求进行归档日志恢复
   InArchiveRecovery:若为true,说明当前在使用归档日志恢复(通常在执行PITR、或者是从库);若为false,说明当前仅使用pg_wal目录中的wal日志进行恢复(通常是崩溃恢复阶段)
    /*
    /*
     * Check whether we need to force recovery from WAL.  If it appears to
     * have been a clean shutdown and we did not have a recovery signal file,
     * then assume no recovery needed.
     */
    if (checkPoint.redo < RecPtr)
    {
        if (wasShutdown)
            ereport(PANIC,
                    (errmsg("invalid redo record in shutdown checkpoint")));
        InRecovery = true;
    }
    else if (ControlFile->state != DB_SHUTDOWNED)
        InRecovery = true;
    else if (ArchiveRecoveryRequested)
    {
        /* force recovery due to presence of recovery signal file */
        InRecovery = true;
    }

日志来源

Startup进程的3大作用——崩溃恢复、从库日志应用、PITR,对于不同的用途,也有不同的日志来源。pg维护一个专门的状态机(称作state machine)表示待应用日志的来源,用于在不同时段,从不同的日志源获取WAL日志。对应代码如下(在xlog.c):

/*
 */
typedef enum
{
    XLOG_FROM_ANY = 0,          /* request to read WAL from any source */
    XLOG_FROM_ARCHIVE,          /* restored using restore_command,从归档日志中获取 */
    XLOG_FROM_PG_WAL,           /* existing file in pg_wal,从pg_wal目录获取 */
    XLOG_FROM_STREAM            /* streamed from primary,备库从主库获取 */
} XLogSource;

恢复模式(Reply)

/* REDO开始 */
    if (InRecovery)
    {
        int         rmid;
 
        /*
       1、更新控制文件状态以示我们正在恢复,并且展示我们选择作为恢复起点的检查点位置。另外还会用从备份历史文件获取的最小恢复结束位置(minimum recovery stop point)标记控制文件
         */
        /* 先保存控制文件中的状态,然后更新 */
        dbstate_at_startup = ControlFile->state;
        /* 如果在使用归档日志进行恢复(PITR或者从库),更新状态 */
        if (InArchiveRecovery)
        {
            ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
            SpinLockAcquire(&XLogCtl->info_lck);
            XLogCtl->SharedRecoveryState = RECOVERY_STATE_ARCHIVE;
            SpinLockRelease(&XLogCtl->info_lck);
        }
        /* 否则,如果在使用WAL文件进行恢复(崩溃恢复) */
        else
        {
           //...
        }
 
        /* 更新控制文件中检查点信息 */
        ControlFile->checkPoint = checkPointLoc;
        ControlFile->checkPointCopy = checkPoint;
        if (InArchiveRecovery)
        {
            /* initialize minRecoveryPoint if not set yet,最小恢复点不应该小于重做点 */
            if (ControlFile->minRecoveryPoint < checkPoint.redo)
            {
                ControlFile->minRecoveryPoint = checkPoint.redo;
                ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
            }
        }
 
        /* 如果有backup_label文件 */
        if (haveBackupLabel)
        {
            ControlFile->backupStartPoint = checkPoint.redo;
            ControlFile->backupEndRequired = backupEndRequired;
 
            /* 如果是从从库备份的 */
            if (backupFromStandby)
            {
                /* 只可能是以下两种状态,若不是则报错 */
                if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
                    dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
                    ereport(FATAL,
                            (errmsg("backup_label contains data inconsistent with control file"),
                             errhint("This means that the backup is corrupted and you will "
                                     "have to use another backup for recovery.")));
                /* 若是,则更新备份结束点 */
                ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
            }
        }
        /* 更新时间 */
        ControlFile->time = (pg_time_t) time(NULL);
        /* No need to hold ControlFileLock yet, we aren't up far enough */
        UpdateControlFile();
        
         //2、...初始化clog、subtrans、事务环境等。初始化redo资源管理器
    
        /*
         /3、读取还未reply的第一条record。
            */
        if (checkPoint.redo < RecPtr) // 检查点的重做位置(checkPoint.redo)小于当前的记录位置(RecPtr)检查点的下一条。
        {
            /* back up to find the record */
            record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
        }
        else
        {
            /* just have to read next record after CheckPoint */    //CheckPoint下一条record。
            record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
        }
        if (record != NULL)
        {
          //4、redo操作
        }
        //...

redo操作:在Rmgr中,其中最重要的就是heap_redo函数。heap_redo函数主要就是一个switch语句,根据不同类型的WAL记录,执行不同的操作。例如对于insert操作,XLOG_HEAP_INSERT,则执行heap_xlog_insert。

void
heap_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
    /*
     * These operations don't overwrite MVCC data so no conflict processing is
     * required. The ones in heap2 rmgr do.
     */
 
    switch (info & XLOG_HEAP_OPMASK)
    {
        case XLOG_HEAP_INSERT:
            heap_xlog_insert(record);
            break;
        case XLOG_HEAP_DELETE:
            heap_xlog_delete(record);
            break;
        case XLOG_HEAP_UPDATE:
            heap_xlog_update(record, false);
            break;
        case XLOG_HEAP_TRUNCATE:
 
            /*
             * TRUNCATE is a no-op because the actions are already logged as
             * SMGR WAL records.  TRUNCATE WAL record only exists for logical
             * decoding.
             */
            break;
 …
        default:
            elog(PANIC, "heap_redo: unknown op code %u", info);
    }
}   

二、总结

  1. 读取backup_label File或者pg_control文件,找到redo位点;读取配置文件、若为主从模式,则设置为startup从库模式。
  2. 开始reply(恢复)前的初始化工作:初始化clog、subtrans、事务环境等。初始化redo资源管理器。
  3. 读取WAL record(ReadRecord)。 如果record不存在需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。
  4. 对读取的WAL record进行redo,通过record->xl_rmid信息,调用相应的redo资源管理器进行redo操作。
  5. 更新共享内存中XLogCtlData的apply位点和时间线。
  6. 回到步骤3,读取next WAL record。

Walreceiver进程

当主从关系建立后,主机插入一条数据,跟踪从机的Walreciver进程,其函数调用关系如下(从下至上):

WaitEventSetWaitBlock(WaitEventSet * set, int cur_timeout, WaitEvent * occurred_events, int nevents) 
WaitEventSetWait(WaitEventSet * set, long timeout, WaitEvent * occurred_events, int nevents, uint32 wait_event_info) 
walrcv_receive();WaitLatchOrSocket()
WalReceiverMain() 
AuxiliaryProcessMain(int argc, char ** argv)
StartChildProcess(AuxProcType type) 
MaybeStartWalReceiver() 
sigusr1_handler
ServerLoop()
PostmasterMain(int argc, char ** argv) 
main(int argc, char ** argv) 

WalReceiverMain

\src\backend\replication\walreceiver.c

void
WalReceiverMain(void)
{
    char        conninfo[MAXCONNINFO]; //用于存储连接信息的字符数组。
    char       *tmp_conninfo;  //用于存储临时连接信息的指针。
    char        slotname[NAMEDATALEN]; //用于存储槽位名称的字符数组。
    XLogRecPtr    startpoint; //用于存储起始位置的 XLogRecPtr。
    TimeLineID    startpointTLI;  //用于存储起始位置的时间线标识符(TimeLineID)。
    TimeLineID    primaryTLI; //用于存储主服务器的时间线标识符。
    bool        first_stream; //用于标识是否是第一次进行流复制。
    WalRcvData *walrcv = WalRcv; //指向 WalRcvData 结构体的指针。
    TimestampTz last_recv_timestamp;  //上次接收到数据的时间戳。
    TimestampTz now;  //当前时间戳。
    bool        ping_sent;  //标识是否已发送心跳包。
    char       *err;
    char       *sender_host = NULL;  //发送者的主机名。
    int            sender_port = 0;  //发送者的端口号。

    /*
     WalRcv应该已经设置好了(如果我们是后端,我们通过fork()或EXEC_BACKEND机制从postmaster继承)。
     */
    Assert(walrcv != NULL);
    now = GetCurrentTimestamp();

    //...设置walreceiver状态(walRCVState)

    /* 1、在此之前,Startup已经读取了primary_conninfo。WalReceiver获取关键信息,walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI*/
    walrcv->ready_to_display = false;
    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);  //主机IP,Port
    strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); //复制槽名称
    startpoint = walrcv->receiveStart;   //备机这边的apply点(最新LSN)
    startpointTLI = walrcv->receiveStartTLI; //备机TLI

     //...一些信号处理

    /*1-续:根据获取的信息与主建立tcp链接 */
    wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
    if (!wrconn)
        ereport(ERROR,
                (errmsg("could not connect to the primary server: %s", err)));

    //...更新或保存主的一些信息
   // 2、流复制正式开始。这里流复制有三个大循环。
    first_stream = true;
    for (;;)   //第一个大循环,只要流复制还在就不会退出
    {
        char       *primary_sysid;
        char        standby_sysid[32];
        WalRcvStreamOptions options;

        //2-续:...首先执行一些接收wal日志的前期工作。
        
        //step 3.1 执行wal_startstreaming开始启动流复制.进入第二个大循环
        if (walrcv_startstreaming(wrconn, &options))   
            {
            /* Loop until end-of-streaming or error */
            for (;;)   //第二个大循环 流复制结束后会退出。
            {
                char       *buf;
                int            len;
                bool        endofwal = false;
                pgsocket    wait_fd = PGINVALID_SOCKET;
                int            rc;
                //...

                /* step 3.1-续 接受WAL日志。这里有三种情况。 
                情况一:len > 0,一直有wal日志发送过来,一直读取并先写入XLogWalRcvProcessMsg缓冲区,至到len==0.
                情况二:len == 0,已经没有新的wal日志了。那么跳出去,进行flush,并通知startup。然后继续等(WaitLatchOr)
                情况三:len < 0 ,主自己断开了连接,这个时候也跳出去。进行flush,并通知startup。然后结束流复制。
                walrcv_receive:如果成功读取了一条消息,则返回其长度。如果连接关闭,则返回-1。否则,返回0表示没有可用的数据
                */
                len = walrcv_receive(wrconn, &buf, &wait_fd);
                if (len != 0)
                {
                    /*
                    开始接收data
                     */
                    for (;;)    //第三个大循环  len==0或者len =-1退出。都表示没有数据要接收了。
                    {
                        if (len > 0)
                        {
                            //通过XLogWalRcvProcessMsg正式接收Wal日志
                            last_recv_timestamp = GetCurrentTimestamp();
                            ping_sent = false;
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
                        }
                        else if (len == 0)
                            break;
                        else if (len < 0)
                        {
                            ereport(LOG,
                                    (errmsg("replication terminated by primary server"),
                                     errdetail("End of WAL reached on timeline %u at %X/%X.",
                                               startpointTLI,
                                               (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
                            endofwal = true;
                            break;
                        }
                        len = walrcv_receive(wrconn, &buf, &wait_fd);
                    }

                    /*主库发送write位点、flush位点、apply位点,
                    发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),
                    避免vacuum删掉备库正在使用的记录 
                    */
                    XLogWalRcvSendReply(false, false);  

                    /*
                    WAL刷盘,同时唤醒Startup进程。
                     */
                    XLogWalRcvFlush(false);
                }

                /* 如果是因为主要求断开连接,len < 0,跳出大循环,结束流复制*/
                if (endofwal)   
                    break;
                //否则,就是len=0,那么继续监听WaitLatchOrSocket。
                Assert(wait_fd != PGINVALID_SOCKET);
                rc = WaitLatchOrSocket(walrcv->latch,
                                       WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
                                       WL_TIMEOUT | WL_LATCH_SET,
                                       wait_fd,
                                       NAPTIME_PER_CYCLE,
                                       WAIT_EVENT_WAL_RECEIVER_MAIN);
                if (rc & WL_LATCH_SET)
                {
                    ResetLatch(walrcv->latch);
                    ProcessWalRcvInterrupts();

                    if (walrcv->force_reply)
                    {
                        walrcv->force_reply = false;
                        pg_memory_barrier();
                        XLogWalRcvSendReply(true, false);
                    }
                }
                if (rc & WL_TIMEOUT)
                {

                    bool        requestReply = false;
                    if (wal_receiver_timeout > 0)
                    {
                        TimestampTz now = GetCurrentTimestamp();
                        TimestampTz timeout;

                        timeout =
                            TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                        wal_receiver_timeout);
                        if (now >= timeout)
                            ereport(ERROR,
                                    (errmsg("terminating walreceiver due to timeout")));

                        if (!ping_sent)
                        {
                            timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                                  (wal_receiver_timeout / 2));
                            if (now >= timeout)
                            {
                                requestReply = true;
                                ping_sent = true;
                            }
                        }
                    }                
                    XLogWalRcvSendReply(requestReply, requestReply);
                    XLogWalRcvSendHSFeedback(false);
                }
            }

            /*
              结束流复制,主那边自己断了。
             */
            walrcv_endstreaming(wrconn, &primaryTLI);

           //...
        }
        else
            ereport(LOG,
                    (errmsg("primary server contains no more WAL on requested timeline %u",
                            startpointTLI)));
        //4、最后收尾工作,再检查一遍还有没有wal日志没有flush
        if (recvFile >= 0)
        {
            char        xlogfname[MAXFNAMELEN];

            XLogWalRcvFlush(false);
            if (close(recvFile) != 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not close log segment %s: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo))));

            /*
             * Create .done file forcibly to prevent the streamed segment from
             * being archived later.
             */
            XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
            if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                XLogArchiveForceDone(xlogfname);
            else
                XLogArchiveNotify(xlogfname);
        }
        recvFile = -1;
        
        //5、WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入第一个大循环重新开始。
        elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
        WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
    }
    /* not reached */
}

WalReceiver进程进入工作状态后主要执行流程如下所示:

  1. WalReceiver进程进入流复制之前,Startup进程已经读取了primary_conninfo参数信息解析后填充到walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI等共享内存WalRcvData中。 WalReceiver进程通过walrcv_connect连接主库。
  2. 进入第一层死循环,执行identify_system命令,获取主库systemid/timeline/xlogpos等信息,执行TIMELINE_HISTORY命令拉取history文件,如果需要,则创建临时复制槽。
  3. 循环
    1. 执行wal_startstreaming开始启动流复制
      1. 通过walrcv_receive获取WAL日志,并向wal segment文件中写入xlog(XLogWalRcvWrite,写到缓冲区),期间也会回应主库发过来的心跳信息(发送write位点、flush位点、apply位点)
      2. 如果walrcv_receive获取不到数据,向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程(XLogWalRcvFlush),跳转到4
      3. 如果walrcv_receive获取EOF(<0),向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程,跳转到5
    2. 如果wal_startstreaming返回false,说明主库在该时间线上已经没有wal可以发送了,跳转到6
  4. WaitLatchOrSocket等待超时/网络可读/latch被触发:
    1. 等待超时,向主库发送接收位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),跳转到3.i.x
    2. startup进程通过latch唤醒WalReceiver进程,向主库发送write位点、flush位点、apply位点,跳转到3.i.x
  5. 执行walrcv_endstreaming结束流复制,接收wal sender发送过来的时间线文件,进入步骤6
  6. 关闭目前的xlog segment文件描述符,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),如果flush了XLOG逻辑位置唤醒startup进程,进入步骤7
  7. WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入步骤2。

WalReceiver&Startup之间有关交互的函数

一、回顾

在流复制启动过程中,三个进程的启动顺序是从备库到主库,即:startup ---> walreceiver ---> walsender。 当Startup处于主从复制流中时,从primary 节点以流复制方式获取wal日志,这时startup会发送信号请求唤醒wal receiver进程从Primary节点来获取wal数据。

二、几个交互重要函数

1、如果Startup时候,record不存在,需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。

  • walrcv->receiveStart 用于记录已接收的最后一个 WAL 记录的位置(LSN),它的作用是跟踪 WAL reciever进程在主服务器上的复制进度。以便在下一次接收时从该位置开始。
  • walrcv->receiveStartTLI 用于记录当前接收到的 WAL 日志的Timeline ID。它的作用是跟踪 WAL reciever在主服务器上的复制进度,并确保从服务器上的复制流程正确地跟随主服务器的时间线切换。
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot) {
    WalRcvData *walrcv = WalRcv;
    ...
    walrcv->receiveStart = recptr;
    walrcv->receiveStartTLI = tli;
    latch = walrcv->latch;
    SpinLockRelease(&walrcv->mutex);
    // startup进程将WalReceiver状态从WALRCV_STOPPED转变为WALRCV_STARTING时,设置launch为true,向postmaster请求启动WalReceiver进程
    // startup进程通过latch唤醒WalReceiver进程
    if (launch) SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else if (latch) SetLatch(latch);
}

2、WalReceiver进程调用XLogWalRcvFlush函数,如果已经flush了XLOG逻辑位置,则唤醒Startup进程进行apply日志。

static void XLogWalRcvFlush(bool dying) {
    if (LogstreamResult.Flush < LogstreamResult.Write) { //进行flush
        WalRcvData *walrcv = WalRcv;
        issue_xlog_fsync(recvFile, recvSegNo);
        LogstreamResult.Flush = LogstreamResult.Write;
        /* Update shared-memory status 更新一些状态*/
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->flushedUpto < LogstreamResult.Flush) {
            walrcv->latestChunkStart = walrcv->flushedUpto;
            walrcv->flushedUpto = LogstreamResult.Flush;
            walrcv->receivedTLI = ThisTimeLineID;
        }
        SpinLockRelease(&walrcv->mutex);        
        WakeupRecovery(); /* Signal the startup process and walsender that new WAL has arrived */ // 唤醒startup进程
        if (AllowCascadeReplication()) WalSndWakeup();      
        if (!dying) { /* Also let the primary know that we made some progress 给主机*/
            XLogWalRcvSendReply(false, false);
            XLogWalRcvSendHSFeedback(false);
        }
    }
}

3、walReceiver进程WalRcvWaitForStartPosition函数等待Startup进程设置receiveStart和receiveStartTLI

static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) {
    WalRcvData *walrcv = WalRcv;
    int         state;
    SpinLockAcquire(&walrcv->mutex);
    state = walrcv->walRcvState;
    if (state != WALRCV_STREAMING) {
        SpinLockRelease(&walrcv->mutex);
        if (state == WALRCV_STOPPING) proc_exit(0);
        else elog(FATAL, "unexpected walreceiver state");
    }
    walrcv->walRcvState = WALRCV_WAITING;
    walrcv->receiveStart = InvalidXLogRecPtr;
    walrcv->receiveStartTLI = 0;
    SpinLockRelease(&walrcv->mutex);
    /* nudge startup process to notice that we've stopped streaming and are now waiting for instructions. */
    WakeupRecovery();
    for (;;) {
        ResetLatch(MyLatch);
        ProcessWalRcvInterrupts();
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->walRcvState == WALRCV_RESTARTING) {
            /* No need to handle changes in primary_conninfo or primary_slotname here. Startup process will signal us to terminate in case those change. */
            *startpoint = walrcv->receiveStart;
            *startpointTLI = walrcv->receiveStartTLI;
            walrcv->walRcvState = WALRCV_STREAMING;
            SpinLockRelease(&walrcv->mutex);
            break;
        }
        if (walrcv->walRcvState == WALRCV_STOPPING) { /* We should've received SIGTERM if the startup process wants us to die, but might as well check it here too. */
            SpinLockRelease(&walrcv->mutex);
            exit(1);
        }
        SpinLockRelease(&walrcv->mutex);
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, WAIT_EVENT_WAL_RECEIVER_WAIT_START);
    }
}

4、startup进程则通过WaitForWALToBecomeAvailable->RequestXLogStreaming流程设置receiveStart和receiveStartTLI,并且是通过latch唤醒WalReceiver进程。

关系图:

三、同步级别

在主从复制中,synchronous_commit参数是WAL相关配置参数,用于指定当数据库提交事务时是否需要等待WAL日志写入硬盘后才向客户端返回成功,这个参数可选值多(on、off、local、remote_write、remote_apply 五种)

synchronous_commit参数:

同步等级 设定值 概述 保证范围
同步 remote_apply 应用发起提交后,等到在备库上应用WAL(更新数据)后,返回COMMIT响应,由于完全保证了数据同步,因此它适合需要备库始终保持最新数据的负载分配场景。 1-9
同步 on(默认) 应用发起提交后,在备库上写入WAL后,返回COMMIT响应。该选项是性能和可靠性之间的最佳平衡。 1-6
准同步 remote_write 应用发起提交后,等到WAL已传输到备库后,返回COMMIT响应。 1-5
异步 local 应用发起提交后,写入主库WAL之后,返回COMMIT响应。 1-2
异步 off 应用发起提交后,直接返回COMMIT响应,而无需等待主库WAL完成写入。 1

主库synchronous_commit = remote_apply:每当startup进程进行了apply xlog records时,startup进程都会调用此方法,以便 walreceiver 可以检查它是否需要将apply通知发送回主节点。(该部分会在WaitForWALToBecomeAvailable进行调用,只有当startup进程已经apply完了,才会调用这个函数唤醒walreciever。)

void WalRcvForceReply(void) {
    Latch      *latch;
    WalRcv->force_reply = true;  // 设置强制回复标志    
    SpinLockAcquire(&WalRcv->mutex); /* fetching the latch pointer might not be atomic, so use spinlock */
    latch = WalRcv->latch;
    SpinLockRelease(&WalRcv->mutex);
    if (latch) SetLatch(latch);
}
0条评论
0 / 1000
8****m
4文章数
0粉丝数
8****m
4 文章 | 0 粉丝
8****m
4文章数
0粉丝数
8****m
4 文章 | 0 粉丝
原创

主从复制(流复制)源码解析

2023-09-22 06:28:45
70
0

对于建立了主从关系的两台pg,其流复制的过程如下:

一、步骤

(1)启动主、备;

(2)备启动startup进程;

(3)备启动walreceiver进程;

(4)walreceiver向主发送连接请求;

(5)当主收到连接请求,启动walsender进程,建立walsender与walreceiver之间的TCP连接;

(6)walreceiver发送备最新LSN(握手机制);

(7)walsender将主库最新LSN到备库最新的LSN之间的wal 传递给walreceiver 。这个阶段就是备库追赶主库的阶段。

(8)流复制开始工作。

二、调用顺序

Startup进程 ---》 Walreceiver进程 ---》Walsender进程。

  • Startup进程通常是负责apply日志的。因此需要Wal日志,它会唤醒Walreceiver从Walsender获取Wal日志。
  • Walreceiver收到Startup的唤醒信号后,会通过握手机制向Walsender发起连接请求,并且Walreceiver会告诉主机备机的最新LSN
  • Walsender进程根据Walreceiver发来的备机最新的LSN,发送主库的最新WAL日志给Walreceiver进程。并且根据Walsender进程同步级别,选择是否等待Walreceiver发来的feedback。
  • Walreceiver进程就像是Startup和Walsender通信桥梁。Startup进程需要的Wal日志都是通过Walreceiver获得的。

因此在主从复制中,Startup和Walreceiver之间的交互就显得很重要。Walsender进程主要就是发送主库的最新WAL日志给Walreceiver进程。由于时间关系,该部分的源码分析就不细说。下面主要学习Startup进程Walreceiver进程之间函数调用的逻辑关系。

Startup进程

根据WAL机制,应该先WAL日志落盘,然后再数据落盘。当缓存中的脏页还未被刷入磁盘时,如果数据库宕机了,则数据库的数据处于不一致的状态。在数据库重新启动时,需要借助WAL日志将数据库恢复到一致的状态。

基于上述:Startup进程的作用如下:

  • 崩溃后的恢复工作:Startup是崩溃恢复最核心的进程。它在pg启动时就会被调用,读取配置信息,应用WAL日志;
  • 主从结构中从库应用WAL日志重放;
  • PITR(基于时间点的恢复);

对于一个在主从结构Startup进程,其调用过程如下所示:

一、StartupXLOG() 源码分析

src\backend\access\transam\xlog.c

获取恢复起点

从哪里开始恢复数据,这是Startup恢复数据需要做的第一步。通常,在pg中有两种获取恢复起点的方式,他们的优先级:backup_label File >> controlFile。

  • ControlFile:Backup start location: 会有wal日志备份恢复的redo点。
  • backup_label File:START WAL LOCATION:例如pg_basebackup,会生成该文件,能够确定备份恢复的redo点。
  • backup_label file的记录如下:
CHECKPOINT LOCATION 创建检查点的LSN号
START WAL LOCATION WAL恢复的起始位置
BACKUP METHOD 备份方法
BACKUP FROM 备份源来自主库还是备库
START TIME pg_start_backup 执行时间
LABEL 备份标识
START TIMELINE 时间线
void
StartupXLOG(void)
{
    XLogCtlInsert *Insert; //指向当前正在进行 WAL 插入的 XLogCtlInsert 结构体的指针。
    CheckPoint    checkPoint; //用于存储检查点的信息,包括位置、时间等。
    bool        wasShutdown;  //标志位,指示数据库是否在上次关闭时发生了崩溃。
    bool        reachedStopPoint = false; //标志位,指示是否已经达到了停止点(stop point)。
    bool        haveBackupLabel = false; //标志位,指示是否存在备份标签文件。
    bool        haveTblspcMap = false; //标志位,指示是否存在表空间映射文件。
    XLogRecPtr    RecPtr,  //当前的 WAL 记录位置。
                checkPointLoc, //检查点记录的位置。
                EndOfLog;  //WAL 日志的结束位置。
    TimeLineID    EndOfLogTLI;  //WAL 日志的结束时间线。
    TimeLineID    PrevTimeLineID;  //上一个时间线的标识。
    XLogRecord *record;  //指向当前读取的 WAL 记录的指针。
    TransactionId oldestActiveXID;  //最旧的活跃事务的事务ID。
    bool        backupEndRequired = false;  //标志位,指示是否需要备份结束点。
    bool        backupFromStandby = false; //标志位,指示是否从备机进行备份。
    DBState        dbstate_at_startup;  //数据库启动时的状态。
    XLogReaderState *xlogreader;  //XLOG 读取器的状态。
    XLogPageReadPrivate private;  
    bool        fast_promoted = false;//标志位,指示是否进行了快速提升(fast promotion)
    struct stat st;//用于存储文件状态的结构体。

  //...前期准备工作。
  //1、确定redo位点,如果有backup_label文件
    if (read_backup_label(&checkPointLoc, &backupEndRequired,
                          &backupFromStandby))
    {
        List       *tablespaces = NIL;

        /*
         * Archive recovery was requested, and thanks to the backup label
         * file, we know how far we need to replay to reach consistency. Enter
         * archive recovery directly.
         */
        InArchiveRecovery = true;
        if (StandbyModeRequested)
            StandbyMode = true;    //设置为startup从库模式

        /*
      如果有backup_label file,优先使用该文件获取检查点的信息,而不是pg_control。
         */
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
        //...
        /* set flag to delete it later */
        haveBackupLabel = true;
    }
    else
    //没有backup_label文件,则看control文件
    {
        //...
        if (ArchiveRecoveryRequested &&
            (ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
             ControlFile->backupEndRequired ||
             ControlFile->backupEndPoint != InvalidXLogRecPtr ||
             ControlFile->state == DB_SHUTDOWNED))
        {
            InArchiveRecovery = true;
            if (StandbyModeRequested)
                StandbyMode = true;  //设置为startup从库模式
        }

        /* Get the last valid checkpoint record. */
        checkPointLoc = ControlFile->checkPoint;
        RedoStartLSN = ControlFile->checkPointCopy.redo;
        record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
        if (record != NULL)
        {
            ereport(DEBUG1,
                    (errmsg("checkpoint record is at %X/%X",
                            (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
        }
        else
        {
            ereport(PANIC,
                    (errmsg("could not locate a valid checkpoint record")));
        }
        memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
        wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);
    }
    //...
    //2、启动replay日志记录。InRecovery = true;
    /*
    InRecovery:如果为true,应该理解为进程正在replay日志记录,而不是系统正处于恢复模式,后者应该通                 过RecoveryInProgress() 确定。
    ArchiveRecoveryRequested:请求进行归档日志恢复
   InArchiveRecovery:若为true,说明当前在使用归档日志恢复(通常在执行PITR、或者是从库);若为false,说明当前仅使用pg_wal目录中的wal日志进行恢复(通常是崩溃恢复阶段)
    /*
    /*
     * Check whether we need to force recovery from WAL.  If it appears to
     * have been a clean shutdown and we did not have a recovery signal file,
     * then assume no recovery needed.
     */
    if (checkPoint.redo < RecPtr)
    {
        if (wasShutdown)
            ereport(PANIC,
                    (errmsg("invalid redo record in shutdown checkpoint")));
        InRecovery = true;
    }
    else if (ControlFile->state != DB_SHUTDOWNED)
        InRecovery = true;
    else if (ArchiveRecoveryRequested)
    {
        /* force recovery due to presence of recovery signal file */
        InRecovery = true;
    }

日志来源

Startup进程的3大作用——崩溃恢复、从库日志应用、PITR,对于不同的用途,也有不同的日志来源。pg维护一个专门的状态机(称作state machine)表示待应用日志的来源,用于在不同时段,从不同的日志源获取WAL日志。对应代码如下(在xlog.c):

/*
 */
typedef enum
{
    XLOG_FROM_ANY = 0,          /* request to read WAL from any source */
    XLOG_FROM_ARCHIVE,          /* restored using restore_command,从归档日志中获取 */
    XLOG_FROM_PG_WAL,           /* existing file in pg_wal,从pg_wal目录获取 */
    XLOG_FROM_STREAM            /* streamed from primary,备库从主库获取 */
} XLogSource;

恢复模式(Reply)

/* REDO开始 */
    if (InRecovery)
    {
        int         rmid;
 
        /*
       1、更新控制文件状态以示我们正在恢复,并且展示我们选择作为恢复起点的检查点位置。另外还会用从备份历史文件获取的最小恢复结束位置(minimum recovery stop point)标记控制文件
         */
        /* 先保存控制文件中的状态,然后更新 */
        dbstate_at_startup = ControlFile->state;
        /* 如果在使用归档日志进行恢复(PITR或者从库),更新状态 */
        if (InArchiveRecovery)
        {
            ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
            SpinLockAcquire(&XLogCtl->info_lck);
            XLogCtl->SharedRecoveryState = RECOVERY_STATE_ARCHIVE;
            SpinLockRelease(&XLogCtl->info_lck);
        }
        /* 否则,如果在使用WAL文件进行恢复(崩溃恢复) */
        else
        {
           //...
        }
 
        /* 更新控制文件中检查点信息 */
        ControlFile->checkPoint = checkPointLoc;
        ControlFile->checkPointCopy = checkPoint;
        if (InArchiveRecovery)
        {
            /* initialize minRecoveryPoint if not set yet,最小恢复点不应该小于重做点 */
            if (ControlFile->minRecoveryPoint < checkPoint.redo)
            {
                ControlFile->minRecoveryPoint = checkPoint.redo;
                ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
            }
        }
 
        /* 如果有backup_label文件 */
        if (haveBackupLabel)
        {
            ControlFile->backupStartPoint = checkPoint.redo;
            ControlFile->backupEndRequired = backupEndRequired;
 
            /* 如果是从从库备份的 */
            if (backupFromStandby)
            {
                /* 只可能是以下两种状态,若不是则报错 */
                if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
                    dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
                    ereport(FATAL,
                            (errmsg("backup_label contains data inconsistent with control file"),
                             errhint("This means that the backup is corrupted and you will "
                                     "have to use another backup for recovery.")));
                /* 若是,则更新备份结束点 */
                ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
            }
        }
        /* 更新时间 */
        ControlFile->time = (pg_time_t) time(NULL);
        /* No need to hold ControlFileLock yet, we aren't up far enough */
        UpdateControlFile();
        
         //2、...初始化clog、subtrans、事务环境等。初始化redo资源管理器
    
        /*
         /3、读取还未reply的第一条record。
            */
        if (checkPoint.redo < RecPtr) // 检查点的重做位置(checkPoint.redo)小于当前的记录位置(RecPtr)检查点的下一条。
        {
            /* back up to find the record */
            record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
        }
        else
        {
            /* just have to read next record after CheckPoint */    //CheckPoint下一条record。
            record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
        }
        if (record != NULL)
        {
          //4、redo操作
        }
        //...

redo操作:在Rmgr中,其中最重要的就是heap_redo函数。heap_redo函数主要就是一个switch语句,根据不同类型的WAL记录,执行不同的操作。例如对于insert操作,XLOG_HEAP_INSERT,则执行heap_xlog_insert。

void
heap_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
    /*
     * These operations don't overwrite MVCC data so no conflict processing is
     * required. The ones in heap2 rmgr do.
     */
 
    switch (info & XLOG_HEAP_OPMASK)
    {
        case XLOG_HEAP_INSERT:
            heap_xlog_insert(record);
            break;
        case XLOG_HEAP_DELETE:
            heap_xlog_delete(record);
            break;
        case XLOG_HEAP_UPDATE:
            heap_xlog_update(record, false);
            break;
        case XLOG_HEAP_TRUNCATE:
 
            /*
             * TRUNCATE is a no-op because the actions are already logged as
             * SMGR WAL records.  TRUNCATE WAL record only exists for logical
             * decoding.
             */
            break;
 …
        default:
            elog(PANIC, "heap_redo: unknown op code %u", info);
    }
}   

二、总结

  1. 读取backup_label File或者pg_control文件,找到redo位点;读取配置文件、若为主从模式,则设置为startup从库模式。
  2. 开始reply(恢复)前的初始化工作:初始化clog、subtrans、事务环境等。初始化redo资源管理器。
  3. 读取WAL record(ReadRecord)。 如果record不存在需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。
  4. 对读取的WAL record进行redo,通过record->xl_rmid信息,调用相应的redo资源管理器进行redo操作。
  5. 更新共享内存中XLogCtlData的apply位点和时间线。
  6. 回到步骤3,读取next WAL record。

Walreceiver进程

当主从关系建立后,主机插入一条数据,跟踪从机的Walreciver进程,其函数调用关系如下(从下至上):

WaitEventSetWaitBlock(WaitEventSet * set, int cur_timeout, WaitEvent * occurred_events, int nevents) 
WaitEventSetWait(WaitEventSet * set, long timeout, WaitEvent * occurred_events, int nevents, uint32 wait_event_info) 
walrcv_receive();WaitLatchOrSocket()
WalReceiverMain() 
AuxiliaryProcessMain(int argc, char ** argv)
StartChildProcess(AuxProcType type) 
MaybeStartWalReceiver() 
sigusr1_handler
ServerLoop()
PostmasterMain(int argc, char ** argv) 
main(int argc, char ** argv) 

WalReceiverMain

\src\backend\replication\walreceiver.c

void
WalReceiverMain(void)
{
    char        conninfo[MAXCONNINFO]; //用于存储连接信息的字符数组。
    char       *tmp_conninfo;  //用于存储临时连接信息的指针。
    char        slotname[NAMEDATALEN]; //用于存储槽位名称的字符数组。
    XLogRecPtr    startpoint; //用于存储起始位置的 XLogRecPtr。
    TimeLineID    startpointTLI;  //用于存储起始位置的时间线标识符(TimeLineID)。
    TimeLineID    primaryTLI; //用于存储主服务器的时间线标识符。
    bool        first_stream; //用于标识是否是第一次进行流复制。
    WalRcvData *walrcv = WalRcv; //指向 WalRcvData 结构体的指针。
    TimestampTz last_recv_timestamp;  //上次接收到数据的时间戳。
    TimestampTz now;  //当前时间戳。
    bool        ping_sent;  //标识是否已发送心跳包。
    char       *err;
    char       *sender_host = NULL;  //发送者的主机名。
    int            sender_port = 0;  //发送者的端口号。

    /*
     WalRcv应该已经设置好了(如果我们是后端,我们通过fork()或EXEC_BACKEND机制从postmaster继承)。
     */
    Assert(walrcv != NULL);
    now = GetCurrentTimestamp();

    //...设置walreceiver状态(walRCVState)

    /* 1、在此之前,Startup已经读取了primary_conninfo。WalReceiver获取关键信息,walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI*/
    walrcv->ready_to_display = false;
    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);  //主机IP,Port
    strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); //复制槽名称
    startpoint = walrcv->receiveStart;   //备机这边的apply点(最新LSN)
    startpointTLI = walrcv->receiveStartTLI; //备机TLI

     //...一些信号处理

    /*1-续:根据获取的信息与主建立tcp链接 */
    wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
    if (!wrconn)
        ereport(ERROR,
                (errmsg("could not connect to the primary server: %s", err)));

    //...更新或保存主的一些信息
   // 2、流复制正式开始。这里流复制有三个大循环。
    first_stream = true;
    for (;;)   //第一个大循环,只要流复制还在就不会退出
    {
        char       *primary_sysid;
        char        standby_sysid[32];
        WalRcvStreamOptions options;

        //2-续:...首先执行一些接收wal日志的前期工作。
        
        //step 3.1 执行wal_startstreaming开始启动流复制.进入第二个大循环
        if (walrcv_startstreaming(wrconn, &options))   
            {
            /* Loop until end-of-streaming or error */
            for (;;)   //第二个大循环 流复制结束后会退出。
            {
                char       *buf;
                int            len;
                bool        endofwal = false;
                pgsocket    wait_fd = PGINVALID_SOCKET;
                int            rc;
                //...

                /* step 3.1-续 接受WAL日志。这里有三种情况。 
                情况一:len > 0,一直有wal日志发送过来,一直读取并先写入XLogWalRcvProcessMsg缓冲区,至到len==0.
                情况二:len == 0,已经没有新的wal日志了。那么跳出去,进行flush,并通知startup。然后继续等(WaitLatchOr)
                情况三:len < 0 ,主自己断开了连接,这个时候也跳出去。进行flush,并通知startup。然后结束流复制。
                walrcv_receive:如果成功读取了一条消息,则返回其长度。如果连接关闭,则返回-1。否则,返回0表示没有可用的数据
                */
                len = walrcv_receive(wrconn, &buf, &wait_fd);
                if (len != 0)
                {
                    /*
                    开始接收data
                     */
                    for (;;)    //第三个大循环  len==0或者len =-1退出。都表示没有数据要接收了。
                    {
                        if (len > 0)
                        {
                            //通过XLogWalRcvProcessMsg正式接收Wal日志
                            last_recv_timestamp = GetCurrentTimestamp();
                            ping_sent = false;
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
                        }
                        else if (len == 0)
                            break;
                        else if (len < 0)
                        {
                            ereport(LOG,
                                    (errmsg("replication terminated by primary server"),
                                     errdetail("End of WAL reached on timeline %u at %X/%X.",
                                               startpointTLI,
                                               (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
                            endofwal = true;
                            break;
                        }
                        len = walrcv_receive(wrconn, &buf, &wait_fd);
                    }

                    /*主库发送write位点、flush位点、apply位点,
                    发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),
                    避免vacuum删掉备库正在使用的记录 
                    */
                    XLogWalRcvSendReply(false, false);  

                    /*
                    WAL刷盘,同时唤醒Startup进程。
                     */
                    XLogWalRcvFlush(false);
                }

                /* 如果是因为主要求断开连接,len < 0,跳出大循环,结束流复制*/
                if (endofwal)   
                    break;
                //否则,就是len=0,那么继续监听WaitLatchOrSocket。
                Assert(wait_fd != PGINVALID_SOCKET);
                rc = WaitLatchOrSocket(walrcv->latch,
                                       WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
                                       WL_TIMEOUT | WL_LATCH_SET,
                                       wait_fd,
                                       NAPTIME_PER_CYCLE,
                                       WAIT_EVENT_WAL_RECEIVER_MAIN);
                if (rc & WL_LATCH_SET)
                {
                    ResetLatch(walrcv->latch);
                    ProcessWalRcvInterrupts();

                    if (walrcv->force_reply)
                    {
                        walrcv->force_reply = false;
                        pg_memory_barrier();
                        XLogWalRcvSendReply(true, false);
                    }
                }
                if (rc & WL_TIMEOUT)
                {

                    bool        requestReply = false;
                    if (wal_receiver_timeout > 0)
                    {
                        TimestampTz now = GetCurrentTimestamp();
                        TimestampTz timeout;

                        timeout =
                            TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                        wal_receiver_timeout);
                        if (now >= timeout)
                            ereport(ERROR,
                                    (errmsg("terminating walreceiver due to timeout")));

                        if (!ping_sent)
                        {
                            timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                                  (wal_receiver_timeout / 2));
                            if (now >= timeout)
                            {
                                requestReply = true;
                                ping_sent = true;
                            }
                        }
                    }                
                    XLogWalRcvSendReply(requestReply, requestReply);
                    XLogWalRcvSendHSFeedback(false);
                }
            }

            /*
              结束流复制,主那边自己断了。
             */
            walrcv_endstreaming(wrconn, &primaryTLI);

           //...
        }
        else
            ereport(LOG,
                    (errmsg("primary server contains no more WAL on requested timeline %u",
                            startpointTLI)));
        //4、最后收尾工作,再检查一遍还有没有wal日志没有flush
        if (recvFile >= 0)
        {
            char        xlogfname[MAXFNAMELEN];

            XLogWalRcvFlush(false);
            if (close(recvFile) != 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not close log segment %s: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo))));

            /*
             * Create .done file forcibly to prevent the streamed segment from
             * being archived later.
             */
            XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
            if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                XLogArchiveForceDone(xlogfname);
            else
                XLogArchiveNotify(xlogfname);
        }
        recvFile = -1;
        
        //5、WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入第一个大循环重新开始。
        elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
        WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
    }
    /* not reached */
}

WalReceiver进程进入工作状态后主要执行流程如下所示:

  1. WalReceiver进程进入流复制之前,Startup进程已经读取了primary_conninfo参数信息解析后填充到walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI等共享内存WalRcvData中。 WalReceiver进程通过walrcv_connect连接主库。
  2. 进入第一层死循环,执行identify_system命令,获取主库systemid/timeline/xlogpos等信息,执行TIMELINE_HISTORY命令拉取history文件,如果需要,则创建临时复制槽。
  3. 循环
    1. 执行wal_startstreaming开始启动流复制
      1. 通过walrcv_receive获取WAL日志,并向wal segment文件中写入xlog(XLogWalRcvWrite,写到缓冲区),期间也会回应主库发过来的心跳信息(发送write位点、flush位点、apply位点)
      2. 如果walrcv_receive获取不到数据,向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程(XLogWalRcvFlush),跳转到4
      3. 如果walrcv_receive获取EOF(<0),向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程,跳转到5
    2. 如果wal_startstreaming返回false,说明主库在该时间线上已经没有wal可以发送了,跳转到6
  4. WaitLatchOrSocket等待超时/网络可读/latch被触发:
    1. 等待超时,向主库发送接收位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),跳转到3.i.x
    2. startup进程通过latch唤醒WalReceiver进程,向主库发送write位点、flush位点、apply位点,跳转到3.i.x
  5. 执行walrcv_endstreaming结束流复制,接收wal sender发送过来的时间线文件,进入步骤6
  6. 关闭目前的xlog segment文件描述符,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),如果flush了XLOG逻辑位置唤醒startup进程,进入步骤7
  7. WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入步骤2。

WalReceiver&Startup之间有关交互的函数

一、回顾

在流复制启动过程中,三个进程的启动顺序是从备库到主库,即:startup ---> walreceiver ---> walsender。 当Startup处于主从复制流中时,从primary 节点以流复制方式获取wal日志,这时startup会发送信号请求唤醒wal receiver进程从Primary节点来获取wal数据。

二、几个交互重要函数

1、如果Startup时候,record不存在,需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。

  • walrcv->receiveStart 用于记录已接收的最后一个 WAL 记录的位置(LSN),它的作用是跟踪 WAL reciever进程在主服务器上的复制进度。以便在下一次接收时从该位置开始。
  • walrcv->receiveStartTLI 用于记录当前接收到的 WAL 日志的Timeline ID。它的作用是跟踪 WAL reciever在主服务器上的复制进度,并确保从服务器上的复制流程正确地跟随主服务器的时间线切换。
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot) {
    WalRcvData *walrcv = WalRcv;
    ...
    walrcv->receiveStart = recptr;
    walrcv->receiveStartTLI = tli;
    latch = walrcv->latch;
    SpinLockRelease(&walrcv->mutex);
    // startup进程将WalReceiver状态从WALRCV_STOPPED转变为WALRCV_STARTING时,设置launch为true,向postmaster请求启动WalReceiver进程
    // startup进程通过latch唤醒WalReceiver进程
    if (launch) SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else if (latch) SetLatch(latch);
}

2、WalReceiver进程调用XLogWalRcvFlush函数,如果已经flush了XLOG逻辑位置,则唤醒Startup进程进行apply日志。

static void XLogWalRcvFlush(bool dying) {
    if (LogstreamResult.Flush < LogstreamResult.Write) { //进行flush
        WalRcvData *walrcv = WalRcv;
        issue_xlog_fsync(recvFile, recvSegNo);
        LogstreamResult.Flush = LogstreamResult.Write;
        /* Update shared-memory status 更新一些状态*/
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->flushedUpto < LogstreamResult.Flush) {
            walrcv->latestChunkStart = walrcv->flushedUpto;
            walrcv->flushedUpto = LogstreamResult.Flush;
            walrcv->receivedTLI = ThisTimeLineID;
        }
        SpinLockRelease(&walrcv->mutex);        
        WakeupRecovery(); /* Signal the startup process and walsender that new WAL has arrived */ // 唤醒startup进程
        if (AllowCascadeReplication()) WalSndWakeup();      
        if (!dying) { /* Also let the primary know that we made some progress 给主机*/
            XLogWalRcvSendReply(false, false);
            XLogWalRcvSendHSFeedback(false);
        }
    }
}

3、walReceiver进程WalRcvWaitForStartPosition函数等待Startup进程设置receiveStart和receiveStartTLI

static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) {
    WalRcvData *walrcv = WalRcv;
    int         state;
    SpinLockAcquire(&walrcv->mutex);
    state = walrcv->walRcvState;
    if (state != WALRCV_STREAMING) {
        SpinLockRelease(&walrcv->mutex);
        if (state == WALRCV_STOPPING) proc_exit(0);
        else elog(FATAL, "unexpected walreceiver state");
    }
    walrcv->walRcvState = WALRCV_WAITING;
    walrcv->receiveStart = InvalidXLogRecPtr;
    walrcv->receiveStartTLI = 0;
    SpinLockRelease(&walrcv->mutex);
    /* nudge startup process to notice that we've stopped streaming and are now waiting for instructions. */
    WakeupRecovery();
    for (;;) {
        ResetLatch(MyLatch);
        ProcessWalRcvInterrupts();
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->walRcvState == WALRCV_RESTARTING) {
            /* No need to handle changes in primary_conninfo or primary_slotname here. Startup process will signal us to terminate in case those change. */
            *startpoint = walrcv->receiveStart;
            *startpointTLI = walrcv->receiveStartTLI;
            walrcv->walRcvState = WALRCV_STREAMING;
            SpinLockRelease(&walrcv->mutex);
            break;
        }
        if (walrcv->walRcvState == WALRCV_STOPPING) { /* We should've received SIGTERM if the startup process wants us to die, but might as well check it here too. */
            SpinLockRelease(&walrcv->mutex);
            exit(1);
        }
        SpinLockRelease(&walrcv->mutex);
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, WAIT_EVENT_WAL_RECEIVER_WAIT_START);
    }
}

4、startup进程则通过WaitForWALToBecomeAvailable->RequestXLogStreaming流程设置receiveStart和receiveStartTLI,并且是通过latch唤醒WalReceiver进程。

关系图:

三、同步级别

在主从复制中,synchronous_commit参数是WAL相关配置参数,用于指定当数据库提交事务时是否需要等待WAL日志写入硬盘后才向客户端返回成功,这个参数可选值多(on、off、local、remote_write、remote_apply 五种)

synchronous_commit参数:

同步等级 设定值 概述 保证范围
同步 remote_apply 应用发起提交后,等到在备库上应用WAL(更新数据)后,返回COMMIT响应,由于完全保证了数据同步,因此它适合需要备库始终保持最新数据的负载分配场景。 1-9
同步 on(默认) 应用发起提交后,在备库上写入WAL后,返回COMMIT响应。该选项是性能和可靠性之间的最佳平衡。 1-6
准同步 remote_write 应用发起提交后,等到WAL已传输到备库后,返回COMMIT响应。 1-5
异步 local 应用发起提交后,写入主库WAL之后,返回COMMIT响应。 1-2
异步 off 应用发起提交后,直接返回COMMIT响应,而无需等待主库WAL完成写入。 1

主库synchronous_commit = remote_apply:每当startup进程进行了apply xlog records时,startup进程都会调用此方法,以便 walreceiver 可以检查它是否需要将apply通知发送回主节点。(该部分会在WaitForWALToBecomeAvailable进行调用,只有当startup进程已经apply完了,才会调用这个函数唤醒walreciever。)

void WalRcvForceReply(void) {
    Latch      *latch;
    WalRcv->force_reply = true;  // 设置强制回复标志    
    SpinLockAcquire(&WalRcv->mutex); /* fetching the latch pointer might not be atomic, so use spinlock */
    latch = WalRcv->latch;
    SpinLockRelease(&WalRcv->mutex);
    if (latch) SetLatch(latch);
}
文章来自个人专栏
pg主从
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
1