Konektor SelectDB mengintegrasikan Realtime Compute for Apache Flink dengan ApsaraDB for SelectDB—gudang data real-time yang sepenuhnya dikelola dan kompatibel dengan Apache Doris di Alibaba Cloud. Gunakan konektor ini untuk membangun pipeline real-time guna membaca dari, menulis ke, atau melakukan pencarian data di SelectDB, serta menjalankan sinkronisasi database penuh dalam pekerjaan ingesti data berbasis YAML.
Kemampuan yang didukung:
| Kategori | Detail |
|---|---|
| Jenis tabel | Tabel sumber, tabel sink, tabel dimensi, sink ingesti data |
| Mode eksekusi | Stream dan batch |
| Format data | JSON dan CSV |
| Jenis API | DataStream dan SQL |
| Dukungan Update/Delete | Ya |
| Metrik pemantauan | Tidak ada |
Fitur utama:
Sinkronisasi data database penuh
Semantik tepat-sekali melalui two-phase commit (2PC) — tidak ada catatan duplikat atau hilang
Kompatibel dengan Apache Doris 1.0 dan versi lebih baru
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.10 atau versi lebih baru
Instans ApsaraDB for SelectDB. Lihat Buat instans.
Daftar putih alamat IP yang dikonfigurasi pada instans tersebut. Lihat Konfigurasi daftar putih.
Siapkan konektor
Konektor SelectDB sudah terintegrasi dalam VVR 11.1 dan versi lebih baru — tidak perlu instalasi manual.
Untuk VVR 8.0.10 hingga 11.0, instal konektor secara manual:
Unduh paket JAR dari Maven Central (versi Flink 1.15–1.17).
Unggah file JAR ke konsol pengembangan Realtime Compute for Apache Flink Anda. Lihat Kelola konektor kustom.
Referensikan konektor dalam pekerjaan SQL Anda menggunakan
'connector' = 'doris'.
SQL
Sintaksis
Ketiga jenis tabel — sumber, sink, dan dimensi — menggunakan sintaksis DDL yang sama. Tentukan peran tabel melalui parameter yang Anda sertakan.
Untuk menggunakan SelectDB sebagai tabel sumber, aktifkan terlebih dahulu koneksi kluster langsung. Di konsol ApsaraDB for SelectDB, buka Instance Details > Network Information, lalu klik Enable Direct Cluster Connection. Ini akan mengaktifkan protokol Arrow Flight SQL untuk pembacaan paralel berkecepatan tinggi.
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);Parameter
Umum
| Parameter | Wajib | Bawaan | Deskripsi |
|---|---|---|---|
connector | Ya | — | Ditetapkan ke doris. |
fenodes | Ya | — | Titik akhir HTTP instans SelectDB: <Alamat VPC atau Alamat Publik>:<Port Protokol HTTP>. Dapatkan keduanya dari halaman Instance Details > Network Information di konsol SelectDB. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
jdbc-url | Tidak | — | String koneksi Java Database Connectivity (JDBC) untuk pencarian tabel dimensi dan kueri metadata: jdbc:mysql://<Alamat VPC atau Alamat Publik>:<Port Protokol MySQL>. Contoh: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
table.identifier | Ya | — | Tabel target dalam format <database>.<tabel>. Contoh: db.tbl. |
username | Ya | — | Username database. Atur ulang kata sandi dari pojok kanan atas halaman Instance Details jika diperlukan. |
password | Ya | — | Kata sandi untuk username database. |
doris.request.retries | Tidak | 3 | Jumlah percobaan ulang untuk permintaan yang gagal. |
doris.request.connect.timeout | Tidak | 30s | Timeout koneksi. |
doris.request.read.timeout | Tidak | 30s | Timeout baca. |
Tabel sumber
| Parameter | Wajib | Bawaan | Deskripsi |
|---|---|---|---|
doris.request.query.timeout | Tidak | 21600s | Timeout kueri (bawaan 6 jam). |
doris.request.tablet.size | Tidak | 1 | Jumlah tablet per partisi. Nilai yang lebih rendah meningkatkan paralelisme Flink tetapi memberi tekanan lebih besar pada database. |
doris.batch.size | Tidak | 4064 | Jumlah maksimum baris yang dibaca dari node Backend (BE) per permintaan. Tingkatkan nilai ini untuk mengurangi overhead koneksi dan latensi jaringan. |
doris.exec.mem.limit | Tidak | 8192mb | Batas memori per kueri dalam byte (bawaan 8 GB). |
source.use-flight-sql | Tidak | false | Tidak perlu konfigurasi tambahan — mengaktifkan Direct Cluster Connection di konsol SelectDB secara otomatis mengaktifkan Arrow Flight SQL. |
source.flight-sql-port | Tidak | — | Port Arrow Flight SQL (arrow_flight_sql_port) dari node Frontend (FE). |
Tabel sink
Mode penulisan memengaruhi jaminan pengiriman dan perilaku flush. Pilih berdasarkan kebutuhan konsistensi Anda:
| Penulisan streaming | Penulisan batch | |
|---|---|---|
| Kondisi pemicu | Mengikuti interval checkpoint Flink | Flush berkala berdasarkan volume data atau ambang batas waktu |
| Jaminan pengiriman | Tepat-sekali (melalui 2PC) | Setidaknya-sekali; capai idempotensi dengan model Unique |
| Latensi | Dibatasi oleh interval checkpoint | Fleksibel, tidak bergantung pada checkpoint |
| Toleransi kesalahan | Pemulihan status Flink penuh | Mengandalkan deduplikasi model Unique |
| Parameter | Wajib | Bawaan | Deskripsi |
|---|---|---|---|
sink.label-prefix | Tidak | — | Awalan label untuk impor Stream Load. Harus unik secara global di semua pekerjaan — label yang sama hanya dapat dikomit satu kali. Diperlukan untuk menjamin semantik tepat-sekali saat pekerjaan dimulai ulang. |
sink.properties.* | Tidak | — | Parameter impor Stream Load yang diteruskan langsung ke API Stream Load SelectDB. Lihat contoh di bawah. |
sink.enable-delete | Tidak | true | Teruskan operasi DELETE. Memerlukan tabel Doris agar memiliki penghapusan batch yang diaktifkan dan hanya berfungsi dengan model Unique. |
sink.enable-2pc | Tidak | true | Aktifkan two-phase commit (2PC) untuk semantik tepat-sekali. Lihat Operasi Transaksi Eksplisit. |
sink.buffer-size | Tidak | 1 MB | Ukuran buffer cache tulis dalam byte. Biarkan pada nilai bawaan. |
sink.buffer-count | Tidak | 3 | Jumlah buffer cache tulis. Biarkan pada nilai bawaan. |
sink.max-retries | Tidak | 3 | Jumlah maksimum percobaan ulang setelah kegagalan komit. |
sink.enable.batch-mode | Tidak | false | Alihkan ke mode penulisan batch. Flush dikontrol oleh tiga parameter sink.buffer-flush.* di bawah ini, bukan oleh checkpoint. Semantik tepat-sekali tidak dijamin; gunakan model Unique untuk idempotensi. |
sink.flush.queue-size | Tidak | 2 | Ukuran antrean cache dalam mode batch. |
sink.buffer-flush.max-rows | Tidak | 500000 | Jumlah maksimum baris per flush dalam mode batch. |
sink.buffer-flush.max-bytes | Tidak | 100 MB | Jumlah maksimum byte per flush dalam mode batch. |
sink.buffer-flush.interval | Tidak | 10s | Interval flush dalam mode batch. |
sink.ignore.update-before | Tidak | true | Abaikan event update-before dari Flink CDC. |
**Contoh sink.properties.*:**
Format CSV:
'sink.properties.column_separator' = ','
-- Jika nilai mungkin mengandung koma, gunakan pemisah non-printable:
-- 'sink.properties.column_separator' = '\x01'Format JSON:
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
-- Alternatif: 'sink.properties.strip_outer_array' = 'true'Tabel dimensi
| Parameter | Wajib | Bawaan | Deskripsi |
|---|---|---|---|
lookup.cache.max-rows | Tidak | -1 | Jumlah maksimum baris dalam cache lookup. -1 menonaktifkan caching. |
lookup.cache.ttl | Tidak | 10s | Waktu hidup (TTL) entri cache. |
lookup.max-retries | Tidak | 1 | Jumlah percobaan ulang setelah kueri lookup gagal. |
lookup.jdbc.async | Tidak | false | Aktifkan lookup asinkron. |
lookup.jdbc.read.batch.size | Tidak | 128 | Ukuran batch maksimum per kueri dalam mode lookup asinkron. |
lookup.jdbc.read.batch.queue-size | Tidak | 256 | Ukuran antrean buffer antara dalam mode lookup asinkron. |
lookup.jdbc.read.thread-size | Tidak | 3 | Jumlah thread lookup JDBC per task dalam mode lookup asinkron. |
Contoh
Tabel sumber
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);Tabel sink
CREATE TEMPORARY TABLE selectdb_sink (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
'sink.label-prefix' = 'flink_orders' -- Harus unik secara global di semua pekerjaan
);Tabel dimensi
SelectDB berperan sebagai tabel dimensi lookup yang digabungkan dengan tabel fakta streaming.
-- Tabel fakta dari Kafka
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` AS proctime()
) WITH (
'connector' = 'kafka',
...
);
-- Tabel dimensi dari SelectDB
CREATE TEMPORARY TABLE dim_city (
`city` STRING,
`level` INT,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
-- Temporal join
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;Ingesti data
Gunakan konektor SelectDB sebagai sink dalam pekerjaan ingesti data berbasis YAML untuk sinkronisasi database penuh.
Sintaksis
source:
type: <source-type>
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""Parameter
| Parameter | Wajib | Bawaan | Deskripsi |
|---|---|---|---|
type | Ya | — | Ditetapkan ke doris. |
name | Tidak | — | Nama deskriptif untuk sink. |
fenodes | Ya | — | Titik akhir HTTP: <Alamat VPC atau Alamat Publik>:<Port Protokol HTTP>. Dapatkan keduanya dari halaman Instance Details > Network Information di konsol SelectDB. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
jdbc-url | Tidak | — | String koneksi JDBC. Contoh: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
username | Ya | — | Username database. |
password | Ya | — | Kata sandi untuk username database. |
sink.enable.batch-mode | Tidak | true | Mode batch aktif secara bawaan dalam pekerjaan ingesti data. Flush dikontrol oleh tiga parameter sink.buffer-flush.*. Semantik tepat-sekali tidak dijamin; gunakan model Unique untuk idempotensi. |
sink.flush.queue-size | Tidak | 2 | Ukuran antrean cache. |
sink.buffer-flush.max-rows | Tidak | 500000 | Jumlah maksimum baris per flush. |
sink.buffer-flush.max-bytes | Tidak | 100 MB | Jumlah maksimum byte per flush. |
sink.buffer-flush.interval | Tidak | 10s | Interval flush. Minimum: 1s. |
sink.properties.* | Tidak | — | Parameter impor Stream Load. |
**Contoh sink.properties.*:**
Format CSV:
sink.properties.column_separator: ','
# Jika nilai mungkin mengandung koma, gunakan pemisah non-printable:
# sink.properties.column_separator: '\x01'Format JSON:
sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'Pemetaan tipe
Flink ke SelectDB
| Tipe Flink CDC | Tipe SelectDB | Catatan |
|---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
DECIMAL | DECIMAL | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP[(p)] | DATETIME[(p)] | |
TIMESTAMP_LTZ[(p)] | DATETIME[(p)] | |
CHAR(n) | CHAR(n*3) | SelectDB menyimpan string dalam UTF-8. Karakter Inggris menempati 1 byte; karakter Cina menempati 3 byte. Panjang maksimum CHAR adalah 255; nilai yang lebih panjang akan dikonversi otomatis ke VARCHAR. |
VARCHAR(n) | VARCHAR(n*3) | Pengali UTF-8 yang sama berlaku. Panjang maksimum VARCHAR adalah 65533; nilai yang lebih panjang akan dikonversi otomatis ke STRING. |
BINARY(n) | STRING | |
VARBINARY(n) | STRING | |
STRING | STRING |