Konektor ClickHouse memungkinkan Anda menulis data Flink SQL ke tabel hasil ClickHouse dalam mode batch maupun streaming. Konektor ini mendukung penulisan langsung ke tabel standar, tabel terdistribusi, dan tabel lokal.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Tabel ClickHouse. Untuk informasi selengkapnya, lihat Buat tabel baru.
Daftar putih yang dikonfigurasi untuk kluster ClickHouse Anda:
ApsaraDB for ClickHouse cluster: Configure the whitelist
Alibaba Cloud E-MapReduce (EMR) ClickHouse cluster: Manage security groups
ClickHouse yang dikelola sendiri pada Elastic Compute Service (ECS): Overview
Penerapan lainnya: Tambahkan Blok CIDR dari vSwitch yang digunakan oleh Realtime Compute for Apache Flink ke daftar izin mesin yang menjalankan kluster ClickHouse Anda. Untuk menemukan Blok CIDR tersebut, lihat FAQ about workspace and namespace management and operations.
Kemampuan konektor
| Item | Deskripsi |
|---|---|
| Tipe tabel | Tabel hasil |
| Mode berjalan | Mode batch dan mode streaming |
| Format data | N/A |
| Metrik | numRecordsOut, numRecordsOutPerSecond, currentSendTime. Untuk detailnya, lihat Metrics. |
| Tipe API | SQL API |
| Pembaruan dan penghapusan data | Didukung jika kunci primer didefinisikan dalam pernyataan DDL dan parameter ignoreDelete diatur ke false. Mengaktifkan fitur ini secara signifikan mengurangi throughput penulisan. |
Batasan
Parameter
sink.parallelismtidak didukung.Tabel hasil secara default mendukung semantik at-least-once.
Memerlukan Ververica Runtime (VVR) versi 3.0.2 atau lebih baru.
Parameter
ignoreDeletememerlukan VVR 3.0.3, VVR 4.0.7, atau versi minor yang lebih baru dari salah satu versi tersebut.Tipe data NESTED memerlukan VVR 4.0.10 atau lebih baru.
Menulis langsung ke tabel lokal ClickHouse yang berkorespondensi dengan tabel terdistribusi memerlukan VVR 4.0.11 atau lebih baru. Hanya kluster ApsaraDB for ClickHouse Community-compatible Edition yang mendukung fitur ini.
Semantik exactly-once untuk kluster EMR ClickHouse memerlukan VVR 4.0.11 atau lebih baru. Fitur ini tidak lagi tersedia untuk kluster EMR V3.45.1 atau versi minor EMR yang lebih baru dari V5.11.1, karena adanya perubahan kemampuan pada EMR ClickHouse.
Mode penulisan
balancememerlukan VVR 8.0.7 atau lebih baru.
Sintaksis
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);Parameter dalam klausa WITH
Parameter wajib
| Parameter | Tipe data | Deskripsi |
|---|---|---|
connector | STRING | Jenis konektor. Atur ke clickhouse. |
url | STRING | URL JDBC ClickHouse, dalam format jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>. Jika Anda menghilangkan nama database, database default akan digunakan. Untuk menulis ke tabel terdistribusi, atur URL ini ke alamat node yang menghosting tabel terdistribusi tersebut. Untuk menulis langsung ke tabel lokal dengan node yang ditentukan secara manual, cantumkan beberapa alamat node yang dipisahkan koma (lihat contoh di bawah). |
userName | STRING | Username ClickHouse. |
password | STRING | Password ClickHouse. |
tableName | STRING | Nama tabel ClickHouse. Saat menulis langsung ke tabel lokal (shardWrite=true), atur ke nama tabel lokal kecuali jika inferLocalTable=true, dalam hal ini atur ke nama tabel terdistribusi. |
Parameter opsional
| Parameter | Tipe data | Default | Deskripsi |
|---|---|---|---|
maxRetryTimes | INT | 3 | Jumlah maksimum percobaan ulang saat penulisan gagal. |
batchSize | INT | 100 | Jumlah catatan yang dibuffer sebelum flush dilakukan. Flush dipicu ketika buffer mencapai jumlah catatan batchSize atau ketika interval flushIntervalMs telah berlalu, mana yang lebih dulu terjadi. |
flushIntervalMs | LONG | 1000 | Interval maksimum antar flush, dalam milidetik. |
ignoreDelete | BOOLEAN | true | Apakah pesan DELETE diabaikan. Saat diatur ke false dan kunci primer didefinisikan, konektor menjalankan pernyataan ALTER untuk menghapus baris yang sesuai. Mengatur nilai ini ke false secara signifikan mengurangi throughput dan tidak kompatibel dengan writeMode=partition. |
shardWrite | BOOLEAN | false | Apakah melewati tabel terdistribusi dan menulis langsung ke tabel lokal yang mendasarinya. Atur ke true untuk meningkatkan throughput penulisan pada tabel terdistribusi. Saat false, atur tableName ke nama tabel terdistribusi. |
inferLocalTable | BOOLEAN | false | Saat shardWrite=true, apakah Flink secara otomatis menemukan node tabel lokal dari metadata tabel terdistribusi. Memerlukan shardWrite=true, tableName diatur ke nama tabel terdistribusi, dan url mengarah ke node yang menghosting tabel terdistribusi. Tidak diperlukan untuk tabel non-terdistribusi. |
writeMode | ENUM | default | Strategi distribusi penulisan untuk tabel lokal. Lihat Choose a write mode untuk panduan. Nilai yang valid: default, partition, random, balance. |
shardingKey | STRING | None | Bidang (atau bidang-bidang yang dipisahkan koma) yang digunakan untuk merutekan catatan saat writeMode=partition. Catatan dengan nilai kunci yang sama akan dikirim ke node tabel lokal yang sama. |
exactlyOnce | BOOLEAN | false | Apakah menggunakan semantik exactly-once. Hanya didukung untuk kluster ClickHouse yang diterapkan di Alibaba Cloud EMR. Tidak kompatibel dengan writeMode=partition. |
Pilih mode penulisan
Parameter writeMode mengontrol cara catatan didistribusikan ke node tabel lokal saat menulis ke tabel terdistribusi ClickHouse dengan shardWrite=true.
| Mode penulisan | Strategi distribusi | Gunakan saat |
|---|---|---|
default | Semua penulisan dialihkan ke node pertama | Menulis ke kluster single-node atau pengujian |
partition | Catatan dengan shardingKey yang sama selalu dikirim ke node yang sama | Anda memerlukan co-location catatan terkait pada satu node |
random | Setiap catatan ditulis ke node yang dipilih secara acak | Anda menginginkan penyebaran beban sederhana tanpa afinitas kunci |
balance | Catatan didistribusikan ke node secara round-robin | Anda menginginkan distribusi merata tanpa afinitas kunci (memerlukan VVR 8.0.7 atau lebih baru) |
writeMode=partitionmemerlukanignoreDelete=truedan tidak kompatibel denganexactlyOnce=true.
Pemetaan tipe data
| Tipe Flink | Tipe ClickHouse | Didukung | Catatan |
|---|---|---|---|
| BOOLEAN | UInt8 / Boolean | Ya | ClickHouse V21.12 dan versi lebih baru mendukung tipe BOOLEAN asli. Versi sebelumnya memetakan ke UInt8. |
| TINYINT | Int8 | Ya | |
| SMALLINT | Int16 | Ya | |
| INTEGER | Int32 | Ya | |
| BIGINT | Int64 | Ya | |
| BIGINT | UInt32 | Ya | |
| FLOAT | Float32 | Ya | |
| DOUBLE | Float64 | Ya | |
| CHAR | FixedString | Ya | |
| VARCHAR | STRING | Ya | |
| BINARY | FixedString | Ya | |
| VARBINARY | STRING | Ya | |
| DATE | Date | Ya | |
| TIMESTAMP(0) | DateTime | Ya | Akurat hingga detik. |
| TIMESTAMP(x) | DateTime64(x) | Ya | Untuk VVR sebelum 6.0.6, presisi dipotong menjadi detik karena keterbatasan driver JDBC. VVR 6.0.6 dan versi lebih baru menulis DateTime64 dengan presisi penuh. |
| DECIMAL | DECIMAL | Ya | |
| ARRAY | ARRAY | Ya | |
| ARRAY (per field) | Nested | Ya | Petakan setiap field kolom NESTED ClickHouse ke kolom ARRAY terpisah di Flink. Lihat contoh di bawah. |
| TIME | — | Tidak | Tidak didukung. |
| MAP | — | Tidak | Tidak didukung. |
| MULTISET | — | Tidak | Tidak didukung. |
| ROW | — | Tidak | Tidak didukung. |
Bekerja dengan kolom NESTED
Untuk menulis ke kolom NESTED ClickHouse, petakan setiap sub-field ke kolom ARRAY terpisah di Flink, menggunakan konvensi penamaan ColumnName.FieldName.
Definisi tabel ClickHouse:
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
);DDL Flink yang sesuai:
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);Contoh
Menulis ke satu tabel ClickHouse
Contoh ini menggunakan konektor datagen bawaan untuk menghasilkan catatan dan menuliskannya ke tabel ClickHouse.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;Menulis langsung ke tabel lokal dari tabel terdistribusi
Asumsikan tiga tabel lokal bernama local_table_test ada di node 192.XX.XX.1, 192.XX.XX.2, dan 192.XX.XX.3, serta sebuah tabel terdistribusi bernama distributed_table_test dibangun di atasnya.
Tentukan node secara manual
Cantumkan semua alamat node tabel lokal di url dan atur tableName ke nama tabel lokal. Catatan dengan nilai shardingKey yang sama akan dirutekan ke node yang sama.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'local_table_test',
'shardWrite' = 'true',
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;Biarkan Flink menemukan node tabel lokal secara otomatis
Atur inferLocalTable=true dan arahkan url ke node mana pun dari tabel terdistribusi. Flink melakukan kueri ke system.clusters untuk menemukan node tabel lokal secara otomatis.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- URL dari node mana pun yang menghosting tabel terdistribusi
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'distributed_table_test', -- Nama tabel terdistribusi
'shardWrite' = 'true',
'inferLocalTable' = 'true', -- Temukan node tabel lokal secara otomatis
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;FAQ
Apakah saya dapat menarik kembali (menghapus) data yang diperbarui dari tabel hasil ApsaraDB for ClickHouse?
Atur ignoreDelete=false dan definisikan kunci primer dalam pernyataan DDL. Konektor kemudian akan mengeluarkan pernyataan ALTER DELETE saat menerima pesan retract. Perlu diperhatikan bahwa hal ini secara signifikan mengurangi throughput penulisan, dan tidak dapat digunakan bersamaan dengan writeMode=partition atau exactlyOnce=true. Untuk detail selengkapnya, lihat Can I retract the updated data from an ApsaraDB for ClickHouse result table?
Kapan data yang ditulis menjadi terlihat di ClickHouse?
ClickHouse menggunakan penggabungan part secara asinkron, sehingga data mungkin tidak langsung terlihat setelah penulisan selesai. Waktu tunda tergantung pada interval penggabungan yang dikonfigurasi di kluster ClickHouse Anda. Untuk detail selengkapnya, lihat When can I view the data that is written to a ClickHouse sink table in the ClickHouse console?