All Products
Search
Document Center

Realtime Compute for Apache Flink:SelectDB

Last Updated:Mar 26, 2026

Konektor SelectDB mengintegrasikan Realtime Compute for Apache Flink dengan ApsaraDB for SelectDB—gudang data real-time yang sepenuhnya dikelola dan kompatibel dengan Apache Doris di Alibaba Cloud. Gunakan konektor ini untuk membangun pipeline real-time guna membaca dari, menulis ke, atau melakukan pencarian data di SelectDB, serta menjalankan sinkronisasi database penuh dalam pekerjaan ingesti data berbasis YAML.

Kemampuan yang didukung:

KategoriDetail
Jenis tabelTabel sumber, tabel sink, tabel dimensi, sink ingesti data
Mode eksekusiStream dan batch
Format dataJSON dan CSV
Jenis APIDataStream dan SQL
Dukungan Update/DeleteYa
Metrik pemantauanTidak ada

Fitur utama:

  • Sinkronisasi data database penuh

  • Semantik tepat-sekali melalui two-phase commit (2PC) — tidak ada catatan duplikat atau hilang

  • Kompatibel dengan Apache Doris 1.0 dan versi lebih baru

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.10 atau versi lebih baru

  • Instans ApsaraDB for SelectDB. Lihat Buat instans.

  • Daftar putih alamat IP yang dikonfigurasi pada instans tersebut. Lihat Konfigurasi daftar putih.

Siapkan konektor

Konektor SelectDB sudah terintegrasi dalam VVR 11.1 dan versi lebih baru — tidak perlu instalasi manual.

Untuk VVR 8.0.10 hingga 11.0, instal konektor secara manual:

  1. Unduh paket JAR dari Maven Central (versi Flink 1.15–1.17).

  2. Unggah file JAR ke konsol pengembangan Realtime Compute for Apache Flink Anda. Lihat Kelola konektor kustom.

  3. Referensikan konektor dalam pekerjaan SQL Anda menggunakan 'connector' = 'doris'.

SQL

Sintaksis

Ketiga jenis tabel — sumber, sink, dan dimensi — menggunakan sintaksis DDL yang sama. Tentukan peran tabel melalui parameter yang Anda sertakan.

Untuk menggunakan SelectDB sebagai tabel sumber, aktifkan terlebih dahulu koneksi kluster langsung. Di konsol ApsaraDB for SelectDB, buka Instance Details > Network Information, lalu klik Enable Direct Cluster Connection. Ini akan mengaktifkan protokol Arrow Flight SQL untuk pembacaan paralel berkecepatan tinggi.
CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);

Parameter

Umum

ParameterWajibBawaanDeskripsi
connectorYaDitetapkan ke doris.
fenodesYaTitik akhir HTTP instans SelectDB: <Alamat VPC atau Alamat Publik>:<Port Protokol HTTP>. Dapatkan keduanya dari halaman Instance Details > Network Information di konsol SelectDB. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.
jdbc-urlTidakString koneksi Java Database Connectivity (JDBC) untuk pencarian tabel dimensi dan kueri metadata: jdbc:mysql://<Alamat VPC atau Alamat Publik>:<Port Protokol MySQL>. Contoh: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030.
table.identifierYaTabel target dalam format <database>.<tabel>. Contoh: db.tbl.
usernameYaUsername database. Atur ulang kata sandi dari pojok kanan atas halaman Instance Details jika diperlukan.
passwordYaKata sandi untuk username database.
doris.request.retriesTidak3Jumlah percobaan ulang untuk permintaan yang gagal.
doris.request.connect.timeoutTidak30sTimeout koneksi.
doris.request.read.timeoutTidak30sTimeout baca.

Tabel sumber

ParameterWajibBawaanDeskripsi
doris.request.query.timeoutTidak21600sTimeout kueri (bawaan 6 jam).
doris.request.tablet.sizeTidak1Jumlah tablet per partisi. Nilai yang lebih rendah meningkatkan paralelisme Flink tetapi memberi tekanan lebih besar pada database.
doris.batch.sizeTidak4064Jumlah maksimum baris yang dibaca dari node Backend (BE) per permintaan. Tingkatkan nilai ini untuk mengurangi overhead koneksi dan latensi jaringan.
doris.exec.mem.limitTidak8192mbBatas memori per kueri dalam byte (bawaan 8 GB).
source.use-flight-sqlTidakfalseTidak perlu konfigurasi tambahan — mengaktifkan Direct Cluster Connection di konsol SelectDB secara otomatis mengaktifkan Arrow Flight SQL.
source.flight-sql-portTidakPort Arrow Flight SQL (arrow_flight_sql_port) dari node Frontend (FE).

Tabel sink

Mode penulisan memengaruhi jaminan pengiriman dan perilaku flush. Pilih berdasarkan kebutuhan konsistensi Anda:

Penulisan streamingPenulisan batch
Kondisi pemicuMengikuti interval checkpoint FlinkFlush berkala berdasarkan volume data atau ambang batas waktu
Jaminan pengirimanTepat-sekali (melalui 2PC)Setidaknya-sekali; capai idempotensi dengan model Unique
LatensiDibatasi oleh interval checkpointFleksibel, tidak bergantung pada checkpoint
Toleransi kesalahanPemulihan status Flink penuhMengandalkan deduplikasi model Unique
ParameterWajibBawaanDeskripsi
sink.label-prefixTidakAwalan label untuk impor Stream Load. Harus unik secara global di semua pekerjaan — label yang sama hanya dapat dikomit satu kali. Diperlukan untuk menjamin semantik tepat-sekali saat pekerjaan dimulai ulang.
sink.properties.*TidakParameter impor Stream Load yang diteruskan langsung ke API Stream Load SelectDB. Lihat contoh di bawah.
sink.enable-deleteTidaktrueTeruskan operasi DELETE. Memerlukan tabel Doris agar memiliki penghapusan batch yang diaktifkan dan hanya berfungsi dengan model Unique.
sink.enable-2pcTidaktrueAktifkan two-phase commit (2PC) untuk semantik tepat-sekali. Lihat Operasi Transaksi Eksplisit.
sink.buffer-sizeTidak1 MBUkuran buffer cache tulis dalam byte. Biarkan pada nilai bawaan.
sink.buffer-countTidak3Jumlah buffer cache tulis. Biarkan pada nilai bawaan.
sink.max-retriesTidak3Jumlah maksimum percobaan ulang setelah kegagalan komit.
sink.enable.batch-modeTidakfalseAlihkan ke mode penulisan batch. Flush dikontrol oleh tiga parameter sink.buffer-flush.* di bawah ini, bukan oleh checkpoint. Semantik tepat-sekali tidak dijamin; gunakan model Unique untuk idempotensi.
sink.flush.queue-sizeTidak2Ukuran antrean cache dalam mode batch.
sink.buffer-flush.max-rowsTidak500000Jumlah maksimum baris per flush dalam mode batch.
sink.buffer-flush.max-bytesTidak100 MBJumlah maksimum byte per flush dalam mode batch.
sink.buffer-flush.intervalTidak10sInterval flush dalam mode batch.
sink.ignore.update-beforeTidaktrueAbaikan event update-before dari Flink CDC.

**Contoh sink.properties.*:**

Format CSV:

'sink.properties.column_separator' = ','
-- Jika nilai mungkin mengandung koma, gunakan pemisah non-printable:
-- 'sink.properties.column_separator' = '\x01'

Format JSON:

'sink.properties.format'            = 'json',
'sink.properties.read_json_by_line' = 'true'
-- Alternatif: 'sink.properties.strip_outer_array' = 'true'

Tabel dimensi

ParameterWajibBawaanDeskripsi
lookup.cache.max-rowsTidak-1Jumlah maksimum baris dalam cache lookup. -1 menonaktifkan caching.
lookup.cache.ttlTidak10sWaktu hidup (TTL) entri cache.
lookup.max-retriesTidak1Jumlah percobaan ulang setelah kueri lookup gagal.
lookup.jdbc.asyncTidakfalseAktifkan lookup asinkron.
lookup.jdbc.read.batch.sizeTidak128Ukuran batch maksimum per kueri dalam mode lookup asinkron.
lookup.jdbc.read.batch.queue-sizeTidak256Ukuran antrean buffer antara dalam mode lookup asinkron.
lookup.jdbc.read.thread-sizeTidak3Jumlah thread lookup JDBC per task dalam mode lookup asinkron.

Contoh

Tabel sumber

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);

Tabel sink

CREATE TEMPORARY TABLE selectdb_sink (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****',
  'sink.label-prefix' = 'flink_orders'  -- Harus unik secara global di semua pekerjaan
);

Tabel dimensi

SelectDB berperan sebagai tabel dimensi lookup yang digabungkan dengan tabel fakta streaming.

-- Tabel fakta dari Kafka
CREATE TEMPORARY TABLE fact_table (
  `id`           BIGINT,
  `name`         STRING,
  `city`         STRING,
  `process_time` AS proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

-- Tabel dimensi dari SelectDB
CREATE TEMPORARY TABLE dim_city (
  `city`     STRING,
  `level`    INT,
  `province` STRING,
  `country`  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url'         = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username'         = 'admin',
  'password'         = '****'
);

-- Temporal join
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;

Ingesti data

Gunakan konektor SelectDB sebagai sink dalam pekerjaan ingesti data berbasis YAML untuk sinkronisasi database penuh.

Sintaksis

source:
  type: <source-type>

sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
  username: root
  password: ""

Parameter

ParameterWajibBawaanDeskripsi
typeYaDitetapkan ke doris.
nameTidakNama deskriptif untuk sink.
fenodesYaTitik akhir HTTP: <Alamat VPC atau Alamat Publik>:<Port Protokol HTTP>. Dapatkan keduanya dari halaman Instance Details > Network Information di konsol SelectDB. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.
jdbc-urlTidakString koneksi JDBC. Contoh: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030.
usernameYaUsername database.
passwordYaKata sandi untuk username database.
sink.enable.batch-modeTidaktrueMode batch aktif secara bawaan dalam pekerjaan ingesti data. Flush dikontrol oleh tiga parameter sink.buffer-flush.*. Semantik tepat-sekali tidak dijamin; gunakan model Unique untuk idempotensi.
sink.flush.queue-sizeTidak2Ukuran antrean cache.
sink.buffer-flush.max-rowsTidak500000Jumlah maksimum baris per flush.
sink.buffer-flush.max-bytesTidak100 MBJumlah maksimum byte per flush.
sink.buffer-flush.intervalTidak10sInterval flush. Minimum: 1s.
sink.properties.*TidakParameter impor Stream Load.

**Contoh sink.properties.*:**

Format CSV:

sink.properties.column_separator: ','
# Jika nilai mungkin mengandung koma, gunakan pemisah non-printable:
# sink.properties.column_separator: '\x01'

Format JSON:

sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'

Pemetaan tipe

Flink ke SelectDB

Tipe Flink CDCTipe SelectDBCatatan
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
DECIMALDECIMAL
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATEDATE
TIMESTAMP[(p)]DATETIME[(p)]
TIMESTAMP_LTZ[(p)]DATETIME[(p)]
CHAR(n)CHAR(n*3)SelectDB menyimpan string dalam UTF-8. Karakter Inggris menempati 1 byte; karakter Cina menempati 3 byte. Panjang maksimum CHAR adalah 255; nilai yang lebih panjang akan dikonversi otomatis ke VARCHAR.
VARCHAR(n)VARCHAR(n*3)Pengali UTF-8 yang sama berlaku. Panjang maksimum VARCHAR adalah 65533; nilai yang lebih panjang akan dikonversi otomatis ke STRING.
BINARY(n)STRING
VARBINARY(n)STRING
STRINGSTRING

Langkah selanjutnya