Topik ini menjelaskan cara mengonfigurasi pekerjaan Flink untuk mengalirkan data ke instans AnalyticDB for PostgreSQL menggunakan konektor AnalyticDB for PostgreSQL.
Batasan
Realtime Compute for Apache Flink tidak dapat membaca data dari AnalyticDB for PostgreSQL dalam mode serverless.
Hanya Ververica Runtime (VVR) 6.0.0 atau versi yang lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.
Hanya VVR 8.0.1 atau versi yang lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Ruang kerja Flink yang sepenuhnya dikelola. Lihat Aktifkan Flink yang sepenuhnya dikelola.
Instans AnalyticDB for PostgreSQL. Lihat Buat instans.
Instans AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola berada dalam virtual private cloud (VPC) yang sama.
Konfigurasi instans AnalyticDB for PostgreSQL
Masuk ke Konsol AnalyticDB for PostgreSQL.
Tambahkan Blok CIDR dari ruang kerja Flink yang sepenuhnya dikelola ke Daftar putih alamat IP instans tersebut. Lihat Konfigurasi Daftar putih alamat IP.
Klik Log On to Database. Lihat Koneksi klien untuk opsi koneksi.
Buat tabel tujuan:
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
Siapkan konektor Flink
Konektor AnalyticDB for PostgreSQL didistribusikan sebagai file JAR yang dihosting di GitHub. Unduh file tersebut dan unggah ke ruang kerja Flink Anda sebagai konektor kustom.
Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja Anda dan klik Console di kolom Actions.
Pada panel navigasi sebelah kiri, klik Connectors.
Klik Create Custom Connector dan unggah file JAR.
Catatan Dapatkan file JAR dari rilis GitHub. Versi JAR harus sesuai dengan versi mesin Flink ruang kerja Anda.Klik Next. Sistem akan mengurai konten JAR. Jika penguraian gagal, periksa apakah kode konektor sesuai dengan standar komunitas Apache Flink.
Klik Finish. Konektor akan muncul dalam daftar konektor.
Buat pekerjaan Flink
Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja Anda dan klik Console di kolom Actions.
Pada panel navigasi sebelah kiri, klik SQL Editor. Di pojok kiri atas, klik New.
Pada kotak dialog New Draft, pada tab SQL Scripts, klik Blank Stream Draft lalu klik Next.
Konfigurasi draft dan klik Create.
Parameter Deskripsi Contoh Name Nama unik untuk draft dalam proyek saat ini adbpg-test Location Folder tempat file kode disimpan Draft Engine Version Versi mesin Flink. Lihat Versi mesin untuk versi yang didukung vvr-6.0.7-flink-1.15
Tulis data ke AnalyticDB for PostgreSQL
Definisikan tabel sumber dan sink
Salin SQL berikut ke editor kode. SQL ini mendefinisikan tabel datagen_source yang menghasilkan data acak dan sink test_adbpg_table yang dipetakan ke instans AnalyticDB for PostgreSQL Anda.
-- Source: generates random data using the built-in datagen connector
CREATE TABLE datagen_source (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5',
'fields.f_sequence.kind' = 'sequence',
'fields.f_sequence.start' = '1',
'fields.f_sequence.end' = '1000',
'fields.f_random.min' = '1',
'fields.f_random.max' = '1000',
'fields.f_random_str.length' = '10'
);
-- Sink: writes data to AnalyticDB for PostgreSQL via the adbpg connector
CREATE TABLE test_adbpg_table (
`B1` bigint,
`B2` bigint,
`B3` VARCHAR,
`B4` VARCHAR,
PRIMARY KEY(B1) NOT ENFORCED
) WITH (
'connector' = 'adbpg-nightly-1.13',
'url' = 'jdbc:postgresql://<internal-endpoint>:5432/<database-name>',
'tablename' = 'test_adbpg_table',
'username' = '<username>',
'password' = '<password>',
'maxretrytimes' = '2',
'batchsize' = '50000',
'connectionmaxactive' = '5',
'conflictmode' = 'ignore',
'usecopy' = '0',
'targetschema' = 'public',
'exceptionmode' = 'ignore',
'casesensitive' = '0',
'writemode' = '1',
'retrywaittime' = '200'
);Pertahankan parameter datagen_source seperti aslinya. Untuk test_adbpg_table, ganti nilai placeholder dan sesuaikan parameter opsional sesuai kebutuhan Anda.
Parameter konektor
Parameter wajib
| Parameter | Deskripsi | Contoh |
|---|---|---|
connector | Nama konektor. Format: adbpg-nightly-{version} | adbpg-nightly-1.13 |
url | URL Java Database Connectivity (JDBC) untuk instans. Format: jdbc:postgresql://<internal-endpoint>:<port>/<database> | jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres |
tablename | Nama tabel tujuan | test_adbpg_table |
username | Nama akun database | — |
password | Kata sandi akun database | — |
Parameter opsional
| Parameter | Default | Deskripsi |
|---|---|---|
conflictmode | upsert | Kebijakan penanganan konflik kunci primer atau indeks unik. Lihat Mode penanganan konflik. |
writemode | 1 | Metode penulisan: 0 = BATCH INSERT, 1 = COPY API (tercepat), 2 = BATCH UPSERT |
batchsize | 50000 | Jumlah maksimum baris per batch penulisan |
batchwritetimeoutms | 50000 | Timeout penulisan batch dalam milidetik. Batch akan dikirim setelah periode ini meskipun belum penuh. |
maxretrytimes | 3 | Jumlah maksimum percobaan ulang saat terjadi kegagalan penulisan |
retrywaittime | 100 | Interval antar percobaan ulang dalam milidetik |
exceptionmode | ignore | Kebijakan penanganan exception: ignore = lewati data yang ditulis saat terjadi exception; strict = picu failover dan laporkan error |
targetschema | public | Skema target dalam database AnalyticDB for PostgreSQL |
connectionmaxactive | 5 | Jumlah maksimum koneksi aktif per TaskManager dalam kolam koneksi |
casesensitive | 0 | Sensitivitas huruf besar/kecil nama kolom dan tabel: 0 = tidak sensitif huruf besar/kecil, 1 = sensitif huruf besar/kecil |
verbose | 0 | Output log konektor: 0 = dinonaktifkan, 1 = diaktifkan |
Mode penanganan konflik
Parameter conflictmode mengontrol apa yang terjadi ketika suatu catatan bertentangan dengan kunci primer atau indeks unik yang sudah ada. AnalyticDB for PostgreSQL menggunakan kombinasi pernyataan INSERT ON CONFLICT dan COPY ON CONFLICT untuk melakukan operasi UPSERT.
| Nilai | Perilaku |
|---|---|
upsert (default) | Menjalankan INSERT ON CONFLICT + COPY ON CONFLICT untuk menimpa baris yang sudah ada. Untuk tabel partisi, versi minor instans harus V6.3.6.1 atau lebih baru. Lihat Perbarui versi minor mesin. |
update | Memperbarui baris yang bertentangan |
ignore | Mempertahankan baris yang sudah ada dan membuang catatan masuk |
strict | Memicu failover dan melaporkan error |
Mulai penerapan
Di pojok kanan atas SQL Editor, klik Deploy lalu klik OK.
Catatan Kluster sesi hanya cocok untuk pengembangan dan pengujian. Untuk penerapan produksi, jangan men-deploy ke kluster sesi. Lihat Mencari tahu masalah pada penerapan.Pada halaman Deployments, temukan penerapan Anda dan klik Start di kolom Actions.
Pada kotak dialog Start Job, klik Start.
Verifikasi hasil
Sambungkan ke database AnalyticDB for PostgreSQL. Lihat Koneksi klien.
Kueri tabel untuk memastikan data sedang ditulis:
SELECT * FROM test_adbpg_table;Data telah ditulis ke database AnalyticDB for PostgreSQL. Gambar berikut menunjukkan hasil yang dikembalikan.

Langkah selanjutnya
Konektor AnalyticDB for PostgreSQL — referensi lengkap konektor, termasuk semua parameter dan pemetaan tipe data
Konektor Datagen — referensi untuk sumber datagen yang digunakan dalam contoh ini