searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

逻辑解码源码分析

2023-10-27 06:59:07
15
0

流复制与逻辑复制

+ 流复制:

  在pg10之前,流复制承载了pg主备之间数据同步的功能。它的实现方式是将wal日志中记录的内容按照确切的块地址逐字节地拷贝到备库,因此主备之间的数据分布是一致的,这意味着在主备机器上,同一条记录的ctid是相同的。

+ 逻辑复制:
  逻辑复制同步数据的原理是,在wal日志产生的数据库上,由逻辑解析模块对wal日志进行初步的解析,将解析出的sql重新发送到订阅端,订阅端重新执行insert、delete、update的操作

 

逻辑订阅源码分析

发布端 walsender.c

1.StartLogicalReplication

2.XLogSendLogical

static voidXLogSendLogical(void){    XLogRecord *record;    char       *errm;    //XLogReadRecord拿到一条完整的的Xlog record    static XLogRecPtr flushPtr = InvalidXLogRecPtr;    WalSndCaughtUp = false;    record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);    logical_startptr = InvalidXLogRecPtr;​    /* xlog record was invalid */    if (errm != NULL)        elog(ERROR, "%s", errm);//识别record的数据内容,封装到ReorderBufferChange结构中。一个ReorderBufferChange对应一次修改    if (record != NULL)    {        LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);​        sentPtr = logical_decoding_ctx->reader->EndRecPtr;    }​ //判断    if (flushPtr == InvalidXLogRecPtr)        flushPtr = GetFlushRecPtr();    else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)        flushPtr = GetFlushRecPtr();    if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)        WalSndCaughtUp = true;​    if (WalSndCaughtUp && got_STOPPING)        got_SIGUSR2 = true;​    //更新​    {        WalSnd     *walsnd = MyWalSnd;​        SpinLockAcquire(&walsnd->mutex);        walsnd->sentPtr = sentPtr;        SpinLockRelease(&walsnd->mutex);    }}
/*
 * Route one WAL record to the decode routine of its resource manager.
 *
 * ctx    - decoding context; its reader supplies the record's start and end
 *          positions and its reorder buffer is told about the record's xid
 *          for rmgrs that are otherwise ignored.
 * record - reader state holding the record to process.
 */
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
	XLogRecordBuffer recbuf;
	RmgrIds		rmid;

	recbuf.record = record;
	recbuf.origptr = ctx->reader->ReadRecPtr;
	recbuf.endptr = ctx->reader->EndRecPtr;

	/* keep the enum type so we get a warning when new rmgrs are added */
	rmid = (RmgrIds) XLogRecGetRmid(record);

	switch (rmid)
	{
			/*
			 * Resource managers that matter for logical decoding. Keep new
			 * entries in rmgrlist.h's order.
			 */
		case RM_XLOG_ID:
			DecodeXLogOp(ctx, &recbuf);
			break;

		case RM_XACT_ID:
			DecodeXactOp(ctx, &recbuf);
			break;

		case RM_STANDBY_ID:
			DecodeStandbyOp(ctx, &recbuf);
			break;

		case RM_HEAP2_ID:
			DecodeHeap2Op(ctx, &recbuf);
			break;

		case RM_HEAP_ID:
			DecodeHeapOp(ctx, &recbuf);
			break;

		case RM_LOGICALMSG_ID:
			DecodeLogicalMsgOp(ctx, &recbuf);
			break;

			/*
			 * Resource managers whose records are not represented in
			 * logical decoding. Keep new entries in rmgrlist.h's order.
			 */
		case RM_SMGR_ID:
		case RM_CLOG_ID:
		case RM_DBASE_ID:
		case RM_TBLSPC_ID:
		case RM_MULTIXACT_ID:
		case RM_RELMAP_ID:
		case RM_BTREE_ID:
		case RM_HASH_ID:
		case RM_GIN_ID:
		case RM_GIST_ID:
		case RM_SEQ_ID:
		case RM_SPGIST_ID:
		case RM_BRIN_ID:
		case RM_COMMIT_TS_ID:
		case RM_REPLORIGIN_ID:
		case RM_GENERIC_ID:
			/* only the xid needs handling here */
			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
									recbuf.origptr);
			break;

		case RM_NEXT_ID:
			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", rmid);
	}
}

以insert 为例

/*
 * Turn a heap insert WAL record into a ReorderBufferChange and queue it.
 *
 * ctx - decoding context; its reorder buffer supplies the change and the
 *       tuple buffer and receives the queued change.
 * buf - record buffer; buf->record is the reader state of the insert record
 *       and buf->origptr is passed along when queueing.
 *
 * NOTE(review): this excerpt is abridged (see the "......." line), so the
 * visible code is not the complete function.
 */
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
	Size		datalen;
	char	   *tupledata;
	Size		tuplelen;
	XLogReaderState *r = buf->record;
	xl_heap_insert *xlrec;
	ReorderBufferChange *change;
	RelFileNode target_node;

	xlrec = (xl_heap_insert *) XLogRecGetData(r);

	/*
	 * NOTE(review): code elided here by the article. target_node is only
	 * declared in the visible part; presumably it is filled in by the
	 * elided code -- confirm against the full source.
	 */
    .......
	/* allocate space for the change */
	change = ReorderBufferGetChange(ctx->reorder);
    
	/* assemble the change: a speculative insert gets its own action code */
	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
		change->action = REORDER_BUFFER_CHANGE_INSERT;
	else
		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
	change->origin_id = XLogRecGetOrigin(r);

	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));

	/* tuple payload is block 0's data; its length excludes the heap header */
	tupledata = XLogRecGetBlockData(r, 0, &datalen);
	tuplelen = datalen - SizeOfHeapHeader;

	change->data.tp.newtuple =
		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);

	change->data.tp.clear_toast_afterwards = true;
	/* queue the change (the article describes the target as a doubly linked list) */
	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}

0条评论
0 / 1000
z****n
4文章数
1粉丝数
z****n
4 文章 | 1 粉丝
z****n
4文章数
1粉丝数
z****n
4 文章 | 1 粉丝
原创

逻辑解码源码分析

2023-10-27 06:59:07
15
0

流复制与逻辑复制

+ 流复制:

  在pg10之前,流复制承载了pg主备之间数据同步的功能。它的实现方式是将wal日志中记录的内容按照确切的块地址逐字节地拷贝到备库,因此主备之间的数据分布是一致的,这意味着在主备机器上,同一条记录的ctid是相同的。

+ 逻辑复制:
  逻辑复制同步数据的原理是,在wal日志产生的数据库上,由逻辑解析模块对wal日志进行初步的解析,将解析出的sql重新发送到订阅端,订阅端重新执行insert、delete、update的操作

 

逻辑订阅源码分析

发布端 walsender.c

1.StartLogicalReplication

2.XLogSendLogical

static voidXLogSendLogical(void){    XLogRecord *record;    char       *errm;    //XLogReadRecord拿到一条完整的的Xlog record    static XLogRecPtr flushPtr = InvalidXLogRecPtr;    WalSndCaughtUp = false;    record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);    logical_startptr = InvalidXLogRecPtr;​    /* xlog record was invalid */    if (errm != NULL)        elog(ERROR, "%s", errm);//识别record的数据内容,封装到ReorderBufferChange结构中。一个ReorderBufferChange对应一次修改    if (record != NULL)    {        LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);​        sentPtr = logical_decoding_ctx->reader->EndRecPtr;    }​ //判断    if (flushPtr == InvalidXLogRecPtr)        flushPtr = GetFlushRecPtr();    else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)        flushPtr = GetFlushRecPtr();    if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)        WalSndCaughtUp = true;​    if (WalSndCaughtUp && got_STOPPING)        got_SIGUSR2 = true;​    //更新​    {        WalSnd     *walsnd = MyWalSnd;​        SpinLockAcquire(&walsnd->mutex);        walsnd->sentPtr = sentPtr;        SpinLockRelease(&walsnd->mutex);    }}
/*
 * Route one WAL record to the decode routine of its resource manager.
 *
 * ctx    - decoding context; its reader supplies the record's start and end
 *          positions and its reorder buffer is told about the record's xid
 *          for rmgrs that are otherwise ignored.
 * record - reader state holding the record to process.
 */
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
	XLogRecordBuffer recbuf;
	RmgrIds		rmid;

	recbuf.record = record;
	recbuf.origptr = ctx->reader->ReadRecPtr;
	recbuf.endptr = ctx->reader->EndRecPtr;

	/* keep the enum type so we get a warning when new rmgrs are added */
	rmid = (RmgrIds) XLogRecGetRmid(record);

	switch (rmid)
	{
			/*
			 * Resource managers that matter for logical decoding. Keep new
			 * entries in rmgrlist.h's order.
			 */
		case RM_XLOG_ID:
			DecodeXLogOp(ctx, &recbuf);
			break;

		case RM_XACT_ID:
			DecodeXactOp(ctx, &recbuf);
			break;

		case RM_STANDBY_ID:
			DecodeStandbyOp(ctx, &recbuf);
			break;

		case RM_HEAP2_ID:
			DecodeHeap2Op(ctx, &recbuf);
			break;

		case RM_HEAP_ID:
			DecodeHeapOp(ctx, &recbuf);
			break;

		case RM_LOGICALMSG_ID:
			DecodeLogicalMsgOp(ctx, &recbuf);
			break;

			/*
			 * Resource managers whose records are not represented in
			 * logical decoding. Keep new entries in rmgrlist.h's order.
			 */
		case RM_SMGR_ID:
		case RM_CLOG_ID:
		case RM_DBASE_ID:
		case RM_TBLSPC_ID:
		case RM_MULTIXACT_ID:
		case RM_RELMAP_ID:
		case RM_BTREE_ID:
		case RM_HASH_ID:
		case RM_GIN_ID:
		case RM_GIST_ID:
		case RM_SEQ_ID:
		case RM_SPGIST_ID:
		case RM_BRIN_ID:
		case RM_COMMIT_TS_ID:
		case RM_REPLORIGIN_ID:
		case RM_GENERIC_ID:
			/* only the xid needs handling here */
			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
									recbuf.origptr);
			break;

		case RM_NEXT_ID:
			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", rmid);
	}
}

以insert 为例

/*
 * Turn a heap insert WAL record into a ReorderBufferChange and queue it.
 *
 * ctx - decoding context; its reorder buffer supplies the change and the
 *       tuple buffer and receives the queued change.
 * buf - record buffer; buf->record is the reader state of the insert record
 *       and buf->origptr is passed along when queueing.
 *
 * NOTE(review): this excerpt is abridged (see the "......." line), so the
 * visible code is not the complete function.
 */
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
	Size		datalen;
	char	   *tupledata;
	Size		tuplelen;
	XLogReaderState *r = buf->record;
	xl_heap_insert *xlrec;
	ReorderBufferChange *change;
	RelFileNode target_node;

	xlrec = (xl_heap_insert *) XLogRecGetData(r);

	/*
	 * NOTE(review): code elided here by the article. target_node is only
	 * declared in the visible part; presumably it is filled in by the
	 * elided code -- confirm against the full source.
	 */
    .......
	/* allocate space for the change */
	change = ReorderBufferGetChange(ctx->reorder);
    
	/* assemble the change: a speculative insert gets its own action code */
	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
		change->action = REORDER_BUFFER_CHANGE_INSERT;
	else
		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
	change->origin_id = XLogRecGetOrigin(r);

	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));

	/* tuple payload is block 0's data; its length excludes the heap header */
	tupledata = XLogRecGetBlockData(r, 0, &datalen);
	tuplelen = datalen - SizeOfHeapHeader;

	change->data.tp.newtuple =
		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);

	change->data.tp.clear_toast_afterwards = true;
	/* queue the change (the article describes the target as a doubly linked list) */
	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}

文章来自个人专栏
postgresql 日志分析
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0