All Products
Search
Document Center

Realtime Compute for Apache Flink:Konektor ClickHouse

Last Updated:Mar 26, 2026

Konektor ClickHouse memungkinkan Anda menulis data Flink SQL ke tabel hasil ClickHouse dalam mode batch maupun streaming. Konektor ini mendukung penulisan langsung ke tabel standar, tabel terdistribusi, dan tabel lokal.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Kemampuan konektor

ItemDeskripsi
Tipe tabelTabel hasil
Mode berjalanMode batch dan mode streaming
Format dataN/A
MetriknumRecordsOut, numRecordsOutPerSecond, currentSendTime. Untuk detailnya, lihat Metrics.
Tipe APISQL API
Pembaruan dan penghapusan dataDidukung jika kunci primer didefinisikan dalam pernyataan DDL dan parameter ignoreDelete diatur ke false. Mengaktifkan fitur ini secara signifikan mengurangi throughput penulisan.

Batasan

  • Parameter sink.parallelism tidak didukung.

  • Tabel hasil secara default mendukung semantik at-least-once.

  • Memerlukan Ververica Runtime (VVR) versi 3.0.2 atau lebih baru.

  • Parameter ignoreDelete memerlukan VVR 3.0.3, VVR 4.0.7, atau versi minor yang lebih baru dari salah satu versi tersebut.

  • Tipe data NESTED memerlukan VVR 4.0.10 atau lebih baru.

  • Menulis langsung ke tabel lokal ClickHouse yang berkorespondensi dengan tabel terdistribusi memerlukan VVR 4.0.11 atau lebih baru. Hanya kluster ApsaraDB for ClickHouse Community-compatible Edition yang mendukung fitur ini.

  • Semantik exactly-once untuk kluster EMR ClickHouse memerlukan VVR 4.0.11 atau lebih baru. Fitur ini tidak lagi tersedia untuk kluster EMR V3.45.1 atau versi minor EMR yang lebih baru dari V5.11.1, karena adanya perubahan kemampuan pada EMR ClickHouse.

  • Mode penulisan balance memerlukan VVR 8.0.7 atau lebih baru.

Sintaksis

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

Parameter dalam klausa WITH

Parameter wajib

ParameterTipe dataDeskripsi
connectorSTRINGJenis konektor. Atur ke clickhouse.
urlSTRINGURL JDBC ClickHouse, dalam format jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>. Jika Anda menghilangkan nama database, database default akan digunakan. Untuk menulis ke tabel terdistribusi, atur URL ini ke alamat node yang menghosting tabel terdistribusi tersebut. Untuk menulis langsung ke tabel lokal dengan node yang ditentukan secara manual, cantumkan beberapa alamat node yang dipisahkan koma (lihat contoh di bawah).
userNameSTRINGUsername ClickHouse.
passwordSTRINGPassword ClickHouse.
tableNameSTRINGNama tabel ClickHouse. Saat menulis langsung ke tabel lokal (shardWrite=true), atur ke nama tabel lokal kecuali jika inferLocalTable=true, dalam hal ini atur ke nama tabel terdistribusi.

Parameter opsional

ParameterTipe dataDefaultDeskripsi
maxRetryTimesINT3Jumlah maksimum percobaan ulang saat penulisan gagal.
batchSizeINT100Jumlah catatan yang dibuffer sebelum flush dilakukan. Flush dipicu ketika buffer mencapai jumlah catatan batchSize atau ketika interval flushIntervalMs telah berlalu, mana yang lebih dulu terjadi.
flushIntervalMsLONG1000Interval maksimum antar flush, dalam milidetik.
ignoreDeleteBOOLEANtrueApakah pesan DELETE diabaikan. Saat diatur ke false dan kunci primer didefinisikan, konektor menjalankan pernyataan ALTER untuk menghapus baris yang sesuai. Mengatur nilai ini ke false secara signifikan mengurangi throughput dan tidak kompatibel dengan writeMode=partition.
shardWriteBOOLEANfalseApakah melewati tabel terdistribusi dan menulis langsung ke tabel lokal yang mendasarinya. Atur ke true untuk meningkatkan throughput penulisan pada tabel terdistribusi. Saat false, atur tableName ke nama tabel terdistribusi.
inferLocalTableBOOLEANfalseSaat shardWrite=true, apakah Flink secara otomatis menemukan node tabel lokal dari metadata tabel terdistribusi. Memerlukan shardWrite=true, tableName diatur ke nama tabel terdistribusi, dan url mengarah ke node yang menghosting tabel terdistribusi. Tidak diperlukan untuk tabel non-terdistribusi.
writeModeENUMdefaultStrategi distribusi penulisan untuk tabel lokal. Lihat Choose a write mode untuk panduan. Nilai yang valid: default, partition, random, balance.
shardingKeySTRINGNoneBidang (atau bidang-bidang yang dipisahkan koma) yang digunakan untuk merutekan catatan saat writeMode=partition. Catatan dengan nilai kunci yang sama akan dikirim ke node tabel lokal yang sama.
exactlyOnceBOOLEANfalseApakah menggunakan semantik exactly-once. Hanya didukung untuk kluster ClickHouse yang diterapkan di Alibaba Cloud EMR. Tidak kompatibel dengan writeMode=partition.

Pilih mode penulisan

Parameter writeMode mengontrol cara catatan didistribusikan ke node tabel lokal saat menulis ke tabel terdistribusi ClickHouse dengan shardWrite=true.

Mode penulisanStrategi distribusiGunakan saat
defaultSemua penulisan dialihkan ke node pertamaMenulis ke kluster single-node atau pengujian
partitionCatatan dengan shardingKey yang sama selalu dikirim ke node yang samaAnda memerlukan co-location catatan terkait pada satu node
randomSetiap catatan ditulis ke node yang dipilih secara acakAnda menginginkan penyebaran beban sederhana tanpa afinitas kunci
balanceCatatan didistribusikan ke node secara round-robinAnda menginginkan distribusi merata tanpa afinitas kunci (memerlukan VVR 8.0.7 atau lebih baru)
writeMode=partition memerlukan ignoreDelete=true dan tidak kompatibel dengan exactlyOnce=true.

Pemetaan tipe data

Tipe FlinkTipe ClickHouseDidukungCatatan
BOOLEANUInt8 / BooleanYaClickHouse V21.12 dan versi lebih baru mendukung tipe BOOLEAN asli. Versi sebelumnya memetakan ke UInt8.
TINYINTInt8Ya
SMALLINTInt16Ya
INTEGERInt32Ya
BIGINTInt64Ya
BIGINTUInt32Ya
FLOATFloat32Ya
DOUBLEFloat64Ya
CHARFixedStringYa
VARCHARSTRINGYa
BINARYFixedStringYa
VARBINARYSTRINGYa
DATEDateYa
TIMESTAMP(0)DateTimeYaAkurat hingga detik.
TIMESTAMP(x)DateTime64(x)YaUntuk VVR sebelum 6.0.6, presisi dipotong menjadi detik karena keterbatasan driver JDBC. VVR 6.0.6 dan versi lebih baru menulis DateTime64 dengan presisi penuh.
DECIMALDECIMALYa
ARRAYARRAYYa
ARRAY (per field)NestedYaPetakan setiap field kolom NESTED ClickHouse ke kolom ARRAY terpisah di Flink. Lihat contoh di bawah.
TIMETidakTidak didukung.
MAPTidakTidak didukung.
MULTISETTidakTidak didukung.
ROWTidakTidak didukung.

Bekerja dengan kolom NESTED

Untuk menulis ke kolom NESTED ClickHouse, petakan setiap sub-field ke kolom ARRAY terpisah di Flink, menggunakan konvensi penamaan ColumnName.FieldName.

Definisi tabel ClickHouse:

CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
);

DDL Flink yang sesuai:

CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID`      ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);

Contoh

Menulis ke satu tabel ClickHouse

Contoh ini menggunakan konektor datagen bawaan untuk menghasilkan catatan dan menuliskannya ke tabel ClickHouse.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url'       = '<yourUrl>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

Menulis langsung ke tabel lokal dari tabel terdistribusi

Asumsikan tiga tabel lokal bernama local_table_test ada di node 192.XX.XX.1, 192.XX.XX.2, dan 192.XX.XX.3, serta sebuah tabel terdistribusi bernama distributed_table_test dibangun di atasnya.

Tentukan node secara manual

Cantumkan semua alamat node tabel lokal di url dan atur tableName ke nama tabel lokal. Catatan dengan nilai shardingKey yang sama akan dirutekan ke node yang sama.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'  = 'clickhouse',
  'url'        = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  'tableName'  = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode'  = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

Biarkan Flink menemukan node tabel lokal secara otomatis

Atur inferLocalTable=true dan arahkan url ke node mana pun dari tabel terdistribusi. Flink melakukan kueri ke system.clusters untuk menemukan node tabel lokal secara otomatis.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'clickhouse',
  'url'            = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- URL dari node mana pun yang menghosting tabel terdistribusi
  'userName'       = '<yourUsername>',
  'password'       = '<yourPassword>',
  'tableName'      = 'distributed_table_test',                    -- Nama tabel terdistribusi
  'shardWrite'     = 'true',
  'inferLocalTable' = 'true',                                     -- Temukan node tabel lokal secara otomatis
  'writeMode'      = 'partition',
  'shardingKey'    = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

FAQ

Apakah saya dapat menarik kembali (menghapus) data yang diperbarui dari tabel hasil ApsaraDB for ClickHouse?

Atur ignoreDelete=false dan definisikan kunci primer dalam pernyataan DDL. Konektor kemudian akan mengeluarkan pernyataan ALTER DELETE saat menerima pesan retract. Perlu diperhatikan bahwa hal ini secara signifikan mengurangi throughput penulisan, dan tidak dapat digunakan bersamaan dengan writeMode=partition atau exactlyOnce=true. Untuk detail selengkapnya, lihat Can I retract the updated data from an ApsaraDB for ClickHouse result table?

Kapan data yang ditulis menjadi terlihat di ClickHouse?

ClickHouse menggunakan penggabungan part secara asinkron, sehingga data mungkin tidak langsung terlihat setelah penulisan selesai. Waktu tunda tergantung pada interval penggabungan yang dikonfigurasi di kluster ClickHouse Anda. Untuk detail selengkapnya, lihat When can I view the data that is written to a ClickHouse sink table in the ClickHouse console?