前提条件
- 当前支持的DDL操作包含如下:
- 表级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)。
- 库级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、CREATE SCHEMA/TABLE、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)、CREATE SEQUENCE、DROP SEQUENCE、ALTER SEQUENCE、CREATE INDEX、ALTER INDEX、DROP INDEX、CREATE VIEW、ALTER VIEW。
注意
- 表级同步:RENAME表名之后,向更改名称后的表插入新的数据时,DRS不会同步新的数据到目标库。
- 库级同步:源库使用非CREATE TABLE方式创建的表不会同步到目标库。常见地如:使用CREATE TABLE AS创建表、调用函数创建表。
- 暂不支持以注释开头的DDL语句的同步,以注释开头的DDL语句将被忽略。
- 不支持函数和存储过程中DDL语句的同步,函数和存储过程中执行的DDL语句将被忽略。
- 源库和目标库版本不同时,请使用源库和目标库都兼容的SQL语句执行DDL操作。例如:源库为pg11,目标库为pg12,要将源库表的列类型从char修改为int时,请使用如下语句:
alter table tablename alter column columnname type int USING columnname::int;
- 执行如下操作步骤前,请检查待同步的源数据库public模式下,是否存在名为hwdrs_ddl_info的表、名为hwdrs_ddl_function()的函数、名为hwdrs_ddl_event的触发器。如存在,请将其删除。
- 库级同步时,如创建无主键表,请执行如下命令,将无主键表复制属性设置为full。
alter table tablename replica identity full;
操作步骤
说明若源库为本云RDS for PostgreSQL,可以使用root用户创建相关对象,如果执行时报“Must be superuser to create an event trigger”错误,可以通过工单申请处理。本云RDS for PostgreSQL的root用户权限请参见RDS用户指南。
步骤 1 使用拥有创建事件触发器权限的用户连接要同步的数据库。
步骤 2 执行如下语句,创建存储DDL信息的表。
DROP TABLE IF EXISTS public.hwdrs_ddl_info;
DROP SEQUENCE IF EXISTS public.hwdrs_ddl_info_id_seq;
CREATE TABLE public.hwdrs_ddl_info(
id bigserial primary key,
ddl text,
username varchar(64) default current_user,
txid varchar(16) default txid_current()::varchar(16),
tag varchar(64),
database varchar(64) default current_database(),
schema varchar(64) default current_schema,
client_address varchar(64) default inet_client_addr(),
client_port integer default inet_client_port(),
event_time timestamp default current_timestamp
);
步骤 3 执行如下语句,创建函数。
CREATE OR REPLACE FUNCTION public.hwdrs_ddl_function()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY INVOKER
AS BODYBODY
declare ddl text;
declare real_num int;
declare max_num int := 50000;
begin
if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SCHEMA','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then
select current_query() into ddl;
insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp);
select count(id) into real_num from public.hwdrs_ddl_info;
if real_num > max_num then
if current_setting('server_version_num')::int<100000 then
delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='RowExclusiveLock');
else
delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
end if;
end if;
end if;
end;
BODYBODY;
步骤 4 执行以下语句,为步骤2和步骤3中创建的对象赋予必要权限。
GRANT USAGE ON SCHEMA public TO public;
GRANT SELECT,INSERT,DELETE ON TABLE public.hwdrs_ddl_info TO public;
GRANT SELECT,USAGE ON SEQUENCE public.hwdrs_ddl_info_id_seq TO public;
GRANT EXECUTE ON FUNCTION public.hwdrs_ddl_function() TO public;
步骤 5 执行以下语句,创建DDL事件触发器。
CREATE EVENT TRIGGER hwdrs_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hwdrs_ddl_function();
步骤 6 执行以下语句,将创建的事件触发器设置为enable。
ALTER EVENT TRIGGER hwdrs_ddl_event ENABLE ALWAYS;
步骤 7 返回数据复制服务控制台,创建PostgreSQL->RDS for PostgreSQL的同步任务。
步骤 8 待同步任务结束后,请执行下语句删除创建的表、函数、触发器。
DROP EVENT trigger hwdrs_ddl_event;
DROP FUNCTION public.hwdrs_ddl_function();
DROP TABLE public.hwdrs_ddl_info;