AnalyticDB for PostgreSQL menyediakan konektor Change Data Capture (CDC) bawaan yang berlangganan data penuh dan inkremental berdasarkan fitur replikasi logis PostgreSQL. Konektor ini terintegrasi mulus dengan Flink dan secara efisien menangkap perubahan data real-time dari tabel sumber untuk sinkronisasi data real-time dan pemrosesan aliran, sehingga membantu perusahaan merespons kebutuhan data dinamis dengan cepat. Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink CDC untuk berlangganan data penuh dan inkremental dari AnalyticDB for PostgreSQL secara real time.
Batasan
Fitur ini hanya tersedia untuk instans AnalyticDB for PostgreSQL V7.0 yang menjalankan versi mesin minor 7.2.1.4 atau lebih baru.
CatatanAnda dapat melihat versi minor di halaman Informasi Dasar suatu instans di konsol AnalyticDB for PostgreSQL. Jika instans Anda tidak memenuhi versi yang dipersyaratkan, perbarui versi minor instans tersebut.
Mode serverless AnalyticDB for PostgreSQL tidak didukung.
Prasyarat
Instans AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola harus berada dalam VPC yang sama.
Anda harus menyesuaikan pengaturan parameter untuk instans AnalyticDB for PostgreSQL:
Aktifkan replikasi logis dengan mengatur parameter
wal_levelke logical.Jika Anda menggunakan instans Edisi Ketersediaan Tinggi AnalyticDB for PostgreSQL, Anda harus mengatur parameter
hot_standby,hot_standby_feedback, dansync_replication_slotske on. Hal ini memastikan bahwa langganan logis tidak terganggu oleh failover primer/sekunder.
Anda harus menggunakan akun awal atau pengguna dengan hak istimewa yang memiliki izin RDS_SUPERUSER untuk instans AnalyticDB for PostgreSQL. Pengguna tersebut harus diberikan hak istimewa REPLICATION.
ALTER USER <username> WITH REPLICATION;.Blok CIDR ruang kerja Flink harus ditambahkan ke daftar putih instans AnalyticDB for PostgreSQL.
Anda harus mengunduh flink-sql-connector-adbpg-cdc-3.3.jar dan mengunggah konektor CDC ke ruang kerja Flink Anda.
Prosedur
Langkah 1: Siapkan tabel uji dan data uji
Masuk ke konsol AnalyticDB for PostgreSQL. Temukan instans yang ingin Anda kelola dan klik ID instans tersebut.
Di pojok kanan bawah halaman Basic Information, klik Log On To Database.
Buat database uji dan tabel sumber bernama adbpg_source_table. Lalu, masukkan 50 baris data ke dalam tabel sumber.
-- Buat database uji. CREATE DATABASE testdb; -- Beralih ke database testdb dan buat skema. CREATE SCHEMA testschema; -- Buat tabel sumber bernama adbpg_source_table. CREATE TABLE testschema.adbpg_source_table( id int, username text, PRIMARY KEY(id) ); -- Masukkan 50 baris data ke dalam tabel adbpg_source_table. INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);Buat tabel sink bernama adbpg_sink_table agar Flink dapat menulis data hasilnya.
CREATE TABLE testschema.adbpg_sink_table( id int, username text, score int );
Langkah 2: Buat pekerjaan Flink
Masuk ke konsol Realtime Compute for Apache Flink. Di tab Flink yang Sepenuhnya Dikelola, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, pilih .
Di bilah menu atas, klik New. Pilih Blank Stream Draft, lalu klik Next. Di kotak dialog New File Draft, konfigurasikan parameter pekerjaan.
Parameter
Deskripsi
Contoh
Name
Nama draf yang ingin Anda buat.
CatatanNama draf harus unik dalam proyek saat ini.
adbpg-test
Location
Folder tempat file kode draf disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang sudah ada untuk membuat subfolder. Draf
Engine Version
Versi mesin Flink yang digunakan oleh draf. Untuk informasi selengkapnya tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi mesin.
vvr-6.0.7-flink-1.15
Klik Create.
Langkah 3: Tulis kode pekerjaan dan deploy pekerjaan
Buat sumber bernama datagen_source untuk menghasilkan data analog dan sumber bernama source_adbpg untuk menangkap perubahan data real-time dari database AnalyticDB for PostgreSQL. Kemudian, gabungkan kedua sumber tersebut dan tulis hasilnya ke tabel sink bernama sink_adbpg. Data yang diproses ditulis ke AnalyticDB for PostgreSQL.
Salin kode pekerjaan berikut ke editor.
---Buat tabel sumber Datagen untuk menghasilkan data streaming menggunakan konektor Datagen. CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='100', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --Buat tabel sumber adbpg untuk menangkap perubahan data tabel adbpg_source_table berdasarkan slot.name dan pgoutput menggunakan konektor adbpg-cdc. CREATE TEMPORARY TABLE source_adbpg( id int, username varchar, PRIMARY KEY(id) NOT ENFORCED ) WITH( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'testschema', 'table-name' = 'adbpg_source_table', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput' ); --Buat tabel sink adbpg untuk menulis hasil pemrosesan ke tabel tujuan adbpg_sink_table di database. CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb', 'tablename' = 'testschema.adbpg_sink_table', 'username' = 'account****', 'password' = 'password****', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- Tulis hasil penggabungan tabel datagen_source dan source_adbpg ke tabel sink adbpg. INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN source_adbpg AS ts ON ds.id = ts.id;Parameter
Parameter
Wajib
Tipe Data
Deskripsi
connector
Ya
STRING
Jenis konektor. Tetapkan nilai ke
adbpg-cdcuntuk tabel sumber danadbpguntuk tabel sink.hostname
Ya
STRING
Titik akhir internal instans AnalyticDB for PostgreSQL. Anda dapat memperoleh titik akhir internal di halaman Basic Information instans.
username
Ya
STRING
Akun database dan kata sandi instans AnalyticDB for PostgreSQL.
password
Ya
STRING
database-name
Ya
STRING
Nama database.
schema-name
Ya
STRING
Nama skema. Parameter ini mendukung ekspresi reguler. Anda dapat berlangganan beberapa skema sekaligus.
table-name
Ya
STRING
Nama tabel. Parameter ini mendukung ekspresi reguler. Anda dapat berlangganan beberapa tabel sekaligus.
port
Ya
INTEGER
Port AnalyticDB for PostgreSQL. Nilainya tetap 5432.
decoding.plugin.name
Ya
STRING
Nama plug-in decoding logis PostgreSQL. Nilainya tetap pgoutput.
slot.name
Ya
STRING
Nama slot decoding logis.
Untuk tabel sumber dalam pekerjaan Flink yang sama, gunakan nilai yang sama untuk
slot.name.Jika pekerjaan Flink yang berbeda melibatkan tabel yang sama, tetapkan
slot.nameunik untuk setiap pekerjaan. Hal ini mencegah terjadinya kesalahan berikut:PSQLException: ERROR: replication slot "debezium" is active for PID 974.
debezium.*
Tidak
STRING
Mengontrol perilaku klien Debezium dengan granularitas lebih halus. Misalnya, menetapkan
'debezium.snapshot.mode' = 'never'menonaktifkan fitur snapshot. Untuk informasi selengkapnya, lihat properti konfigurasi.scan.incremental.snapshot.enabled
Tidak
BOOLEAN
Menentukan apakah akan mengaktifkan snapshot inkremental. Nilai yang valid:
false (default): Snapshot inkremental dinonaktifkan.
true: Snapshot inkremental diaktifkan.
scan.startup.mode
Tidak
STRING
Mode startup untuk konsumsi data. Nilai yang valid:
initial (default): Saat pekerjaan pertama kali dimulai, ia memindai semua data historis lalu membaca data write-ahead logging (WAL) terbaru. Hal ini memberikan transisi mulus antara data penuh dan inkremental.
latest-offset: Saat pekerjaan pertama kali dimulai, ia tidak memindai data historis. Ia mulai membaca dari akhir WAL, yaitu posisi log terbaru. Ia hanya menangkap perubahan data yang terjadi setelah konektor dimulai.
snapshot: Memindai semua data historis dan membaca entri WAL baru yang dihasilkan selama pemindaian penuh. Pekerjaan berhenti setelah pemindaian penuh selesai.
changelog-mode
Tidak
STRING
Mode changelog untuk mengenkripsi perubahan aliran. Nilai yang valid:
ALL (default): Mendukung semua jenis operasi, termasuk
INSERT,DELETE,UPDATE_BEFORE, danUPDATE_AFTER.UPSERT: Hanya mendukung operasi
UPSERT, termasukINSERT,DELETE, danUPDATE_AFTER.
heartbeat.interval.ms
Tidak
DURATION
Interval pengiriman paket heartbeat. Nilai default adalah 30 detik. Satuan dalam milidetik.
Konektor CDC AnalyticDB for PostgreSQL mengirim paket heartbeat ke database untuk memastikan offset slot terus maju. Jika data tabel tidak sering berubah, atur parameter ini ke nilai yang wajar agar log WAL dapat dibersihkan tepat waktu dan menghindari pemborosan ruang disk.
scan.incremental.snapshot.chunk.key-column
Tidak
STRING
Menentukan kolom yang digunakan untuk chunking selama fase snapshot. Secara default, kolom pertama dari kunci primer dipilih.
url
Ya
STRING
Formatnya adalah
jdbc:postgresql://<Address>:<PortId>/<DatabaseName>.Di pojok kanan atas halaman Editor SQL, klik Validate untuk melakukan pemeriksaan sintaks.
Klik Deploy, lalu klik OK.
Di pojok kanan atas, klik Go To O&M. Di halaman Job O&M, klik Start.
Langkah 4: Lihat data yang ditulis oleh Flink
Jalankan pernyataan berikut di database uji untuk melihat data yang ditulis oleh Flink.
SELECT * FROM testschema.adbpg_sink_table; SELECT COUNT(*) FROM testschema.adbpg_sink_table;Masukkan 50 baris data tambahan ke dalam tabel sumber. Lalu, periksa jumlah total baris data inkremental yang ditulis Flink ke tabel sink.
-- Masukkan 50 baris data inkremental ke dalam tabel sumber. INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(51, 100) AS t(i); -- Periksa data baru di tabel tujuan. SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;Hasilnya ditampilkan di bawah ini.
count ------- 50 (1 row)
Catatan penggunaan
Kelola Slot Replikasi secara tepat waktu untuk menghindari pemborosan ruang disk.
Untuk mencegah kehilangan data akibat pembersihan log WAL yang sesuai dengan checkpoint selama restart pekerjaan Flink, Flink tidak secara otomatis menghapus Slot Replikasi. Oleh karena itu, jika Anda memastikan bahwa pekerjaan Flink tidak perlu dijalankan ulang lagi, Anda harus menghapus Slot Replikasi yang sesuai secara manual untuk melepaskan sumber daya yang digunakannya. Selain itu, jika posisi yang dikonfirmasi dari Slot Replikasi tidak maju dalam waktu lama, AnalyticDB for PostgreSQL tidak dapat membersihkan entri WAL setelah posisi tersebut. Hal ini dapat menyebabkan akumulasi data WAL yang tidak digunakan dan menghabiskan banyak ruang disk.
Selama operasi normal instans AnalyticDB for PostgreSQL, semantik pemrosesan data exactly-once dijamin. Namun, dalam skenario kegagalan, hanya semantik at-least-once yang didukung.
Konektor CDC mengubah parameter REPLICA IDENTITY dari tabel yang dilanggan menjadi
FULLuntuk memastikan konsistensi sinkronisasi data. Perubahan ini memiliki efek berikut:Peningkatan penggunaan ruang disk. Dalam skenario dengan operasi pembaruan atau penghapusan yang sering, pengaturan ini meningkatkan ukuran log WAL, sehingga meningkatkan penggunaan ruang disk.
Penurunan kinerja penulisan. Dalam skenario dengan penulisan konkurensi tinggi, kinerja dapat terpengaruh secara signifikan.
Peningkatan tekanan checkpoint. Log WAL yang lebih besar berarti checkpoint perlu memproses lebih banyak data, yang dapat memperpanjang waktu yang dibutuhkan untuk checkpoint.
Praktik terbaik
Flink CDC mendukung pengembangan pekerjaan menggunakan API Flink SQL atau API DataStream. Anda dapat menggunakan Flink CDC untuk mengimplementasikan sinkronisasi data penuh dan inkremental terpadu untuk satu atau beberapa tabel dalam database sumber. Anda juga dapat melakukan komputasi seperti penggabungan tabel pada sumber data yang berbeda. Kerangka kerja Flink memastikan semantik pemrosesan event exactly-once sepanjang prosedur pemrosesan data. Namun, Flink CDC tidak cocok untuk menyinkronkan seluruh database yang kompatibel dengan PostgreSQL karena tidak mendukung sinkronisasi DDL, dan Anda harus mendefinisikan struktur setiap tabel dalam Flink SQL, yang membuat pemeliharaan menjadi kompleks.
Bagian ini menggunakan contoh penyinkronan data dari AnalyticDB for PostgreSQL ke Kafka untuk menjelaskan praktik terbaik pengembangan pekerjaan SQL Flink CDC. Sebelum mengembangkan pekerjaan Flink CDC, pastikan Anda telah menyiapkan dan mengonfigurasi sumber daya seperti yang dijelaskan di bagian Prasyarat.
Langkah 1: Siapkan tabel uji
Buat dua tabel sumber di instans AnalyticDB for PostgreSQL.
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);Langkah 2: Siapkan sumber daya Kafka
Tambahkan blok CIDR ruang kerja Flink ke daftar putih instans Kafka.
Langkah 3: Buat pekerjaan Flink
Masuk ke konsol Realtime Compute for Apache Flink. Di tab Flink yang Sepenuhnya Dikelola, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, pilih .
Di bilah menu atas, klik New. Pilih Blank Stream Draft, lalu klik Next. Di kotak dialog New File Draft, konfigurasikan parameter pekerjaan.
Parameter
Deskripsi
Contoh
Name
Nama draf yang ingin Anda buat.
CatatanNama draf harus unik dalam proyek saat ini.
adbpg-test
Location
Folder tempat file kode draf disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang sudah ada untuk membuat subfolder. Draf
Engine Version
Versi mesin Flink yang digunakan oleh draf. Untuk informasi selengkapnya tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi mesin.
vvr-6.0.7-flink-1.15
Klik Create.
Langkah 4: Tulis kode pekerjaan dan deploy pekerjaan
Tulis pekerjaan SQL di ruang kerja Flink. Salin kode pekerjaan berikut ke editor dan ganti konfigurasi dengan nilai aktual Anda.
-- Gunakan satu sumber untuk menangkap data dari beberapa tabel CREATE TEMPORARY TABLE ADBPGSource( table_name STRING METADATA FROM 'table_name' VIRTUAL, row_kind STRING METADATA FROM 'row_kind' VIRTUAL, product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING ) WITH ( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'public', 'table-name' = '(products|documents)', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput', 'debezium.snapshot.mode' = 'never' ); CREATE TEMPORARY TABLE KafkaProducts ( product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, PRIMARY KEY(product_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); CREATE TEMPORARY TABLE KafkaDocuments ( document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING, tags STRING, PRIMARY KEY(document_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); -- Gunakan STATEMENT SET untuk membungkus beberapa pernyataan BEGIN STATEMENT SET; -- Gunakan METADATA table_name untuk mengarahkan data ke tabel tujuan INSERT INTO KafkaProducts SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags FROM ADBPGSource WHERE table_name = 'products'; INSERT INTO KafkaDocuments SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags FROM ADBPGSource WHERE table_name = 'documents'; END;Perhatikan poin-poin berikut mengenai pekerjaan SQL ini:
Untuk tugas sinkronisasi multi-tabel, kami merekomendasikan agar Anda menggunakan satu tabel sumber untuk menangkap data dari beberapa tabel, seperti yang ditunjukkan dalam contoh SQL ini. Anda harus mendefinisikan semua kolom dari semua tabel sumber dalam tabel sumber ini. Jika nama kolom duplikat, cukup simpan salah satunya. Saat menulis ke tabel tujuan, gunakan
METADATAtable_name untuk mengarahkan data ke tabel tertentu. Pendekatan ini hanya memerlukan pembuatan satu Slot Replikasi di AnalyticDB for PostgreSQL. Hal ini mengurangi penggunaan sumber daya database sumber, meningkatkan kinerja sinkronisasi, dan menyederhanakan pemeliharaan di masa depan.Gunakan parameter
table-nameuntuk menentukan beberapa tabel sumber. Sertakan nama tabel dalam tanda kurung dan pisahkan dengan garis vertikal (|), misalnya,(table1|table2|table3).Menetapkan
debezium.snapshot.modekeneverberarti hanya data inkremental dari tabel sumber yang disinkronkan. Untuk menyinkronkan data penuh dan inkremental, ubah pengaturannya menjadiinitial.
Di pojok kanan atas halaman Editor SQL, klik Validate untuk melakukan pemeriksaan sintaks.
Klik Deploy, lalu klik OK.
Di pojok kanan atas, klik Go To O&M. Di halaman Job O&M, klik Start.
Langkah 5: Masukkan data uji
Di instans AnalyticDB for PostgreSQL, perbarui data di kedua tabel sumber dan amati perubahan pesan di topik Kafka.
Anda dapat menggunakan pernyataan SQL berikut untuk memasukkan data uji:
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'Test Product', 'Test-2025', 'Sebuah data produk uji', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);Referensi
Untuk informasi selengkapnya tentang berlangganan data penuh AnalyticDB for PostgreSQL, lihat Baca dan tulis data penuh secara real time menggunakan Flink.