searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

PostgreSQL的逻辑复制

2023-10-27 06:59:06
122
0
WAl日志的复制是数据库高可用方案之一,是保证数据安全的一种常用手段,可以保证数据同步;并且也能保证主库宕机后能够以最快的回复对外服务的能力,对业务影响较小。pg中提供了两种复制方式:物理复制和逻辑复制。

物理复制

物理复制是指将主库WAL日志直接发到备库,随后备库完全应用的一种复制方式。在pg10之前使用。其流程如下所示:主库的WAL通过wal sender将wal发送至从库,从库的wal receiver负责接收,然后写入从库的wal日志中,随后执行相应的事务。

逻辑复制

逻辑复制从pg10开始引入,其与物理复制的主要区别在于,逻辑复制会将WAL日志进行逻辑解码,形成特定的格式,再进行发送。流程如下所示。逻辑复制的原理其实是主节点将其publication(发布)中的表的WAL日志进行解析后,形成一种特殊格式的日志流,传送给从节点;之后从节点上的subscription(订阅)对这些日志流进行进行解析重返,从而达到同步表数据的功能。这之间的WAL传输同样都是通过各自服务器上的wal sender和wal receiver进程说完成的。

复制槽

对于物理日志和逻辑复制,它们都会基于复制槽。复制槽提供了一种自动化的方法来确保主库(发布节点)在所有的备库(订阅节点)收到 WAL 记录之前不会移除它们,并且主库也不会移除可能导致恢复冲突的行,即使备库断开也是如此。比如,可以避免从库在执行较长时间的查询,而此时主库进行删除操作从而产生冲突的情况。

复制槽是记录复制状态的一组信息,由于WAL文件在数据真正落盘后会删除,复制槽会防止过早清除复制所需要的WAL日志。

查看复制槽:

select * from pg_replication_slots;

上图中的列名所代表的含义如下,例如:
slot_name 一个唯一的、集簇范围内的复制槽标识符
plugin 包含这个逻辑槽正在使用的输出插件的共享对象基础名称,对于物理槽为空值。
slot_type  槽类型 - physical或者logical
datoid  (参考 pg_database.oid)与这个槽相关的数据库的OID,或者为空值。只有逻辑槽具有相关的数据库。
database   (参考 pg_database.datname)与这个槽相关的数据库的名称,或者为空值。只有逻辑槽具有相关的数据库。
temporary 如果这是一个临时复制槽则为真。临时槽不会被保存在磁盘上并且会在出错或会话结束时自动被删除掉。
active 如果这个槽当前正在被使用则为真
active_pid 如果槽当前正在被使用,则记录使用这个槽的会话的进程 ID。如果不活动则为NULL。
xmin 这个槽要需要数据库保留的最旧事务的xid。VACUUM不能移除被其后续事务删除的元组。
catalog_xmin 这个槽要需要数据库保留的影响系统目录的最旧事务的xid。VACUUM不能移除被其后续事务删除的目录元组。
restart_lsn 可能仍被这个槽的消费者要求的最旧WAL地址(LSN),并且因此不会在检查点期间自动被移除。 如果这个槽的LSN从未被保留过,则为NULL。
confirmed_flush_lsn 代表逻辑槽的消费者已经确认接收数据到什么位置的地址(LSN)。 比这个地址更旧的数据已经不再可用。对于物理槽这里是NULL。
wal_status 此插槽声称的 WAL 文件的可用性。可能的值为:reserved 意味着声称的文件包含 max_wal_size。extended意味着max_wal_size已超出,但文件仍保留,通过复制插槽或wal_keep_size。unreserved意味着该插槽不再保留所需的 WAL 文件,并且将在下一个检查点删除其中一些文件。 此状态可以返回到reserved或extended。lost意味着某些需要的 WAL 文件已被删除,并且此插槽不再可用。最后两种状态仅在max_slot_wal_keep_size为非负值时才看到。 如果restart_lsn为 NULL,则此字段为空。
safe_wal_size 可写入 WAL 的字节数,以便此插槽不会处于"丢失"状态的危险中。 对丢失插槽它是NULL,以及如果max_slot_wal_keep_size是-1。
two_phase 如果该插槽为解码准备事务所启用则为真。物理插槽总是为假。

在逻辑复制中,创建复制槽的方式如下,主要有两种方法:

前提,主库已经创建了发布:

create publication pub FOR TABLE tt; 
  • 方法一:

        备库端:

create subscription pgoutput_slot connection 'host=主机地址 port=端口号 user=用户名 dbname=数据库名 password=密码' publication pub;

       这里会默认创建订阅以及在主库中创建复制槽,两者名称都为pgoutput_slot。

  • 方法二:

        主库端:

select pg_create_logical_replication_slot('pgoutput_slot', 'pgoutput');

       备库端:

create subscription pgoutput_slot connection 'host=主机地址 port=端口号 user=用户名 dbname=数据库名 password=密码' publication pub WITH (create_slot = false,enabled=true);

逻辑复制中的相关概念

订阅模式

在逻辑复制中,pg采用发布订阅模型作为整体的架构,主库创建发布,备库创建订阅。其特点如下:

  • 操作选择性发布,可以 insert 、delete 、update这些进行选择性操作。
  • 表格选择性发布,可以挑选相应的表格进行发布。
  • 一个数据库可以有多个发布,保证发布不重名即可。
  • 一个发布允许有多个订阅者。
  • 支持复制DML操作,不支持Truncate和DDL操作。

输出插件

输出插件负责将WAL日志解码为可读的格式,常用的插件用pgoutput(默认使用)、test_decoding(输出为字符串),wal2json(输出为json)、decoderbufs(输出为二进制)等。结构图如下所示。

目前,通过测试发现,pg蛀虫之间若要实现逻辑复制,需要使用pg的默认插件(pgoutput),而采用test_decoding或者wal2json是无法实现的,即此情况下主库数据发生变化,从库不会进行相应更改。

若要查看test_decoding或wal2json的输出结果,则在主库端:

pg_recvlogical --start -S 复制槽名称  -d 数据库名称 -f 输出文件路径

以插入数据为例,test_decoding输出:

SQL
BEGIN 793
table public.tt: INSERT: id[integer]:3 name[text]:'3'
COMMIT 793

wal2json输出:

{"change":[{"kind":"insert","schema":"public","table":"tt","columnnames":["id","name"],"columntypes":["integer","text"],"columnvalues":[3,"3"]}]}

逻辑复制流程

pgoutput

发布端:

 其中,WalSndLoop()会启动一个死循环,不断地去查看是否还有需要发送的wal片段,如果有的话,会调用XLogSendLogical()进行发送。

接收端:

其中,LogicalRepApplyLoop()也会启用一个死循环,执行的功能包括接收发送过来的wal,并且还会启动事务将接收到的wal进行执行,就会调用执行事务的相关函数。

wal2json

wal2json的运行流程如下所示(以insert为例),主要经过两部分:

  • 首先是heap_decode(),作用是解析元组的信息,然后会调用DecodeInsert(),随后调用pg_filter_by_origin函数对信息进行过滤。

然后是xact_decode(),它会调用这个DecodeCommit(),随后该函数会依次调用filter、decode_begin、decode_change和decode_commit这四个函数。其中,decode_begin会记录事务的一些信息,比如xid、nextlsn、timestamp,“change”等,如果采用默认输出,则该函数记录的值为“{"change":[”;decode_change则会记录具体的变化信息,比如schema、table、元数据、新数据、操作类型(增删改)等;decode_commit则记录的是]和}。另外,图中黄色区域的函数都通过回调的方式进行。

自定义输出插件

输出插件是通过动态载入一个以输出插件名称作为基础名称的共享库来载入的,使用普通的库搜索路径来定位该库。在pg中,LoadOutputPlugin()用于加载插件,其代码如下。其中,load_external_function()中函数要求,加载的插件中必须有一个名为_PG_output_plugin_init的函数,用于插件的初始化。

C
static void
LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
{
    LogicalOutputPluginInit plugin_init;

    plugin_init = (LogicalOutputPluginInit)
        load_external_function(plugin, "_PG_output_plugin_init", false, NULL);

    if (plugin_init == NULL)
        elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");

    /* ask the output plugin to fill the callback struct */
    plugin_init(callbacks);

    if (callbacks->begin_cb == NULL)
        elog(ERROR, "output plugins have to register a begin callback");
    if (callbacks->change_cb == NULL)
        elog(ERROR, "output plugins have to register a change callback");
    if (callbacks->commit_cb == NULL)
        elog(ERROR, "output plugins have to register a commit callback");
}

通常,插件的初始化作为绑定回调函数。wal2json中的情况如下所示,例如将cb结构体中的startup_cb函数指针指向自己实现的pg_decode_startup函数。

C
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);

    cb->startup_cb = pg_decode_startup;
    cb->begin_cb = pg_decode_begin_txn;
    cb->change_cb = pg_decode_change;
    cb->commit_cb = pg_decode_commit_txn;
    cb->shutdown_cb = pg_decode_shutdown;
#if PG_VERSION_NUM >= 90500
    cb->filter_by_origin_cb = pg_filter_by_origin;
#endif
#if PG_VERSION_NUM >= 90600
    cb->message_cb = pg_decode_message;
#endif
#if PG_VERSION_NUM >= 110000
    cb->truncate_cb = pg_decode_truncate;
#endif
}

OutputPluginCallbacks结构体是pg提供的用于提供函数指针用于回调,其结构如下。其中,函数begin_cb、change_cb 以及commit_cb是必需的,剩下的则是可选的。

C
typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;

    /* streaming of changes at prepare time */
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;

    /* streaming of changes */
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

对于产生输出,可以把数据写入到ctx->out中的StringInfo输出缓冲区中。在写出到输出缓冲区之前,必须先调用OutputPluginPrepareWrite(ctx,last_write)进行准备,在完成写入到缓冲区后,再调用 OutputPluginWrite(ctx, last_write)来执行写出。基本的调用顺序如下所示:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid); // 输入:“BEGIN ”+ txn的xid
OutputPluginWrite(ctx, true);
0条评论
0 / 1000
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
原创

PostgreSQL的逻辑复制

2023-10-27 06:59:06
122
0
WAl日志的复制是数据库高可用方案之一,是保证数据安全的一种常用手段,可以保证数据同步;并且也能保证主库宕机后能够以最快的回复对外服务的能力,对业务影响较小。pg中提供了两种复制方式:物理复制和逻辑复制。

物理复制

物理复制是指将主库WAL日志直接发到备库,随后备库完全应用的一种复制方式。在pg10之前使用。其流程如下所示:主库的WAL通过wal sender将wal发送至从库,从库的wal receiver负责接收,然后写入从库的wal日志中,随后执行相应的事务。

逻辑复制

逻辑复制从pg10开始引入,其与物理复制的主要区别在于,逻辑复制会将WAL日志进行逻辑解码,形成特定的格式,再进行发送。流程如下所示。逻辑复制的原理其实是主节点将其publication(发布)中的表的WAL日志进行解析后,形成一种特殊格式的日志流,传送给从节点;之后从节点上的subscription(订阅)对这些日志流进行进行解析重返,从而达到同步表数据的功能。这之间的WAL传输同样都是通过各自服务器上的wal sender和wal receiver进程说完成的。

复制槽

对于物理日志和逻辑复制,它们都会基于复制槽。复制槽提供了一种自动化的方法来确保主库(发布节点)在所有的备库(订阅节点)收到 WAL 记录之前不会移除它们,并且主库也不会移除可能导致恢复冲突的行,即使备库断开也是如此。比如,可以避免从库在执行较长时间的查询,而此时主库进行删除操作从而产生冲突的情况。

复制槽是记录复制状态的一组信息,由于WAL文件在数据真正落盘后会删除,复制槽会防止过早清除复制所需要的WAL日志。

查看复制槽:

select * from pg_replication_slots;

上图中的列名所代表的含义如下,例如:
slot_name 一个唯一的、集簇范围内的复制槽标识符
plugin 包含这个逻辑槽正在使用的输出插件的共享对象基础名称,对于物理槽为空值。
slot_type  槽类型 - physical或者logical
datoid  (参考 pg_database.oid)与这个槽相关的数据库的OID,或者为空值。只有逻辑槽具有相关的数据库。
database   (参考 pg_database.datname)与这个槽相关的数据库的名称,或者为空值。只有逻辑槽具有相关的数据库。
temporary 如果这是一个临时复制槽则为真。临时槽不会被保存在磁盘上并且会在出错或会话结束时自动被删除掉。
active 如果这个槽当前正在被使用则为真
active_pid 如果槽当前正在被使用,则记录使用这个槽的会话的进程 ID。如果不活动则为NULL。
xmin 这个槽要需要数据库保留的最旧事务的xid。VACUUM不能移除被其后续事务删除的元组。
catalog_xmin 这个槽要需要数据库保留的影响系统目录的最旧事务的xid。VACUUM不能移除被其后续事务删除的目录元组。
restart_lsn 可能仍被这个槽的消费者要求的最旧WAL地址(LSN),并且因此不会在检查点期间自动被移除。 如果这个槽的LSN从未被保留过,则为NULL。
confirmed_flush_lsn 代表逻辑槽的消费者已经确认接收数据到什么位置的地址(LSN)。 比这个地址更旧的数据已经不再可用。对于物理槽这里是NULL。
wal_status 此插槽声称的 WAL 文件的可用性。可能的值为:reserved 意味着声称的文件包含 max_wal_size。extended意味着max_wal_size已超出,但文件仍保留,通过复制插槽或wal_keep_size。unreserved意味着该插槽不再保留所需的 WAL 文件,并且将在下一个检查点删除其中一些文件。 此状态可以返回到reserved或extended。lost意味着某些需要的 WAL 文件已被删除,并且此插槽不再可用。最后两种状态仅在max_slot_wal_keep_size为非负值时才看到。 如果restart_lsn为 NULL,则此字段为空。
safe_wal_size 可写入 WAL 的字节数,以便此插槽不会处于"丢失"状态的危险中。 对丢失插槽它是NULL,以及如果max_slot_wal_keep_size是-1。
two_phase 如果该插槽为解码准备事务所启用则为真。物理插槽总是为假。

在逻辑复制中,创建复制槽的方式如下,主要有两种方法:

前提,主库已经创建了发布:

create publication pub FOR TABLE tt; 
  • 方法一:

        备库端:

create subscription pgoutput_slot connection 'host=主机地址 port=端口号 user=用户名 dbname=数据库名 password=密码' publication pub;

       这里会默认创建订阅以及在主库中创建复制槽,两者名称都为pgoutput_slot。

  • 方法二:

        主库端:

select pg_create_logical_replication_slot('pgoutput_slot', 'pgoutput');

       备库端:

create subscription pgoutput_slot connection 'host=主机地址 port=端口号 user=用户名 dbname=数据库名 password=密码' publication pub WITH (create_slot = false,enabled=true);

逻辑复制中的相关概念

订阅模式

在逻辑复制中,pg采用发布订阅模型作为整体的架构,主库创建发布,备库创建订阅。其特点如下:

  • 操作选择性发布,可以 insert 、delete 、update这些进行选择性操作。
  • 表格选择性发布,可以挑选相应的表格进行发布。
  • 一个数据库可以有多个发布,保证发布不重名即可。
  • 一个发布允许有多个订阅者。
  • 支持复制DML操作,不支持Truncate和DDL操作。

输出插件

输出插件负责将WAL日志解码为可读的格式,常用的插件用pgoutput(默认使用)、test_decoding(输出为字符串),wal2json(输出为json)、decoderbufs(输出为二进制)等。结构图如下所示。

目前,通过测试发现,pg蛀虫之间若要实现逻辑复制,需要使用pg的默认插件(pgoutput),而采用test_decoding或者wal2json是无法实现的,即此情况下主库数据发生变化,从库不会进行相应更改。

若要查看test_decoding或wal2json的输出结果,则在主库端:

pg_recvlogical --start -S 复制槽名称  -d 数据库名称 -f 输出文件路径

以插入数据为例,test_decoding输出:

SQL
BEGIN 793
table public.tt: INSERT: id[integer]:3 name[text]:'3'
COMMIT 793

wal2json输出:

{"change":[{"kind":"insert","schema":"public","table":"tt","columnnames":["id","name"],"columntypes":["integer","text"],"columnvalues":[3,"3"]}]}

逻辑复制流程

pgoutput

发布端:

 其中,WalSndLoop()会启动一个死循环,不断地去查看是否还有需要发送的wal片段,如果有的话,会调用XLogSendLogical()进行发送。

接收端:

其中,LogicalRepApplyLoop()也会启用一个死循环,执行的功能包括接收发送过来的wal,并且还会启动事务将接收到的wal进行执行,就会调用执行事务的相关函数。

wal2json

wal2json的运行流程如下所示(以insert为例),主要经过两部分:

  • 首先是heap_decode(),作用是解析元组的信息,然后会调用DecodeInsert(),随后调用pg_filter_by_origin函数对信息进行过滤。

然后是xact_decode(),它会调用这个DecodeCommit(),随后该函数会依次调用filter、decode_begin、decode_change和decode_commit这四个函数。其中,decode_begin会记录事务的一些信息,比如xid、nextlsn、timestamp,“change”等,如果采用默认输出,则该函数记录的值为“{"change":[”;decode_change则会记录具体的变化信息,比如schema、table、元数据、新数据、操作类型(增删改)等;decode_commit则记录的是]和}。另外,图中黄色区域的函数都通过回调的方式进行。

自定义输出插件

输出插件是通过动态载入一个以输出插件名称作为基础名称的共享库来载入的,使用普通的库搜索路径来定位该库。在pg中,LoadOutputPlugin()用于加载插件,其代码如下。其中,load_external_function()中函数要求,加载的插件中必须有一个名为_PG_output_plugin_init的函数,用于插件的初始化。

C
static void
LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
{
    LogicalOutputPluginInit plugin_init;

    plugin_init = (LogicalOutputPluginInit)
        load_external_function(plugin, "_PG_output_plugin_init", false, NULL);

    if (plugin_init == NULL)
        elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");

    /* ask the output plugin to fill the callback struct */
    plugin_init(callbacks);

    if (callbacks->begin_cb == NULL)
        elog(ERROR, "output plugins have to register a begin callback");
    if (callbacks->change_cb == NULL)
        elog(ERROR, "output plugins have to register a change callback");
    if (callbacks->commit_cb == NULL)
        elog(ERROR, "output plugins have to register a commit callback");
}

通常,插件的初始化作为绑定回调函数。wal2json中的情况如下所示,例如将cb结构体中的startup_cb函数指针指向自己实现的pg_decode_startup函数。

C
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);

    cb->startup_cb = pg_decode_startup;
    cb->begin_cb = pg_decode_begin_txn;
    cb->change_cb = pg_decode_change;
    cb->commit_cb = pg_decode_commit_txn;
    cb->shutdown_cb = pg_decode_shutdown;
#if PG_VERSION_NUM >= 90500
    cb->filter_by_origin_cb = pg_filter_by_origin;
#endif
#if PG_VERSION_NUM >= 90600
    cb->message_cb = pg_decode_message;
#endif
#if PG_VERSION_NUM >= 110000
    cb->truncate_cb = pg_decode_truncate;
#endif
}

OutputPluginCallbacks结构体是pg提供的用于提供函数指针用于回调,其结构如下。其中,函数begin_cb、change_cb 以及commit_cb是必需的,剩下的则是可选的。

C
typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;

    /* streaming of changes at prepare time */
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;

    /* streaming of changes */
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

对于产生输出,可以把数据写入到ctx->out中的StringInfo输出缓冲区中。在写出到输出缓冲区之前,必须先调用OutputPluginPrepareWrite(ctx,last_write)进行准备,在完成写入到缓冲区后,再调用 OutputPluginWrite(ctx, last_write)来执行写出。基本的调用顺序如下所示:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid); // 输入:“BEGIN ”+ txn的xid
OutputPluginWrite(ctx, true);
文章来自个人专栏
postgresql
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0