9版本的时候,表之间的同步都是自己写触发器进行同步,从10开始支持逻辑复制,表之间的同步可以不用再写触发器同步。下面我们具体介绍一下。
逻辑复制由发布端和订阅端组成,订阅端通过回放wal同步发布端的数据.
典型使用场景:
- 满足业务上需求,实现某些指定表数据同步
- 报表系统,采集每个业务系统的数据到仓库
- PostgreSQL 跨版本数据同步
- PostgreSQL跨不同平台的同步
- PostgreSQL 大版本升级
- 多个数据库合并到一个数据库
接下来具体用实例展示一下:
首先在发布端修改鉴权:
修改pg_hba.conf,添加目标机器的访问鉴权
host all repuser 192.168.xxx.xxx/32 md5
注意:
发布端和订阅端需要相同的schema名称和相同的表名,表的字段类型,字段名也必须一样,除了表,其他对象不能被订阅。
发布端涉及需要修改的参数:
wal_level = logical
max_replication_slots //复制槽数量
max_logical_replication_workers //逻辑复制的worker数,数量从max_worker_processes消耗
max_worker_processes //数据库的最大worker数
订阅端涉及需要修改的参数
除以上参数外,要设置以下参数:
max_logical_replication_workers //至少设置为订阅端subscriber的数量,尤其是多个发布端,逻辑复制到同一个订阅端,一定要注意改参数,否则逻辑复制无法正常工作。
max_worker_processes //数量需要大于max_logical_replication_workers
发布端(源端):
创建发布者的时候,可以对单张表,多张表,所有表创建自己所需的发布者。
语法:
Command: CREATE PUBLICATION
Description: define a new publication
Syntax:
CREATE PUBLICATION name
[ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
| FOR ALL TABLES ]
[ WITH ( publication_parameter [= value] [, ... ] ) ]
hank=> \h alter publication
Command: ALTER PUBLICATION
Description: change the definition of a publication
Syntax:
ALTER PUBLICATION name ADD TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name SET TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name DROP TABLE [ ONLY ] table_name [ * ] [, ...]
ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] )
ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER PUBLICATION name RENAME TO new_name
创建tb1表,以及tb1的publication,并赋予权限
create user repuser replication login connection limit 10 encrypted password 'repuser';
grant connect on database hank to repuser;
grant USAGE on SCHEMA hank to repuser;
grant all on tb1 to repuser ;
create publication pub for table tb1;
create table tb1 (a int);
insert into tb1 values (1),(2),(3);
select * from tb1;
a
---
1
2
3
(3 rows)
订阅端(目标端):
相关语法:
postgres=# \h alter subscription
Command: ALTER SUBSCRIPTION
Description: change the definition of a subscription
Syntax:
ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( set_publication_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name
postgres=# \h create subscription
Command: CREATE SUBSCRIPTION
Description: define a new subscription
Syntax:
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]
创建订阅者
CREATE SUBSCRIPTION sub CONNECTION 'dbname=hank host=192.168.173.69 user=repuser password=repuser' PUBLICATION pub;
如果发布端添加了新的表,在订阅端刷下一下发布者的信息
alter subscription sub refresh publication ;
查看数据已经同步
postgres=# select * from tb1;
a
---
1
2
3
(3 rows)
如果有update和delete,则需要replica identity,一般replica identity都是primary key,当然也可以是整行,整行的时候,效率比较低下。
没有replica identity发布端会报错,需要添加replica identity:
update tb1 set a=100 ;
ERROR: cannot update table "tb1" because it does not have a replica identity and publishes updates
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
alter table tb1 add primary key(a);
alter table tb1 replica identity using index tb1_pkey ;
update tb1 set a=100 ;
hank=> select * from tb1;
a
-----
100
(1 row)
订阅端也要添加主键:
没添加钱,没有同步
postgres=# select * from tb1;
a
---
1
(1 row)
添加主键后,数据已同步
postgres=# alter table tb1 add primary key(a);
ALTER TABLE
postgres=# select * from tb1;
a
-----
100
(1 row)
由于ddl不支持逻辑复制,所有在处理ddl的时候,可以发布端执行ddl,然后订阅端也执行ddl
发布端加字段并清理数据
hank=> select * from tb1;
a
---
1
hank=> alter table tb1 add column b text;
hank=> truncate table tb1;
TRUNCATE TABLE
订阅端刷新后,可见数据没有同步
postgres=# alter subscription sub refresh publication ;
ALTER SUBSCRIPTION
postgres=# select * from tb1;
a
---
1
(1 row)
订阅端也加上字段并刷新,逻辑复制恢复正常
postgres=# alter table tb1 add column b text;
ALTER TABLE
postgres=# select * from tb1;
a | b
---+---
(0 rows)
相关视图和表
#发布端
hank=> \dvt *publication*
List of relations
Schema | Name | Type | Owner
------------+-----------------------+-------+----------
pg_catalog | pg_publication | table | postgres
pg_catalog | pg_publication_rel | table | postgres
pg_catalog | pg_publication_tables | view | postgres
hank=> select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-------+---------+----------+--------------+-----------+-----------+-----------+-------------
16459 | pub | 16449 | f | t | t | t | t
(1 row)
hank=> select * from pg_publication_tables;
pubname | schemaname | tablename
---------+------------+-----------
pub | hank | tbl_test
pub | hank | tb1
#订阅端
postgres=# \dvt *subscription*
List of relations
Schema | Name | Type | Owner
------------+----------------------+-------+----------
pg_catalog | pg_stat_subscription | view | postgres
pg_catalog | pg_subscription | table | postgres
pg_catalog | pg_subscription_rel | table | postgres
postgres=# \x
Expanded display is on.
postgres=# select * from pg_stat_subscription ;
-[ RECORD 1 ]---------+------------------------------
subid | 16396
subname | sub
pid | 9721
relid |
received_lsn | 0/3393138
last_msg_send_time | 2021-03-17 18:02:34.270487+08
last_msg_receipt_time | 2021-03-17 18:02:34.270927+08
latest_end_lsn | 0/3393138
latest_end_time | 2021-03-17 18:02:34.270487+08
postgres=# select * from pg_subscription;
-[ RECORD 1 ]---+----------------------------------------------------------
oid | 16396
subdbid | 13593
subname | sub
subowner | 10
subenabled | t
subconninfo | dbname=hank host=10.4.9.166 user=repuser password=repuser
subslotname | sub
subsynccommit | off
subpublications | {pub}
逻辑复制限制:
- DDL不支持
- 序列数据不支持
- 大对象数据类型不支持
- 不支持视图,雾化视图,外部表