全部产品
Search
文档中心

Data Transmission Service:Gunakan trigger dan fungsi untuk mengimplementasikan migrasi DDL inkremental untuk database PostgreSQL

更新时间:Jul 02, 2025

Sebelum menggunakan Data Transmission Service (DTS) untuk memigrasikan data antara database PostgreSQL, buat fungsi dan trigger di database sumber. Fungsi dan trigger ini mengambil informasi bahasa definisi data (DDL) dari database sumber. Selama migrasi data inkremental, DTS memigrasikan operasi DDL ke database tujuan.

Prasyarat

  • Database sumber harus memenuhi persyaratan berikut:
    • Jika database sumber adalah database PostgreSQL yang dikelola sendiri, versi database harus V9.4 atau lebih baru.
    • Jika database sumber adalah instance ApsaraDB RDS for PostgreSQL, versi instance ApsaraDB RDS for PostgreSQL harus V10 atau lebih baru.
      • ApsaraDB RDS for PostgreSQL V9.4 tidak mendukung event trigger.
      • Versi kernel ApsaraDB RDS for PostgreSQL V10, V11, dan V12 harus 20201130 atau lebih baru.
      • Versi kernel ApsaraDB RDS for PostgreSQL V13 harus 20210228 atau lebih baru.
      Catatan Untuk informasi lebih lanjut tentang cara memperbarui versi mesin minor ApsaraDB RDS for PostgreSQL, lihat Perbarui versi mesin minor.
  • Tugas migrasi data dibuat setelah 1 Oktober 2020.

Informasi latar belakang

Saat menggunakan DTS untuk memigrasikan data antara database PostgreSQL, DTS hanya mensinkronkan operasi bahasa manipulasi data (DML) selama migrasi data inkremental. Operasi DML mencakup INSERT, DELETE, dan UPDATE. DTS tidak mensinkronkan operasi DDL selama migrasi data inkremental.

Untuk menyinkronkan operasi DDL, Anda dapat membuat trigger dan fungsi untuk mengambil informasi DDL dari database sumber. Selama migrasi data inkremental, DTS memigrasikan operasi DDL ke database tujuan.

Catatan Hanya operasi DDL berikut yang dapat disinkronkan: CREATE TABLE, DROP TABLE, dan ALTER TABLE. Operasi ALTER TABLE mencakup RENAME TABLE, ADD COLUMN, dan DROP COLUMN.

Prosedur

Peringatan Jika Anda perlu memigrasikan data inkremental dari beberapa database, ulangi Langkah 2 hingga 5 untuk setiap database.
  1. Masuk ke database PostgreSQL sumber. Untuk informasi lebih lanjut, lihat Hubungkan ke Instance ApsaraDB RDS for PostgreSQL atau psql.
  2. Beralih ke database sumber.
    Catatan Alat psql digunakan dalam contoh ini. Anda dapat menjalankan perintah \c <Nama Database> untuk beralih ke database sumber, misalnya, \c dtststdata.
  3. Jalankan pernyataan berikut untuk membuat tabel yang menyimpan informasi DDL:
    CREATE TABLE public.dts_ddl_command
    (
        ddl_text text COLLATE pg_catalog."default",
       id bigserial primary key,
       event text COLLATE pg_catalog."default",
       tag text COLLATE pg_catalog."default",
       username character varying COLLATE pg_catalog."default",
       database character varying COLLATE pg_catalog."default",
       schema character varying COLLATE pg_catalog."default",
       object_type character varying COLLATE pg_catalog."default",
       object_name character varying COLLATE pg_catalog."default",
       client_address character varying COLLATE pg_catalog."default",
       client_port integer,
       event_time timestamp with time zone,
       txid_current character varying(128) COLLATE pg_catalog."default",
       message text COLLATE pg_catalog."default"
    );
  4. Jalankan pernyataan berikut untuk membuat fungsi yang mengambil informasi DDL:
    CREATE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE 'plpgsql'
        COST 100
        VOLATILE NOT LEAKPROOF SECURITY DEFINER
    AS $BODY$
      declare ddl_text text;
      declare max_rows int := 10000;
      declare current_rows int;
      declare pg_version_95 int := 90500;
      declare pg_version_10 int := 100000;
      declare current_version int;
      declare object_id varchar;
      declare alter_table varchar;
      declare record_object record;
      declare message text;
      declare pub RECORD;
    begin
    
      select current_query() into ddl_text;
    
      if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL;
        show server_version_num into current_version;
        if current_version >= pg_version_95 then
          for record_object in (select * from pg_event_trigger_ddl_commands()) loop
            if record_object.command_tag = 'CREATE TABLE' then
              object_id := record_object.object_identity;
            end if;
          end loop;
        else
          select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id;
        end if;
        if object_id = '' or object_id is null then
          message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
        else
          alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL';
          message := 'alter_sql=' || alter_table;
          execute alter_table;
        end if;
        if current_version >= pg_version_10 then
          for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop
            raise notice 'pubname=%',pub.pubname;
            BEGIN
              execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
            EXCEPTION WHEN OTHERS THEN
            END;
          end loop;
        end if;
      end if;
    
      insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
      values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);
    
      select count(id) into current_rows from public.dts_ddl_command;
      if current_rows > max_rows then
        delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command);
      end if;
    end
    $BODY$;
  5. Ubah pemilik fungsi menjadi akun yang digunakan untuk terhubung ke database sumber, misalnya postgresql.
    ALTER FUNCTION public.dts_capture_ddl()
        OWNER TO postgres;
  6. Jalankan pernyataan berikut untuk membuat global event trigger:
    CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end
    EXECUTE PROCEDURE public.dts_capture_ddl();

Apa yang harus dilakukan selanjutnya

Konfigurasikan tugas migrasi data. Untuk informasi lebih lanjut, lihat topik-topik berikut:
Catatan Setelah tugas migrasi data dilepaskan, masuk ke database PostgreSQL sumber dan jalankan pernyataan berikut untuk menghapus trigger dan fungsi.
drop EVENT trigger dts_intercept_ddl;
drop function public.dts_capture_ddl();
drop table public.dts_ddl_command;