Topik ini menjelaskan cara menggunakan Konektor AnalyticDB for PostgreSQL.
Informasi latar belakang
AnalyticDB for PostgreSQL adalah gudang data untuk pemrosesan paralel masif (MPP) yang menyediakan layanan analitik online untuk sejumlah besar data.
Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor AnalyticDB for PostgreSQL.
Item | Deskripsi |
Jenis tabel | Tabel sumber (beta), tabel dimensi, dan tabel sink Catatan Saat ini, untuk membaca dari sumber AnalyticDB untuk PostgreSQL, Anda perlu mengonfigurasi konektor kustom. Untuk informasi lebih lanjut, lihat Gunakan Flink CDC untuk berlangganan data penuh dan inkremental secara real-time. |
Mode operasi | Mode streaming dan batch. |
Format data | Tidak tersedia |
Metrik |
Catatan Untuk informasi lebih lanjut tentang metrik, lihat Metrics. |
Jenis API | SQL |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Prasyarat
Instansi AnalyticDB for PostgreSQL dan tabel AnalyticDB for PostgreSQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Instansi dan CREATE TABLE.
Daftar putih alamat IP dikonfigurasi untuk instansi AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
Keterbatasan
Hanya VVR 8.0.1 atau versi lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.
Database PostgreSQL yang dikelola sendiri tidak didukung.
Sintaksis
CREATE TEMPORARY TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);Opsi konektor
Umum
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
connector | Konektor yang digunakan. | STRING | Ya | Tidak ada nilai default |
|
url | URL Java Database Connectivity (JDBC) dari database. | STRING | Ya | Tidak ada nilai default | URL dalam format |
tableName | Nama tabel di dalam database. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
userName | Nama pengguna yang digunakan untuk mengakses database AnalyticDB for PostgreSQL. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
password | Kata sandi yang digunakan untuk mengakses database AnalyticDB for PostgreSQL. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
maxRetryTimes | Jumlah maksimum percobaan ulang yang diizinkan untuk menulis data ke tabel jika upaya penulisan data gagal. | INTEGER | Tidak | 3 | Tidak tersedia. |
targetSchema | Nama skema. | STRING | Tidak | public | Tidak tersedia. |
caseSensitive | Menentukan apakah sensitivitas huruf besar/kecil diaktifkan. | STRING | Tidak | false | Nilai valid:
|
connectionMaxActive | Jumlah maksimum koneksi dalam kumpulan koneksi. | INTEGER | Tidak | 5 | Sistem secara otomatis melepaskan koneksi idle ke layanan database. Penting Jika opsi ini disetel ke nilai yang terlalu besar, jumlah koneksi server mungkin abnormal. |
Spesifik sumber (beta)
Opsi | Deskripsi | Tipe data | Diperlukan | Catatan |
schema-name | Nama skema. | STRING | Ya | Opsi ini mendukung ekspresi reguler. Anda dapat berlangganan beberapa skema sekaligus. |
port | Port instans AnalyticDB untuk PostgreSQL. | INTEGER | Ya | Atur ke |
decoding.plugin.name | Nama Plugin Logical Decoding PostgreSQL. | STRING | Ya | Atur ke |
slot.name | Nama slot decoding logis. | STRING | Ya |
|
debezium.* | Mengontrol perilaku klien Debezium. | STRING | Ya | Sebagai contoh, atur |
scan.incremental.snapshot.enabled | Menentukan apakah akan mengaktifkan snapshot inkremental. | BOOLEAN | Tidak | Nilai valid:
|
scan.startup.mode | Mode startup untuk konsumsi data. | STRING | Tidak | Nilai valid:
|
changelog-mode | Menentukan bagaimana peristiwa perubahan dikodekan dalam aliran perubahan. | STRING | Tidak | Nilai valid:
|
heartbeat.interval.ms | Interval pengiriman paket heartbeat, dalam milidetik. | DURATION | Tidak | Nilai default: 30 detik. Konektor CDC AnalyticDB untuk PostgreSQL secara aktif mengirimkan paket heartbeat ke database untuk memastikan offset slot terus maju. Jika data tabel tidak sering berubah, atur opsi ini ke nilai yang sesuai untuk membersihkan log WAL secara berkala dan menghindari pemborosan disk. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom kunci chunk selama pembacaan snapshot. | STRING | Tidak | Default ke kolom pertama dari primary key. |
Spesifik untuk Sink
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
retryWaitTime | Interval antara percobaan ulang, dalam milidetik. | INTEGER | Tidak | 100 | |
batchSize | Jumlah catatan data yang dapat ditulis ke tabel sekaligus. | INTEGER | Tidak | 500 | Tidak tersedia. |
flushIntervalMs | Interval pengosongan cache. | INTEGER | Tidak | Tidak tersedia. | Jika jumlah data yang di-cache tidak mencapai batas atas dalam periode waktu tertentu, semua data yang di-cache ditulis ke tabel sink. Unit: milidetik. |
writeMode | Mode penulisan di mana sistem mencoba menulis data ke tabel untuk pertama kalinya. | STRING | Tidak | insert | Nilai valid:
|
conflictMode | Kebijakan berdasarkan mana konflik primary key atau indeks ditangani saat data dimasukkan ke tabel. | STRING | Tidak | strict | Nilai valid:
|
Spesifik tabel dimensi
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
maxJoinRows | Jumlah maksimum baris untuk digabungkan dalam satu baris data. | INTEGER | Tidak | 1024 | Tidak tersedia. |
cache | Kebijakan cache. | STRING | Tidak | ALL | Nilai valid:
|
cacheSize | Jumlah maksimum baris data yang dapat di-cache. | LONG | Tidak | 100000 | Opsi cacheSize berlaku hanya ketika Anda mengatur opsi cache ke LRU. |
cacheTTLMs | Periode timeout cache. | LONG | Tidak | Long.MAX_VALUE | Konfigurasi opsi cacheTTLMs bervariasi berdasarkan opsi cache.
Unit: milidetik. |
Pemetaan tipe data
Tipe data AnalyticDB for PostgreSQL | Tipe data Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
SMALLINT | INT |
INT | INT |
BIGINT | BIGINT |
FLOAT | DOUBLE |
VARCHAR | VARCHAR |
TEXT | VARCHAR |
TIMESTAMP | TIMESTAMP |
DATE | DATE |
Contoh kode
Tabel Sumber (beta)
Lihat Gunakan Flink CDC untuk berlangganan data penuh dan inkremental secara real-time.
Tabel Sink:
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;Tabel dimensi:
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TEMPORARY TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;