流复制与逻辑复制
+ 流复制:
在 PG 10 引入内置逻辑复制之前,PG 主备之间的数据同步由流复制(物理复制)承担。它的实现方式是将 WAL 日志中记录的内容按照确切的块地址逐字节地回放到备库,因此主备之间的数据物理分布完全一致,这意味着在主备机器上,同一条记录的 ctid 是相同的。
+ 逻辑复制:
逻辑复制同步数据的原理是:在产生 WAL 日志的数据库(发布端)上,由逻辑解码模块对 WAL 日志进行解析,将解析出的行级变更(而不是 SQL 语句文本)发送到订阅端,订阅端再据此重新执行对应的 INSERT、UPDATE、DELETE 操作。因此,订阅端与发布端的数据物理分布不必一致,同一条记录的 ctid 也可能不同。
逻辑订阅源码分析
发布端 walsender.c(walsender 进程运行在发布端,负责解码 WAL 并向订阅端发送变更)
1. StartLogicalReplication
2. XLogSendLogical
static voidXLogSendLogical(void){ XLogRecord *record; char *errm; //XLogReadRecord拿到一条完整的的Xlog record static XLogRecPtr flushPtr = InvalidXLogRecPtr; WalSndCaughtUp = false; record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); logical_startptr = InvalidXLogRecPtr; /* xlog record was invalid */ if (errm != NULL) elog(ERROR, "%s", errm);//识别record的数据内容,封装到ReorderBufferChange结构中。一个ReorderBufferChange对应一次修改 if (record != NULL) { LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; } //判断 if (flushPtr == InvalidXLogRecPtr) flushPtr = GetFlushRecPtr(); else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) flushPtr = GetFlushRecPtr(); if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) WalSndCaughtUp = true; if (WalSndCaughtUp && got_STOPPING) got_SIGUSR2 = true; //更新 { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); }}
/*
 * LogicalDecodingProcessRecord
 *
 * Route one WAL record to the decoder for its resource manager (rmgr).
 *
 * - Rmgrs whose records matter to logical decoding (xlog, xact, standby,
 *   heap2, heap, logical messages) go to their Decode*Op() routine.
 * - Every other rmgr only has the record's transaction id registered with
 *   the reorder buffer via ReorderBufferProcessXid().
 * - RM_NEXT_ID is not a real rmgr, so it raises an error.
 */
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
	XLogRecordBuffer recbuf;

	/*
	 * Keep the rmgr id enum-typed. Switching on an RmgrIds value with no
	 * default label makes the compiler warn when a new rmgr is added but
	 * not handled below.
	 */
	RmgrIds		rmid = (RmgrIds) XLogRecGetRmid(record);

	/* Start/end LSNs of the record being decoded, plus the reader itself. */
	recbuf.origptr = ctx->reader->ReadRecPtr;
	recbuf.endptr = ctx->reader->EndRecPtr;
	recbuf.record = record;

	switch (rmid)
	{
			/*
			 * Rmgrs that logical decoding needs to look at. Keep these in
			 * rmgrlist.h's order when adding new ones.
			 */
		case RM_XLOG_ID:
			DecodeXLogOp(ctx, &recbuf);
			break;

		case RM_XACT_ID:
			DecodeXactOp(ctx, &recbuf);
			break;

		case RM_STANDBY_ID:
			DecodeStandbyOp(ctx, &recbuf);
			break;

		case RM_HEAP2_ID:
			DecodeHeap2Op(ctx, &recbuf);
			break;

		case RM_HEAP_ID:
			DecodeHeapOp(ctx, &recbuf);
			break;

		case RM_LOGICALMSG_ID:
			DecodeLogicalMsgOp(ctx, &recbuf);
			break;

			/*
			 * Rmgrs whose records describe nothing that logical decoding
			 * represents; only their xid is tracked. Keep these in
			 * rmgrlist.h's order when adding new ones.
			 */
		case RM_SMGR_ID:
		case RM_CLOG_ID:
		case RM_DBASE_ID:
		case RM_TBLSPC_ID:
		case RM_MULTIXACT_ID:
		case RM_RELMAP_ID:
		case RM_BTREE_ID:
		case RM_HASH_ID:
		case RM_GIN_ID:
		case RM_GIST_ID:
		case RM_SEQ_ID:
		case RM_SPGIST_ID:
		case RM_BRIN_ID:
		case RM_COMMIT_TS_ID:
		case RM_REPLORIGIN_ID:
		case RM_GENERIC_ID:
			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
									recbuf.origptr);
			break;

		case RM_NEXT_ID:
			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", rmid);
	}
}
以 INSERT 为例(decode.c 中的 DecodeInsert):
/*
 * DecodeInsert
 *
 * Turn a heap insert WAL record into a ReorderBufferChange and queue it on
 * the record's transaction (XLogRecGetXid) in the reorder buffer.
 *
 * Nothing is queued when:
 * - the record carries no new tuple,
 * - block 0 belongs to a database other than the slot's, or
 * - FilterByOrigin() rejects the record's replication origin.
 */
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
	Size		datalen;
	char	   *tupledata;
	Size		tuplelen;
	XLogReaderState *r = buf->record;
	xl_heap_insert *xlrec;
	ReorderBufferChange *change;
	RelFileNode target_node;

	xlrec = (xl_heap_insert *) XLogRecGetData(r);

	/* Without a logged new tuple there is no tuple data to decode. */
	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
		return;

	/*
	 * target_node must be filled from block 0 before it is copied into the
	 * change below. Only changes in the slot's own database are of interest.
	 */
	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
	if (target_node.dbNode != ctx->slot->data.database)
		return;

	/* The output plugin does not want this origin: no need to queue. */
	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
		return;

	/* Allocate a change from the reorder buffer and fill it in. */
	change = ReorderBufferGetChange(ctx->reorder);
	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
		change->action = REORDER_BUFFER_CHANGE_INSERT;
	else
		change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
	change->origin_id = XLogRecGetOrigin(r);

	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));

	/*
	 * The tuple buffer is sized SizeOfHeapHeader bytes smaller than the
	 * block data. NOTE(review): presumably the block data starts with a heap
	 * header that DecodeXLogTuple() consumes separately - confirm there.
	 */
	tupledata = XLogRecGetBlockData(r, 0, &datalen);
	tuplelen = datalen - SizeOfHeapHeader;

	change->data.tp.newtuple =
		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);

	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);

	change->data.tp.clear_toast_afterwards = true;

	/* Append the change to its transaction's change list in the reorder buffer. */
	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}