データ伝送タスクのソースで、テーブルへの列の追加などの DDL 操作が実行された場合、トリガーを使用して DDL イベントの情報をキャプチャし、DML 操作の正常な同期を確保できます。
背景情報
データ伝送サービスを使用して PostgreSQL データベースから増分同期を行う場合、同期できるのは INSERT、DELETE、UPDATE などの DML 操作のみです。しかし、データ移行タスクのソースデータベースでテーブルへの列の追加などの DDL 操作が実行され、データ伝送サービスが新しいテーブルスキーマを取得しない場合、このテーブルオブジェクトに関連する DML 操作の同期に失敗し、タスクが中断されます。したがって、DML 操作の正常な同期を確保するには、まずソースデータベースに DDL レコードテーブルとトリガーを作成して、DDL 操作情報をキャプチャする必要があります。
制限事項
ソース PostgreSQL データベースのバージョンは V10.x 以降である必要があります。
操作手順
ソースの PostgreSQL データベースインスタンスにログインし、データが移行されるデータベースに切り替えます。
この例では、psql を使用してログインします。psql で
\c <database name>コマンドを実行して、データが移行されるデータベースに切り替えます。次の 2 つのテーブルを作成して DDL イベントを記録します。
非 drop イベントの情報を記録するテーブル
----postgreSQL DDL EVENT TABLE---- CREATE TABLE public.oms_non_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", objid integer, top_objid integer, object_identity text COLLATE pg_catalog."default", top_object_identity text COLLATE pg_catalog."default", top_schema text COLLATE pg_catalog."default", all_children_table text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;drop イベントの情報を記録するテーブル
----postgreSQL DROP DDL EVENT TABLE---- CREATE TABLE public.oms_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", objid integer, object_type text COLLATE pg_catalog."default", object_name text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
次の 2 つの DDL イベントトリガー関数を作成します。
非 drop イベントトリガー関数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; DECLARE obj record; DECLARE top_objid integer; DECLARE parent_oid integer; DECLARE top_object_identity text; DECLARE top_schema text; DECLARE all_children_table text; BEGIN SELECT current_query() INTO ddl_text; FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA', 'COMMENT')) LOOP record_object:=obj; SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid; WHILE parent_oid IS NOT NULL LOOP SELECT inhparent FROM pg_inherits WHERE inhrelid = parent_oid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO top_objid; IF top_objid IS NULL THEN top_objid := parent_oid; exit; END IF; parent_oid := top_objid; END LOOP; IF top_objid IS NULL THEN top_objid :=record_object.objid; ELSE END IF; IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN WITH RECURSIVE child_tables AS ( SELECT oid AS table_objid FROM pg_class WHERE oid = top_objid UNION SELECT c.oid AS table_objid FROM pg_class c JOIN pg_inherits i ON c.oid = i.inhrelid JOIN child_tables ct ON i.inhparent = ct.table_objid ) SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid; ELSE END IF; SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid; top_object_identity:=obj.relname; top_schema:=obj.nspname; IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL'; ELSE END IF; INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table); END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS ddl trigger error during command process: %', SQLERRM; END $BODY$;drop イベントトリガー関数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; BEGIN SELECT current_query() INTO ddl_text; FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP IF record_object.object_type = 'type' THEN ELSE INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name); END IF; END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS drop ddl trigger error during command process: %', SQLERRM; END $BODY$;
作成した関数のオーナーを、データ伝送サービスをソースデータベースに接続するためのアカウントに変更します。アカウント名が
postgresであると仮定します。ALTER FUNCTION public.oms_capture_ddl_for_non_dropped() OWNER TO postgres; ALTER FUNCTION public.oms_capture_ddl_for_dropped() OWNER TO postgres;次の文を実行して、グローバルイベントトリガーを作成します。
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA','COMMENT') EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped(); CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
次のステップ
データ移行タスクをリリースした後、ソースデータベースにログインし、次の文を実行してトリガー関数と対応するテーブルを削除します。
drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.oms_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;