对于建立了主从关系的两台pg,其流复制的过程如下:
一、步骤
(1)启动主、备;
(2)备启动startup进程;
(3)备启动walreceiver进程;
(4)walreceiver向主发送连接请求;
(5)当主收到连接请求,启动walsender进程,建立walsender与walreceiver之间的TCP连接;
(6)walreceiver发送备最新LSN(握手机制);
(7)walsender将主库最新LSN到备库最新的LSN之间的wal 传递给walreceiver 。这个阶段就是备库追赶主库的阶段。
(8)流复制开始工作。
二、调用顺序
Startup进程 ---》 Walreceiver进程 ---》Walsender进程。
- Startup进程通常是负责apply日志的。因此需要Wal日志,它会唤醒Walreceiver从Walsender获取Wal日志。
- Walreceiver收到Startup的唤醒信号后,会通过握手机制向Walsender发起连接请求,并且Walreceiver会告诉主机备机的最新LSN。
- Walsender进程根据Walreceiver发来的备机最新的LSN,发送主库的最新WAL日志给Walreceiver进程。并且根据Walsender进程同步级别,选择是否等待Walreceiver发来的feedback。
- Walreceiver进程就像是Startup和Walsender通信桥梁。Startup进程需要的Wal日志都是通过Walreceiver获得的。
因此在主从复制中,Startup和Walreceiver之间的交互就显得很重要。Walsender进程主要就是发送主库的最新WAL日志给Walreceiver进程。由于时间关系,该部分的源码分析就不细说。下面主要学习Startup进程与Walreceiver进程之间函数调用的逻辑关系。
Startup进程
根据WAL机制,应该先WAL日志落盘,然后再数据落盘。当缓存中的脏页还未被刷入磁盘时,如果数据库宕机了,则数据库的数据处于不一致的状态。在数据库重新启动时,需要借助WAL日志将数据库恢复到一致的状态。
基于上述:Startup进程的作用如下:
- 崩溃后的恢复工作:Startup是崩溃恢复最核心的进程。它在pg启动时就会被调用,读取配置信息,应用WAL日志;
- 主从结构中从库应用WAL日志重放;
- PITR(基于时间点的恢复);
对于一个在主从结构中Startup进程,其调用过程如下所示:
一、StartupXLOG() 源码分析
src\backend\access\transam\xlog.c
获取恢复起点
从哪里开始恢复数据,这是Startup恢复数据需要做的第一步。通常,在pg中有两种获取恢复起点的方式,他们的优先级:backup_label File >> controlFile。
- ControlFile:Backup start location: 会有wal日志备份恢复的redo点。
- backup_label File:START WAL LOCATION:例如pg_basebackup,会生成该文件,能够确定备份恢复的redo点。
- backup_label file的记录如下:
CHECKPOINT LOCATION | 创建检查点的LSN号 |
---|---|
START WAL LOCATION | WAL恢复的起始位置 |
BACKUP METHOD | 备份方法 |
BACKUP FROM | 备份源来自主库还是备库 |
START TIME | pg_start_backup 执行时间 |
LABEL | 备份标识 |
START TIMELINE | 时间线 |
void
StartupXLOG(void)
{
XLogCtlInsert *Insert; //指向当前正在进行 WAL 插入的 XLogCtlInsert 结构体的指针。
CheckPoint checkPoint; //用于存储检查点的信息,包括位置、时间等。
bool wasShutdown; //标志位,指示数据库是否在上次关闭时发生了崩溃。
bool reachedStopPoint = false; //标志位,指示是否已经达到了停止点(stop point)。
bool haveBackupLabel = false; //标志位,指示是否存在备份标签文件。
bool haveTblspcMap = false; //标志位,指示是否存在表空间映射文件。
XLogRecPtr RecPtr, //当前的 WAL 记录位置。
checkPointLoc, //检查点记录的位置。
EndOfLog; //WAL 日志的结束位置。
TimeLineID EndOfLogTLI; //WAL 日志的结束时间线。
TimeLineID PrevTimeLineID; //上一个时间线的标识。
XLogRecord *record; //指向当前读取的 WAL 记录的指针。
TransactionId oldestActiveXID; //最旧的活跃事务的事务ID。
bool backupEndRequired = false; //标志位,指示是否需要备份结束点。
bool backupFromStandby = false; //标志位,指示是否从备机进行备份。
DBState dbstate_at_startup; //数据库启动时的状态。
XLogReaderState *xlogreader; //XLOG 读取器的状态。
XLogPageReadPrivate private;
bool fast_promoted = false;//标志位,指示是否进行了快速提升(fast promotion)
struct stat st;//用于存储文件状态的结构体。
//...前期准备工作。
//1、确定redo位点,如果有backup_label文件
if (read_backup_label(&checkPointLoc, &backupEndRequired,
&backupFromStandby))
{
List *tablespaces = NIL;
/*
* Archive recovery was requested, and thanks to the backup label
* file, we know how far we need to replay to reach consistency. Enter
* archive recovery directly.
*/
InArchiveRecovery = true;
if (StandbyModeRequested)
StandbyMode = true; //设置为startup从库模式
/*
如果有backup_label file,优先使用该文件获取检查点的信息,而不是pg_control。
*/
record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
//...
/* set flag to delete it later */
haveBackupLabel = true;
}
else
//没有backup_label文件,则看control文件
{
//...
if (ArchiveRecoveryRequested &&
(ControlFile->minRecoveryPoint != InvalidXLogRecPtr ||
ControlFile->backupEndRequired ||
ControlFile->backupEndPoint != InvalidXLogRecPtr ||
ControlFile->state == DB_SHUTDOWNED))
{
InArchiveRecovery = true;
if (StandbyModeRequested)
StandbyMode = true; //设置为startup从库模式
}
/* Get the last valid checkpoint record. */
checkPointLoc = ControlFile->checkPoint;
RedoStartLSN = ControlFile->checkPointCopy.redo;
record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1, true);
if (record != NULL)
{
ereport(DEBUG1,
(errmsg("checkpoint record is at %X/%X",
(uint32) (checkPointLoc >> 32), (uint32) checkPointLoc)));
}
else
{
ereport(PANIC,
(errmsg("could not locate a valid checkpoint record")));
}
memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);
}
//...
//2、启动replay日志记录。InRecovery = true;
/*
InRecovery:如果为true,应该理解为进程正在replay日志记录,而不是系统正处于恢复模式,后者应该通 过RecoveryInProgress() 确定。
ArchiveRecoveryRequested:请求进行归档日志恢复
InArchiveRecovery:若为true,说明当前在使用归档日志恢复(通常在执行PITR、或者是从库);若为false,说明当前仅使用pg_wal目录中的wal日志进行恢复(通常是崩溃恢复阶段)
/*
/*
* Check whether we need to force recovery from WAL. If it appears to
* have been a clean shutdown and we did not have a recovery signal file,
* then assume no recovery needed.
*/
if (checkPoint.redo < RecPtr)
{
if (wasShutdown)
ereport(PANIC,
(errmsg("invalid redo record in shutdown checkpoint")));
InRecovery = true;
}
else if (ControlFile->state != DB_SHUTDOWNED)
InRecovery = true;
else if (ArchiveRecoveryRequested)
{
/* force recovery due to presence of recovery signal file */
InRecovery = true;
}
日志来源
Startup进程的3大作用——崩溃恢复、从库日志应用、PITR,对于不同的用途,也有不同的日志来源。pg维护一个专门的状态机(称作state machine)表示待应用日志的来源,用于在不同时段,从不同的日志源获取WAL日志。对应代码如下(在xlog.c):
/*
*/
typedef enum
{
XLOG_FROM_ANY = 0, /* request to read WAL from any source */
XLOG_FROM_ARCHIVE, /* restored using restore_command,从归档日志中获取 */
XLOG_FROM_PG_WAL, /* existing file in pg_wal,从pg_wal目录获取 */
XLOG_FROM_STREAM /* streamed from primary,备库从主库获取 */
} XLogSource;
恢复模式(Reply)
/* REDO开始 */
if (InRecovery)
{
int rmid;
/*
1、更新控制文件状态以示我们正在恢复,并且展示我们选择作为恢复起点的检查点位置。另外还会用从备份历史文件获取的最小恢复结束位置(minimum recovery stop point)标记控制文件
*/
/* 先保存控制文件中的状态,然后更新 */
dbstate_at_startup = ControlFile->state;
/* 如果在使用归档日志进行恢复(PITR或者从库),更新状态 */
if (InArchiveRecovery)
{
ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->SharedRecoveryState = RECOVERY_STATE_ARCHIVE;
SpinLockRelease(&XLogCtl->info_lck);
}
/* 否则,如果在使用WAL文件进行恢复(崩溃恢复) */
else
{
//...
}
/* 更新控制文件中检查点信息 */
ControlFile->checkPoint = checkPointLoc;
ControlFile->checkPointCopy = checkPoint;
if (InArchiveRecovery)
{
/* initialize minRecoveryPoint if not set yet,最小恢复点不应该小于重做点 */
if (ControlFile->minRecoveryPoint < checkPoint.redo)
{
ControlFile->minRecoveryPoint = checkPoint.redo;
ControlFile->minRecoveryPointTLI = checkPoint.ThisTimeLineID;
}
}
/* 如果有backup_label文件 */
if (haveBackupLabel)
{
ControlFile->backupStartPoint = checkPoint.redo;
ControlFile->backupEndRequired = backupEndRequired;
/* 如果是从从库备份的 */
if (backupFromStandby)
{
/* 只可能是以下两种状态,若不是则报错 */
if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
ereport(FATAL,
(errmsg("backup_label contains data inconsistent with control file"),
errhint("This means that the backup is corrupted and you will "
"have to use another backup for recovery.")));
/* 若是,则更新备份结束点 */
ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
}
}
/* 更新时间 */
ControlFile->time = (pg_time_t) time(NULL);
/* No need to hold ControlFileLock yet, we aren't up far enough */
UpdateControlFile();
//2、...初始化clog、subtrans、事务环境等。初始化redo资源管理器
/*
/3、读取还未reply的第一条record。
*/
if (checkPoint.redo < RecPtr) // 检查点的重做位置(checkPoint.redo)小于当前的记录位置(RecPtr)检查点的下一条。
{
/* back up to find the record */
record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */ //CheckPoint下一条record。
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
}
if (record != NULL)
{
//4、redo操作
}
//...
redo操作:在Rmgr中,其中最重要的就是heap_redo函数。heap_redo函数主要就是一个switch语句,根据不同类型的WAL记录,执行不同的操作。例如对于insert操作,XLOG_HEAP_INSERT,则执行heap_xlog_insert。
void
heap_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/*
* These operations don't overwrite MVCC data so no conflict processing is
* required. The ones in heap2 rmgr do.
*/
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
heap_xlog_insert(record);
break;
case XLOG_HEAP_DELETE:
heap_xlog_delete(record);
break;
case XLOG_HEAP_UPDATE:
heap_xlog_update(record, false);
break;
case XLOG_HEAP_TRUNCATE:
/*
* TRUNCATE is a no-op because the actions are already logged as
* SMGR WAL records. TRUNCATE WAL record only exists for logical
* decoding.
*/
break;
…
default:
elog(PANIC, "heap_redo: unknown op code %u", info);
}
}
二、总结
- 读取backup_label File或者pg_control文件,找到redo位点;读取配置文件、若为主从模式,则设置为startup从库模式。
- 开始reply(恢复)前的初始化工作:初始化clog、subtrans、事务环境等。初始化redo资源管理器。
- 读取WAL record(ReadRecord)。 如果record不存在需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。
- 对读取的WAL record进行redo,通过record->xl_rmid信息,调用相应的redo资源管理器进行redo操作。
- 更新共享内存中XLogCtlData的apply位点和时间线。
- 回到步骤3,读取next WAL record。
Walreceiver进程
当主从关系建立后,主机插入一条数据,跟踪从机的Walreciver进程,其函数调用关系如下(从下至上):
WaitEventSetWaitBlock(WaitEventSet * set, int cur_timeout, WaitEvent * occurred_events, int nevents)
WaitEventSetWait(WaitEventSet * set, long timeout, WaitEvent * occurred_events, int nevents, uint32 wait_event_info)
walrcv_receive();WaitLatchOrSocket()
WalReceiverMain()
AuxiliaryProcessMain(int argc, char ** argv)
StartChildProcess(AuxProcType type)
MaybeStartWalReceiver()
sigusr1_handler
ServerLoop()
PostmasterMain(int argc, char ** argv)
main(int argc, char ** argv)
WalReceiverMain
\src\backend\replication\walreceiver.c
void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO]; //用于存储连接信息的字符数组。
char *tmp_conninfo; //用于存储临时连接信息的指针。
char slotname[NAMEDATALEN]; //用于存储槽位名称的字符数组。
XLogRecPtr startpoint; //用于存储起始位置的 XLogRecPtr。
TimeLineID startpointTLI; //用于存储起始位置的时间线标识符(TimeLineID)。
TimeLineID primaryTLI; //用于存储主服务器的时间线标识符。
bool first_stream; //用于标识是否是第一次进行流复制。
WalRcvData *walrcv = WalRcv; //指向 WalRcvData 结构体的指针。
TimestampTz last_recv_timestamp; //上次接收到数据的时间戳。
TimestampTz now; //当前时间戳。
bool ping_sent; //标识是否已发送心跳包。
char *err;
char *sender_host = NULL; //发送者的主机名。
int sender_port = 0; //发送者的端口号。
/*
WalRcv应该已经设置好了(如果我们是后端,我们通过fork()或EXEC_BACKEND机制从postmaster继承)。
*/
Assert(walrcv != NULL);
now = GetCurrentTimestamp();
//...设置walreceiver状态(walRCVState)
/* 1、在此之前,Startup已经读取了primary_conninfo。WalReceiver获取关键信息,walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI*/
walrcv->ready_to_display = false;
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); //主机IP,Port
strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); //复制槽名称
startpoint = walrcv->receiveStart; //备机这边的apply点(最新LSN)
startpointTLI = walrcv->receiveStartTLI; //备机TLI
//...一些信号处理
/*1-续:根据获取的信息与主建立tcp链接 */
wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err)));
//...更新或保存主的一些信息
// 2、流复制正式开始。这里流复制有三个大循环。
first_stream = true;
for (;;) //第一个大循环,只要流复制还在就不会退出
{
char *primary_sysid;
char standby_sysid[32];
WalRcvStreamOptions options;
//2-续:...首先执行一些接收wal日志的前期工作。
//step 3.1 执行wal_startstreaming开始启动流复制.进入第二个大循环
if (walrcv_startstreaming(wrconn, &options))
{
/* Loop until end-of-streaming or error */
for (;;) //第二个大循环 流复制结束后会退出。
{
char *buf;
int len;
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
//...
/* step 3.1-续 接受WAL日志。这里有三种情况。
情况一:len > 0,一直有wal日志发送过来,一直读取并先写入XLogWalRcvProcessMsg缓冲区,至到len==0.
情况二:len == 0,已经没有新的wal日志了。那么跳出去,进行flush,并通知startup。然后继续等(WaitLatchOr)
情况三:len < 0 ,主自己断开了连接,这个时候也跳出去。进行flush,并通知startup。然后结束流复制。
walrcv_receive:如果成功读取了一条消息,则返回其长度。如果连接关闭,则返回-1。否则,返回0表示没有可用的数据
*/
len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
开始接收data
*/
for (;;) //第三个大循环 len==0或者len =-1退出。都表示没有数据要接收了。
{
if (len > 0)
{
//通过XLogWalRcvProcessMsg正式接收Wal日志
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
}
else if (len == 0)
break;
else if (len < 0)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
errdetail("End of WAL reached on timeline %u at %X/%X.",
startpointTLI,
(uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
endofwal = true;
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/*主库发送write位点、flush位点、apply位点,
发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),
避免vacuum删掉备库正在使用的记录
*/
XLogWalRcvSendReply(false, false);
/*
WAL刷盘,同时唤醒Startup进程。
*/
XLogWalRcvFlush(false);
}
/* 如果是因为主要求断开连接,len < 0,跳出大循环,结束流复制*/
if (endofwal)
break;
//否则,就是len=0,那么继续监听WaitLatchOrSocket。
Assert(wait_fd != PGINVALID_SOCKET);
rc = WaitLatchOrSocket(walrcv->latch,
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
NAPTIME_PER_CYCLE,
WAIT_EVENT_WAL_RECEIVER_MAIN);
if (rc & WL_LATCH_SET)
{
ResetLatch(walrcv->latch);
ProcessWalRcvInterrupts();
if (walrcv->force_reply)
{
walrcv->force_reply = false;
pg_memory_barrier();
XLogWalRcvSendReply(true, false);
}
}
if (rc & WL_TIMEOUT)
{
bool requestReply = false;
if (wal_receiver_timeout > 0)
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz timeout;
timeout =
TimestampTzPlusMilliseconds(last_recv_timestamp,
wal_receiver_timeout);
if (now >= timeout)
ereport(ERROR,
(errmsg("terminating walreceiver due to timeout")));
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
(wal_receiver_timeout / 2));
if (now >= timeout)
{
requestReply = true;
ping_sent = true;
}
}
}
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);
}
}
/*
结束流复制,主那边自己断了。
*/
walrcv_endstreaming(wrconn, &primaryTLI);
//...
}
else
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u",
startpointTLI)));
//4、最后收尾工作,再检查一遍还有没有wal日志没有flush
if (recvFile >= 0)
{
char xlogfname[MAXFNAMELEN];
XLogWalRcvFlush(false);
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
XLogFileNameP(recvFileTLI, recvSegNo))));
/*
* Create .done file forcibly to prevent the streamed segment from
* being archived later.
*/
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
XLogArchiveNotify(xlogfname);
}
recvFile = -1;
//5、WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入第一个大循环重新开始。
elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
}
/* not reached */
}
WalReceiver进程进入工作状态后主要执行流程如下所示:
- WalReceiver进程进入流复制之前,Startup进程已经读取了primary_conninfo参数信息解析后填充到walrcv->conninfo、walrcv->slotname、walrcv->receiveStart、walrcv->receiveStartTLI等共享内存WalRcvData中。 WalReceiver进程通过walrcv_connect连接主库。
- 进入第一层死循环,执行identify_system命令,获取主库systemid/timeline/xlogpos等信息,执行TIMELINE_HISTORY命令拉取history文件,如果需要,则创建临时复制槽。
- 循环
- 执行wal_startstreaming开始启动流复制
- 通过walrcv_receive获取WAL日志,并向wal segment文件中写入xlog(XLogWalRcvWrite,写到缓冲区),期间也会回应主库发过来的心跳信息(发送write位点、flush位点、apply位点)
- 如果walrcv_receive获取不到数据,向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程(XLogWalRcvFlush),跳转到4
- 如果walrcv_receive获取EOF(<0),向主库发送write位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),避免vacuum删掉备库正在使用的记录,如果flush了XLOG逻辑位置唤醒startup进程,跳转到5
- 如果wal_startstreaming返回false,说明主库在该时间线上已经没有wal可以发送了,跳转到6
- 执行wal_startstreaming开始启动流复制
- WaitLatchOrSocket等待超时/网络可读/latch被触发:
- 等待超时,向主库发送接收位点、flush位点、apply位点,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),跳转到3.i.x
- startup进程通过latch唤醒WalReceiver进程,向主库发送write位点、flush位点、apply位点,跳转到3.i.x
- 执行walrcv_endstreaming结束流复制,接收wal sender发送过来的时间线文件,进入步骤6
- 关闭目前的xlog segment文件描述符,发送feedback信息(xmin、xmin_epoch、catalog_xmin、catalog_xmin_epoch),如果flush了XLOG逻辑位置唤醒startup进程,进入步骤7
- WalRcvWaitForStartPosition函数等待startup进程更新receiveStart和receiveStartTLI,一旦更新,进入步骤2。
WalReceiver&Startup之间有关交互的函数
一、回顾
在流复制启动过程中,三个进程的启动顺序是从备库到主库,即:startup ---> walreceiver ---> walsender。 当Startup处于主从复制流中时,从primary 节点以流复制方式获取wal日志,这时startup会发送信号请求唤醒wal receiver进程从Primary节点来获取wal数据。
二、几个交互重要函数
1、如果Startup时候,record不存在,需要调用XLogPageRead->WaitForWALToBecomeAvailable->RequestXLogStreaming唤醒walreceiver从walsender获取WAL record。
- walrcv->receiveStart 用于记录已接收的最后一个 WAL 记录的位置(LSN),它的作用是跟踪 WAL reciever进程在主服务器上的复制进度。以便在下一次接收时从该位置开始。
- walrcv->receiveStartTLI 用于记录当前接收到的 WAL 日志的Timeline ID。它的作用是跟踪 WAL reciever在主服务器上的复制进度,并确保从服务器上的复制流程正确地跟随主服务器的时间线切换。
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot) {
WalRcvData *walrcv = WalRcv;
...
walrcv->receiveStart = recptr;
walrcv->receiveStartTLI = tli;
latch = walrcv->latch;
SpinLockRelease(&walrcv->mutex);
// startup进程将WalReceiver状态从WALRCV_STOPPED转变为WALRCV_STARTING时,设置launch为true,向postmaster请求启动WalReceiver进程
// startup进程通过latch唤醒WalReceiver进程
if (launch) SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
else if (latch) SetLatch(latch);
}
2、WalReceiver进程调用XLogWalRcvFlush函数,如果已经flush了XLOG逻辑位置,则唤醒Startup进程进行apply日志。
static void XLogWalRcvFlush(bool dying) {
if (LogstreamResult.Flush < LogstreamResult.Write) { //进行flush
WalRcvData *walrcv = WalRcv;
issue_xlog_fsync(recvFile, recvSegNo);
LogstreamResult.Flush = LogstreamResult.Write;
/* Update shared-memory status 更新一些状态*/
SpinLockAcquire(&walrcv->mutex);
if (walrcv->flushedUpto < LogstreamResult.Flush) {
walrcv->latestChunkStart = walrcv->flushedUpto;
walrcv->flushedUpto = LogstreamResult.Flush;
walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
WakeupRecovery(); /* Signal the startup process and walsender that new WAL has arrived */ // 唤醒startup进程
if (AllowCascadeReplication()) WalSndWakeup();
if (!dying) { /* Also let the primary know that we made some progress 给主机*/
XLogWalRcvSendReply(false, false);
XLogWalRcvSendHSFeedback(false);
}
}
}
3、walReceiver进程WalRcvWaitForStartPosition函数等待Startup进程设置receiveStart和receiveStartTLI
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) {
WalRcvData *walrcv = WalRcv;
int state;
SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
if (state != WALRCV_STREAMING) {
SpinLockRelease(&walrcv->mutex);
if (state == WALRCV_STOPPING) proc_exit(0);
else elog(FATAL, "unexpected walreceiver state");
}
walrcv->walRcvState = WALRCV_WAITING;
walrcv->receiveStart = InvalidXLogRecPtr;
walrcv->receiveStartTLI = 0;
SpinLockRelease(&walrcv->mutex);
/* nudge startup process to notice that we've stopped streaming and are now waiting for instructions. */
WakeupRecovery();
for (;;) {
ResetLatch(MyLatch);
ProcessWalRcvInterrupts();
SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_RESTARTING) {
/* No need to handle changes in primary_conninfo or primary_slotname here. Startup process will signal us to terminate in case those change. */
*startpoint = walrcv->receiveStart;
*startpointTLI = walrcv->receiveStartTLI;
walrcv->walRcvState = WALRCV_STREAMING;
SpinLockRelease(&walrcv->mutex);
break;
}
if (walrcv->walRcvState == WALRCV_STOPPING) { /* We should've received SIGTERM if the startup process wants us to die, but might as well check it here too. */
SpinLockRelease(&walrcv->mutex);
exit(1);
}
SpinLockRelease(&walrcv->mutex);
(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, WAIT_EVENT_WAL_RECEIVER_WAIT_START);
}
}
4、startup进程则通过WaitForWALToBecomeAvailable->RequestXLogStreaming流程设置receiveStart和receiveStartTLI,并且是通过latch唤醒WalReceiver进程。
关系图:
三、同步级别
在主从复制中,synchronous_commit参数是WAL相关配置参数,用于指定当数据库提交事务时是否需要等待WAL日志写入硬盘后才向客户端返回成功,这个参数可选值多(on、off、local、remote_write、remote_apply 五种)
synchronous_commit参数:
同步等级 | 设定值 | 概述 | 保证范围 |
---|---|---|---|
同步 | remote_apply | 应用发起提交后,等到在备库上应用WAL(更新数据)后,返回COMMIT响应,由于完全保证了数据同步,因此它适合需要备库始终保持最新数据的负载分配场景。 | 1-9 |
同步 | on(默认) | 应用发起提交后,在备库上写入WAL后,返回COMMIT响应。该选项是性能和可靠性之间的最佳平衡。 | 1-6 |
准同步 | remote_write | 应用发起提交后,等到WAL已传输到备库后,返回COMMIT响应。 | 1-5 |
异步 | local | 应用发起提交后,写入主库WAL之后,返回COMMIT响应。 | 1-2 |
异步 | off | 应用发起提交后,直接返回COMMIT响应,而无需等待主库WAL完成写入。 | 1 |
当主库synchronous_commit = remote_apply:每当startup进程进行了apply xlog records时,startup进程都会调用此方法,以便 walreceiver 可以检查它是否需要将apply通知发送回主节点。(该部分会在WaitForWALToBecomeAvailable进行调用,只有当startup进程已经apply完了,才会调用这个函数唤醒walreciever。)
void WalRcvForceReply(void) {
Latch *latch;
WalRcv->force_reply = true; // 设置强制回复标志
SpinLockAcquire(&WalRcv->mutex); /* fetching the latch pointer might not be atomic, so use spinlock */
latch = WalRcv->latch;
SpinLockRelease(&WalRcv->mutex);
if (latch) SetLatch(latch);
}