データ転送タスクのソースで、テーブルへのフィールドの追加などのデータ定義言語(DDL)操作が実行された場合、トリガーを使用して DDL イベントに関する情報をキャプチャし、データ操作言語(DML)操作の正常な同期を確保できます。
背景
PostgreSQL データベースからの増分同期にデータ転送サービスを使用する場合、INSERT、DELETE、UPDATE などの DML 操作のみを同期できます。ただし、データ移行タスクのソースデータベースで、テーブルへの列の追加などの DDL 操作が実行され、データ転送サービスが新しいテーブルスキーマを取得しない場合、このテーブルオブジェクトに関連する DML 操作は同期に失敗し、タスクが中断されます。そのため、まずソースデータベースに DDL レコードテーブルとトリガーを作成して DDL 操作情報をキャプチャし、DML 操作の正常な同期を確保する必要があります。
制限事項
ソース PostgreSQL データベースのバージョンは V10.x 以降である必要があります。
手順
ソース PostgreSQL データベースインスタンスにログインし、データが移行されるデータベースに切り替えます。
この例では、psql を使用してログインします。 psql で
\c <データベース名>コマンドを実行して、データが移行されるデータベースに切り替えます。次の2つのテーブルを作成して、DDL イベントを記録します。
削除以外のイベントに関する情報を記録するテーブル
----PostgreSQL DDL イベントテーブル---- CREATE TABLE public.oms_non_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", objid integer, top_objid integer, object_identity text COLLATE pg_catalog."default", top_object_identity text COLLATE pg_catalog."default", top_schema text COLLATE pg_catalog."default", all_children_table text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;削除イベントに関する情報を記録するテーブル
----PostgreSQL 削除 DDL イベントテーブル---- CREATE TABLE public.oms_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", objid integer, object_type text COLLATE pg_catalog."default", object_name text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
次の2つの DDL イベントトリガー関数を作成します。
削除以外のイベントトリガー関数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; DECLARE obj record; DECLARE top_objid integer; DECLARE parent_oid integer; DECLARE top_object_identity text; DECLARE top_schema text; DECLARE all_children_table text; BEGIN SELECT current_query() INTO ddl_text; FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')) LOOP record_object:=obj; SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid; WHILE parent_oid IS NOT NULL LOOP SELECT inhparent FROM pg_inherits WHERE inhrelid = parent_oid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO top_objid; IF top_objid IS NULL THEN top_objid := parent_oid; exit; END IF; parent_oid := top_objid; END LOOP; IF top_objid IS NULL THEN top_objid :=record_object.objid; ELSE END IF; IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN WITH RECURSIVE child_tables AS ( SELECT oid AS table_objid FROM pg_class WHERE oid = top_objid UNION SELECT c.oid AS table_objid FROM pg_class c JOIN pg_inherits i ON c.oid = i.inhrelid JOIN child_tables ct ON i.inhparent = ct.table_objid ) SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid; ELSE END IF; SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid; top_object_identity:=obj.relname; top_schema:=obj.nspname; IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL'; ELSE END IF; INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table); END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS DDL トリガーがコマンド処理中にエラーを発生させました: %', SQLERRM; END $BODY$;削除イベントトリガー関数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; BEGIN SELECT current_query() INTO ddl_text; FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP IF record_object.object_type = 'type' THEN ELSE INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name); END IF; END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS 削除 DDL トリガーがコマンド処理中にエラーを発生させました: %', SQLERRM; END $BODY$;
作成した関数の所有者を、データ転送サービスをソースデータベースに接続するためのアカウントに変更します。アカウント名が
postgresであるとします。ALTER FUNCTION public.oms_capture_ddl_for_non_dropped() OWNER TO postgres; ALTER FUNCTION public.oms_capture_ddl_for_dropped() OWNER TO postgres;次のステートメントを実行して、グローバルイベントトリガーを作成します。
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA') EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped(); CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
次のステップ
データ移行タスクをリリースした後、ソースデータベースにログインし、次のステートメントを実行して、トリガー関数と対応するテーブルを削除します。
drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.oms_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;