All Products
Search
Document Center

AnalyticDB:Tulis\ data\ ke\ AnalyticDB\ for\ PostgreSQL\ menggunakan\ Realtime\ Compute\ for\ Apache\ Flink

Last Updated:Mar 29, 2026

Topik ini menjelaskan cara mengonfigurasi pekerjaan Flink untuk mengalirkan data ke instans AnalyticDB for PostgreSQL menggunakan konektor AnalyticDB for PostgreSQL.

Batasan

  • Realtime Compute for Apache Flink tidak dapat membaca data dari AnalyticDB for PostgreSQL dalam mode serverless.

  • Hanya Ververica Runtime (VVR) 6.0.0 atau versi yang lebih baru yang mendukung konektor AnalyticDB for PostgreSQL.

  • Hanya VVR 8.0.1 atau versi yang lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.

Catatan Jika Anda menggunakan konektor kustom, ikuti petunjuk di Manage custom connectors.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Ruang kerja Flink yang sepenuhnya dikelola. Lihat Aktifkan Flink yang sepenuhnya dikelola.

  • Instans AnalyticDB for PostgreSQL. Lihat Buat instans.

  • Instans AnalyticDB for PostgreSQL dan ruang kerja Flink yang sepenuhnya dikelola berada dalam virtual private cloud (VPC) yang sama.

Konfigurasi instans AnalyticDB for PostgreSQL

  1. Masuk ke Konsol AnalyticDB for PostgreSQL.

  2. Tambahkan Blok CIDR dari ruang kerja Flink yang sepenuhnya dikelola ke Daftar putih alamat IP instans tersebut. Lihat Konfigurasi Daftar putih alamat IP.

  3. Klik Log On to Database. Lihat Koneksi klien untuk opsi koneksi.

  4. Buat tabel tujuan:

    CREATE TABLE test_adbpg_table(
      b1 int,
      b2 int,
      b3 text,
      PRIMARY KEY(b1)
    );

Siapkan konektor Flink

Konektor AnalyticDB for PostgreSQL didistribusikan sebagai file JAR yang dihosting di GitHub. Unduh file tersebut dan unggah ke ruang kerja Flink Anda sebagai konektor kustom.

  1. Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja Anda dan klik Console di kolom Actions.

  2. Pada panel navigasi sebelah kiri, klik Connectors.

  3. Klik Create Custom Connector dan unggah file JAR.

    Catatan Dapatkan file JAR dari rilis GitHub. Versi JAR harus sesuai dengan versi mesin Flink ruang kerja Anda.
  4. Klik Next. Sistem akan mengurai konten JAR. Jika penguraian gagal, periksa apakah kode konektor sesuai dengan standar komunitas Apache Flink.

  5. Klik Finish. Konektor akan muncul dalam daftar konektor.

Buat pekerjaan Flink

  1. Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Fully Managed Flink, temukan ruang kerja Anda dan klik Console di kolom Actions.

  2. Pada panel navigasi sebelah kiri, klik SQL Editor. Di pojok kiri atas, klik New.

  3. Pada kotak dialog New Draft, pada tab SQL Scripts, klik Blank Stream Draft lalu klik Next.

  4. Konfigurasi draft dan klik Create.

    ParameterDeskripsiContoh
    NameNama unik untuk draft dalam proyek saat iniadbpg-test
    LocationFolder tempat file kode disimpanDraft
    Engine VersionVersi mesin Flink. Lihat Versi mesin untuk versi yang didukungvvr-6.0.7-flink-1.15

Tulis data ke AnalyticDB for PostgreSQL

Definisikan tabel sumber dan sink

Salin SQL berikut ke editor kode. SQL ini mendefinisikan tabel datagen_source yang menghasilkan data acak dan sink test_adbpg_table yang dipetakan ke instans AnalyticDB for PostgreSQL Anda.

-- Source: generates random data using the built-in datagen connector
CREATE TABLE datagen_source (
  f_sequence INT,
  f_random INT,
  f_random_str STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '1000',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '1000',
  'fields.f_random_str.length' = '10'
);

-- Sink: writes data to AnalyticDB for PostgreSQL via the adbpg connector
CREATE TABLE test_adbpg_table (
  `B1` bigint,
  `B2` bigint,
  `B3` VARCHAR,
  `B4` VARCHAR,
  PRIMARY KEY(B1) NOT ENFORCED
) WITH (
  'connector' = 'adbpg-nightly-1.13',
  'url' = 'jdbc:postgresql://<internal-endpoint>:5432/<database-name>',
  'tablename' = 'test_adbpg_table',
  'username' = '<username>',
  'password' = '<password>',
  'maxretrytimes' = '2',
  'batchsize' = '50000',
  'connectionmaxactive' = '5',
  'conflictmode' = 'ignore',
  'usecopy' = '0',
  'targetschema' = 'public',
  'exceptionmode' = 'ignore',
  'casesensitive' = '0',
  'writemode' = '1',
  'retrywaittime' = '200'
);

Pertahankan parameter datagen_source seperti aslinya. Untuk test_adbpg_table, ganti nilai placeholder dan sesuaikan parameter opsional sesuai kebutuhan Anda.

Parameter konektor

Parameter wajib

ParameterDeskripsiContoh
connectorNama konektor. Format: adbpg-nightly-{version}adbpg-nightly-1.13
urlURL Java Database Connectivity (JDBC) untuk instans. Format: jdbc:postgresql://<internal-endpoint>:<port>/<database>jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres
tablenameNama tabel tujuantest_adbpg_table
usernameNama akun database
passwordKata sandi akun database

Parameter opsional

ParameterDefaultDeskripsi
conflictmodeupsertKebijakan penanganan konflik kunci primer atau indeks unik. Lihat Mode penanganan konflik.
writemode1Metode penulisan: 0 = BATCH INSERT, 1 = COPY API (tercepat), 2 = BATCH UPSERT
batchsize50000Jumlah maksimum baris per batch penulisan
batchwritetimeoutms50000Timeout penulisan batch dalam milidetik. Batch akan dikirim setelah periode ini meskipun belum penuh.
maxretrytimes3Jumlah maksimum percobaan ulang saat terjadi kegagalan penulisan
retrywaittime100Interval antar percobaan ulang dalam milidetik
exceptionmodeignoreKebijakan penanganan exception: ignore = lewati data yang ditulis saat terjadi exception; strict = picu failover dan laporkan error
targetschemapublicSkema target dalam database AnalyticDB for PostgreSQL
connectionmaxactive5Jumlah maksimum koneksi aktif per TaskManager dalam kolam koneksi
casesensitive0Sensitivitas huruf besar/kecil nama kolom dan tabel: 0 = tidak sensitif huruf besar/kecil, 1 = sensitif huruf besar/kecil
verbose0Output log konektor: 0 = dinonaktifkan, 1 = diaktifkan
Catatan Untuk daftar lengkap parameter yang didukung dan pemetaan tipe data, lihat Konektor AnalyticDB for PostgreSQL.

Mode penanganan konflik

Parameter conflictmode mengontrol apa yang terjadi ketika suatu catatan bertentangan dengan kunci primer atau indeks unik yang sudah ada. AnalyticDB for PostgreSQL menggunakan kombinasi pernyataan INSERT ON CONFLICT dan COPY ON CONFLICT untuk melakukan operasi UPSERT.

NilaiPerilaku
upsert (default)Menjalankan INSERT ON CONFLICT + COPY ON CONFLICT untuk menimpa baris yang sudah ada. Untuk tabel partisi, versi minor instans harus V6.3.6.1 atau lebih baru. Lihat Perbarui versi minor mesin.
updateMemperbarui baris yang bertentangan
ignoreMempertahankan baris yang sudah ada dan membuang catatan masuk
strictMemicu failover dan melaporkan error

Mulai penerapan

  1. Di pojok kanan atas SQL Editor, klik Deploy lalu klik OK.

    Catatan Kluster sesi hanya cocok untuk pengembangan dan pengujian. Untuk penerapan produksi, jangan men-deploy ke kluster sesi. Lihat Mencari tahu masalah pada penerapan.
  2. Pada halaman Deployments, temukan penerapan Anda dan klik Start di kolom Actions.

  3. Pada kotak dialog Start Job, klik Start.

Verifikasi hasil

  1. Sambungkan ke database AnalyticDB for PostgreSQL. Lihat Koneksi klien.

  2. Kueri tabel untuk memastikan data sedang ditulis:

    SELECT * FROM test_adbpg_table;

    Data telah ditulis ke database AnalyticDB for PostgreSQL. Gambar berikut menunjukkan hasil yang dikembalikan.

    adbpg2.png

Langkah selanjutnya

Referensi