すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for OceanBase:トリガーを作成する

最終更新日:Feb 27, 2025

データ転送タスクのソースで、テーブルへのフィールドの追加などのデータ定義言語(DDL)操作が実行された場合、トリガーを使用して DDL イベントに関する情報をキャプチャし、データ操作言語(DML)操作の正常な同期を確保できます。

背景

PostgreSQL データベースからの増分同期にデータ転送サービスを使用する場合、INSERTDELETEUPDATE などの DML 操作のみを同期できます。ただし、データ移行タスクのソースデータベースで、テーブルへの列の追加などの DDL 操作が実行され、データ転送サービスが新しいテーブルスキーマを取得しない場合、このテーブルオブジェクトに関連する DML 操作は同期に失敗し、タスクが中断されます。そのため、まずソースデータベースに DDL レコードテーブルとトリガーを作成して DDL 操作情報をキャプチャし、DML 操作の正常な同期を確保する必要があります。

制限事項

ソース PostgreSQL データベースのバージョンは V10.x 以降である必要があります。

手順

  1. ソース PostgreSQL データベースインスタンスにログインし、データが移行されるデータベースに切り替えます。

    この例では、psql を使用してログインします。 psql で \c <データベース名> コマンドを実行して、データが移行されるデータベースに切り替えます。

  2. 次の2つのテーブルを作成して、DDL イベントを記録します。

    • 削除以外のイベントに関する情報を記録するテーブル

      ----PostgreSQL DDL イベントテーブル---- 
      
          CREATE TABLE public.oms_non_dropped_ddl_command (
          id bigserial primary key,
          ddl_text text COLLATE pg_catalog."default",
          tag text COLLATE pg_catalog."default",
          database character varying COLLATE pg_catalog."default",
          schema character varying COLLATE pg_catalog."default",
          object_type character varying COLLATE pg_catalog."default",
          objid integer,
          top_objid integer,
          object_identity text COLLATE pg_catalog."default",
          top_object_identity text COLLATE pg_catalog."default",
          top_schema text COLLATE pg_catalog."default",
          all_children_table text COLLATE pg_catalog."default"
          );
          ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;
    • 削除イベントに関する情報を記録するテーブル

      ----PostgreSQL 削除 DDL イベントテーブル----
      
          CREATE TABLE public.oms_dropped_ddl_command
          (
          id bigserial primary key,
          ddl_text text COLLATE pg_catalog."default",
          tag text COLLATE pg_catalog."default",
          database character varying COLLATE pg_catalog."default",
          schema character varying COLLATE pg_catalog."default",
          objid integer,
          object_type text COLLATE pg_catalog."default",
          object_name text COLLATE pg_catalog."default"
          );
          ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
  3. 次の2つの DDL イベントトリガー関数を作成します。

    • 削除以外のイベントトリガー関数

        CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped()
          RETURNS event_trigger
          LANGUAGE 'plpgsql'
          COST 100
          VOLATILE NOT LEAKPROOF SECURITY DEFINER
          AS $BODY$
          DECLARE ddl_text text;
          DECLARE record_object record;
          DECLARE obj record;
          DECLARE top_objid integer;
          DECLARE parent_oid integer;
          DECLARE top_object_identity text; 
          DECLARE top_schema text;
          DECLARE all_children_table text;
      
          BEGIN
          SELECT current_query() INTO ddl_text;
          FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')) LOOP
      	   record_object:=obj;
      	   SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid
      	   AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid;
             WHILE parent_oid IS NOT NULL LOOP
              SELECT inhparent
              FROM pg_inherits
              WHERE inhrelid = parent_oid
              AND inhparent::regclass::text NOT LIKE 'pg_%'
              ORDER BY inhseqno DESC
              LIMIT 1
              INTO top_objid;
      
              IF top_objid IS NULL THEN
                  top_objid := parent_oid;
                 exit;
              END IF;
              parent_oid := top_objid;
          END LOOP;
          IF top_objid IS NULL THEN
          top_objid :=record_object.objid;
          ELSE
          END IF;
          IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN
          WITH RECURSIVE child_tables AS (
          SELECT oid AS table_objid
          FROM pg_class
          WHERE oid = top_objid
          UNION
              SELECT c.oid AS table_objid
          FROM pg_class c
          JOIN pg_inherits i ON c.oid = i.inhrelid
          JOIN child_tables ct ON i.inhparent = ct.table_objid
          )
          SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid;
          ELSE
          END IF;
          SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid;
          top_object_identity:=obj.relname;
          top_schema:=obj.nspname;
          IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN
          EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL';
          ELSE
          END IF;
          INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table)
          VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table);
          END LOOP;
          EXCEPTION 
          WHEN OTHERS THEN
              RAISE LOG 'OMS DDL トリガーがコマンド処理中にエラーを発生させました: %', SQLERRM;
          END
          $BODY$;
    • 削除イベントトリガー関数

          CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped()
          RETURNS event_trigger
          LANGUAGE 'plpgsql'
          COST 100
          VOLATILE NOT LEAKPROOF SECURITY DEFINER
          AS $BODY$
          DECLARE ddl_text text;
          DECLARE record_object record;
          BEGIN
          SELECT current_query() INTO ddl_text;
          FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP
          IF record_object.object_type = 'type' THEN
          ELSE
          INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name)
          VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name);
          END IF;
          END LOOP;
          EXCEPTION
          WHEN OTHERS THEN
          RAISE LOG 'OMS 削除 DDL トリガーがコマンド処理中にエラーを発生させました: %', SQLERRM;
          END
          $BODY$;
  4. 作成した関数の所有者を、データ転送サービスをソースデータベースに接続するためのアカウントに変更します。アカウント名が postgres であるとします。

    ALTER FUNCTION public.oms_capture_ddl_for_non_dropped()
        OWNER TO postgres;
        ALTER FUNCTION public.oms_capture_ddl_for_dropped()
        OWNER TO postgres;
  5. 次のステートメントを実行して、グローバルイベントトリガーを作成します。

        CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped
        ON ddl_command_end
        WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')
        EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped();
    
        CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();

次のステップ

データ移行タスクをリリースした後、ソースデータベースにログインし、次のステートメントを実行して、トリガー関数と対応するテーブルを削除します。

drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.oms_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;