Sebelum menggunakan Data Transmission Service (DTS) untuk memigrasikan data antara database PostgreSQL, buat fungsi dan trigger di database sumber. Fungsi dan trigger ini mengambil informasi bahasa definisi data (DDL) dari database sumber. Selama migrasi data inkremental, DTS memigrasikan operasi DDL ke database tujuan.
Prasyarat
- Database sumber harus memenuhi persyaratan berikut:
- Jika database sumber adalah database PostgreSQL yang dikelola sendiri, versi database harus V9.4 atau lebih baru.
- Jika database sumber adalah instance ApsaraDB RDS for PostgreSQL, versi instance ApsaraDB RDS for PostgreSQL harus V10 atau lebih baru.
- ApsaraDB RDS for PostgreSQL V9.4 tidak mendukung event trigger.
- Versi kernel ApsaraDB RDS for PostgreSQL V10, V11, dan V12 harus 20201130 atau lebih baru.
- Versi kernel ApsaraDB RDS for PostgreSQL V13 harus 20210228 atau lebih baru.
Catatan Untuk informasi lebih lanjut tentang cara memperbarui versi mesin minor ApsaraDB RDS for PostgreSQL, lihat Perbarui versi mesin minor.
- Tugas migrasi data dibuat setelah 1 Oktober 2020.
Informasi latar belakang
Saat menggunakan DTS untuk memigrasikan data antara database PostgreSQL, DTS hanya mensinkronkan operasi bahasa manipulasi data (DML) selama migrasi data inkremental. Operasi DML mencakup INSERT, DELETE, dan UPDATE. DTS tidak mensinkronkan operasi DDL selama migrasi data inkremental.
Untuk menyinkronkan operasi DDL, Anda dapat membuat trigger dan fungsi untuk mengambil informasi DDL dari database sumber. Selama migrasi data inkremental, DTS memigrasikan operasi DDL ke database tujuan.
Prosedur
- Masuk ke database PostgreSQL sumber. Untuk informasi lebih lanjut, lihat Hubungkan ke Instance ApsaraDB RDS for PostgreSQL atau psql.
- Beralih ke database sumber.Catatan Alat psql digunakan dalam contoh ini. Anda dapat menjalankan perintah
\c <Nama Database>untuk beralih ke database sumber, misalnya,\c dtststdata. - Jalankan pernyataan berikut untuk membuat tabel yang menyimpan informasi DDL:
CREATE TABLE public.dts_ddl_command ( ddl_text text COLLATE pg_catalog."default", id bigserial primary key, event text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", username character varying COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", object_name character varying COLLATE pg_catalog."default", client_address character varying COLLATE pg_catalog."default", client_port integer, event_time timestamp with time zone, txid_current character varying(128) COLLATE pg_catalog."default", message text COLLATE pg_catalog."default" ); - Jalankan pernyataan berikut untuk membuat fungsi yang mengambil informasi DDL:
CREATE FUNCTION public.dts_capture_ddl() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ declare ddl_text text; declare max_rows int := 10000; declare current_rows int; declare pg_version_95 int := 90500; declare pg_version_10 int := 100000; declare current_version int; declare object_id varchar; declare alter_table varchar; declare record_object record; declare message text; declare pub RECORD; begin select current_query() into ddl_text; if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL; show server_version_num into current_version; if current_version >= pg_version_95 then for record_object in (select * from pg_event_trigger_ddl_commands()) loop if record_object.command_tag = 'CREATE TABLE' then object_id := record_object.object_identity; end if; end loop; else select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id; end if; if object_id = '' or object_id is null then message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query(); else alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL'; message := 'alter_sql=' || alter_table; execute alter_table; end if; if current_version >= pg_version_10 then for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop raise notice 'pubname=%',pub.pubname; BEGIN execute 'alter publication ' || pub.pubname || ' add table ' || object_id; EXCEPTION WHEN OTHERS THEN END; end loop; end if; end if; insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message) values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message); select count(id) into current_rows from public.dts_ddl_command; if current_rows > max_rows then delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command); end if; end $BODY$; - Ubah pemilik fungsi menjadi akun yang digunakan untuk terhubung ke database sumber, misalnya postgresql.
ALTER FUNCTION public.dts_capture_ddl() OWNER TO postgres; - Jalankan pernyataan berikut untuk membuat global event trigger:
CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
Apa yang harus dilakukan selanjutnya
Konfigurasikan tugas migrasi data. Untuk informasi lebih lanjut, lihat topik-topik berikut:- Migrasikan Data Inkremental dari Database PostgreSQL yang Dikelola Sendiri (Versi 10.0 atau Lebih Awal) ke Instance ApsaraDB RDS for PostgreSQL
- Migrasikan Data Inkremental dari Database PostgreSQL yang Dikelola Sendiri (Versi 10.1 hingga 13.0) ke Instance ApsaraDB RDS for PostgreSQL
drop EVENT trigger dts_intercept_ddl;
drop function public.dts_capture_ddl();
drop table public.dts_ddl_command;