Topik ini menjelaskan cara menggunakan Konektor ClickHouse.
Informasi latar belakang
ClickHouse adalah sistem manajemen basis data berorientasi kolom yang digunakan untuk Pemrosesan Analitik Online (OLAP). Untuk informasi lebih lanjut, lihat Apa itu ClickHouse?.
Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor ClickHouse.
Item | Deskripsi |
Jenis tabel | Tabel hasil |
Mode operasi | Mode batch dan mode streaming |
Format data | Tidak tersedia |
Metrik |
Catatan Untuk informasi lebih lanjut tentang metrik, lihat Metrics. |
Jenis API | SQL API |
Pembaruan atau penghapusan data dalam tabel hasil | Jika kunci utama ditentukan dalam pernyataan DDL yang digunakan untuk membuat tabel hasil Flink dan parameter ignoreDelete diatur ke false, data dalam tabel hasil dapat diperbarui atau dihapus. Namun, kinerja pemrosesan data akan berkurang secara signifikan. |
Fitur
Untuk tabel terdistribusi ClickHouse, data langsung ditulis ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi tersebut.
Untuk kluster ClickHouse yang diterapkan di Alibaba Cloud E-MapReduce (EMR), Anda dapat menggunakan semantik exactly-once.
Prasyarat
Tabel ClickHouse telah dibuat. Untuk informasi lebih lanjut, lihat Buat Tabel Baru.
Daftar putih dikonfigurasi untuk kluster ClickHouse.
Jika Anda menggunakan kluster ApsaraDB for ClickHouse dari Alibaba Cloud, konfigurasikan daftar putih sesuai petunjuk dalam Konfigurasikan Daftar Putih.
Jika Anda menggunakan kluster ClickHouse yang diterapkan di Alibaba Cloud EMR, konfigurasikan daftar putih sesuai petunjuk dalam Kelola Grup Keamanan.
Jika Anda menggunakan kluster ClickHouse mandiri yang di-hosting pada instance Elastic Compute Service (ECS), konfigurasikan daftar putih sesuai petunjuk dalam Ikhtisar.
Dalam kasus lain, konfigurasikan daftar putih mesin tempat kluster ClickHouse diterapkan untuk memastikan bahwa kluster ClickHouse dapat diakses oleh mesin tempat Realtime Compute for Apache Flink diterapkan.
CatatanUntuk informasi lebih lanjut tentang cara melihat blok CIDR dari vSwitch tempat Realtime Compute for Apache Flink berada, lihat FAQ tentang manajemen dan operasi ruang kerja serta namespace.
Batasan
Konektor ClickHouse tidak mendukung parameter sink.parallelism.
Tabel hasil ClickHouse mendukung semantik at-least-once.
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 3.0.2 atau versi lebih baru yang mendukung Konektor ClickHouse.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 3.0.3 atau VVR 4.0.7, atau versi minor lebih baru yang mendukung parameter ignoreDelete dalam klausa WITH.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.10 atau versi lebih baru yang mendukung tipe data NESTED dari ClickHouse.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.11 atau versi lebih baru yang memungkinkan Anda menulis data ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.11 atau versi lebih baru yang menyediakan semantik exactly-once untuk menulis data ke tabel kluster ClickHouse yang diterapkan di Alibaba Cloud EMR. Semantik exactly-once tidak lagi dapat digunakan untuk menulis data ke tabel kluster ClickHouse EMR V3.45.1, atau versi minor lebih baru dari EMR V5.11.1 karena perubahan kemampuan EMR ClickHouse.
Anda dapat mengatur parameter writeMode ke balance untuk menulis data secara merata ke tabel lokal ClickHouse hanya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau versi lebih baru.
Hanya kluster ApsaraDB for ClickHouse Community-compatible Edition yang memungkinkan Anda menulis data ke tabel lokal ClickHouse.
Sintaksis
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);Parameter dalam klausa WITH
Parameter | Deskripsi | Tipe Data | Wajib | Nilai Default | Catatan |
connector | Tipe tabel hasil. | STRING | Ya | Tidak ada nilai default | Atur nilainya ke clickhouse. |
url | URL Java Database Connectivity (JDBC) dari ClickHouse. | STRING | Ya | Tidak ada nilai default | Anda harus menentukan URL dalam format Catatan Jika Anda ingin menulis data ke tabel terdistribusi ClickHouse, Anda harus menentukan URL ke JDBC URL node tempat tabel terdistribusi ClickHouse berada. |
userName | Nama pengguna yang digunakan untuk mengakses ClickHouse. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
password | Kata sandi yang digunakan untuk mengakses ClickHouse. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
tableName | Nama tabel ClickHouse. | STRING | Ya | Tidak ada nilai default | Tidak tersedia. |
maxRetryTimes | Jumlah maksimum percobaan ulang untuk menulis data ke tabel hasil. | INT | Tidak | 3 | Tidak tersedia. |
batchSize | Jumlah catatan data yang dapat ditulis sekaligus. | INT | Tidak | 100 | Jika jumlah catatan data dalam cache mencapai nilai parameter batchSize atau interval pembersihan cache lebih besar dari nilai parameter flushIntervalMs, sistem secara otomatis menulis data yang di-cache ke tabel ClickHouse. |
flushIntervalMs | Interval pembersihan cache. | LONG | Tidak | 1000 | Satuan: milidetik. |
ignoreDelete | Menentukan apakah akan mengabaikan pesan penghapusan. | BOOLEAN | Tidak | true | Nilai valid:
Catatan Jika Anda mengatur parameter ignoreDelete ke false, data tidak dapat ditulis ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse dalam mode penulisan partisi. Dalam hal ini, Anda tidak dapat mengatur parameter writeMode ke partition. |
shardWrite | Menentukan apakah akan langsung menulis data ke tabel lokal ClickHouse jika tabel saat ini adalah tabel terdistribusi ClickHouse. | BOOLEAN | Tidak | false | Nilai valid:
|
inferLocalTable | Menentukan apakah akan secara otomatis menyimpulkan informasi tentang tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse jika Anda ingin menulis data ke tabel terdistribusi ClickHouse dan langsung menulis data ke tabel lokal ClickHouse. | BOOLEAN | Tidak | false | Nilai valid:
Catatan Jika Anda ingin menulis data ke tabel non-distribusi ClickHouse, Anda tidak perlu mengonfigurasi parameter ini. |
writeMode | Kebijakan berdasarkan mana data ditulis ke tabel lokal ClickHouse. | ENUM | Tidak | default | Nilai valid:
Catatan Jika Anda mengatur parameter writeMode ke partition, pastikan parameter ignoreDelete diatur ke true. |
shardingKey | Kunci berdasarkan mana data ditulis ke tabel lokal ClickHouse yang sama pada node tertentu. | default | Tidak | Tidak ada nilai default | Jika Anda mengatur parameter writeMode ke partition, Anda harus mengonfigurasi parameter shardingKey. Nilai parameter shardingKey dapat berisi beberapa bidang. Pisahkan beberapa bidang dengan koma (,). |
exactlyOnce | Menentukan apakah akan menggunakan semantik exactly-once. | BOOLEAN | Tidak | false | Nilai valid:
Catatan
|
Pemetaan tipe data
Tipe data Realtime Compute for Apache Flink | Tipe data ClickHouse |
BOOLEAN | UInt8 / Boolean Catatan ClickHouse V21.12 dan versi lebih baru mendukung tipe data BOOLEAN.Jika versi ClickHouse yang Anda gunakan lebih lama dari V21.12, tipe data BOOLEAN dari Realtime Compute for Apache Flink sesuai dengan tipe data UINT8 dari ClickHouse. |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
BIGINT | UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
CHAR | FixedString |
VARCHAR | STRING |
BINARY | FixedString |
VARBINARY | STRING |
DATE | Date |
TIMESTAMP(0) | DateTime |
TIMESTAMP(x) | Datetime64(x) |
DECIMAL | DECIMAL |
ARRAY | ARRAY |
Nested |
ClickHouse tidak mendukung tipe data berikut dari Realtime Compute for Apache Flink: TIME, MAP, MULTISET, dan ROW.
Untuk menggunakan tipe data NESTED dari ClickHouse, Anda harus memetakan tipe data ini ke tipe data ARRAY dari Realtime Compute for Apache Flink. Contoh kode:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);Petakan tipe data NESTED dari ClickHouse ke tipe data ARRAY dari Realtime Compute for Apache Flink.
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);Tipe data DATETIME dari ClickHouse dapat akurat hingga detik, dan tipe data DATETIME64 dapat akurat hingga nanodetik. Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 6.0.6, ketika driver JDBC yang disediakan oleh ClickHouse menulis data tipe DATETIME64, terjadi kehilangan presisi dan data hanya dapat akurat hingga detik. Oleh karena itu, Realtime Compute for Apache Flink hanya dapat menulis data tipe TIMESTAMP dalam detik. Nilainya ditampilkan dalam format TIMESTAMP(0). Untuk Realtime Compute for Apache Flink yang menggunakan VVR 6.0.6 atau versi lebih baru, masalah kehilangan presisi telah diselesaikan. Realtime Compute for Apache Flink dapat menulis data tipe DATETIME64 sesuai harapan.
Contoh
Contoh 1: Data ditulis ke tabel lokal ClickHouse pada node.
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;Contoh 2: Data ditulis ke tabel terdistribusi ClickHouse.
Tiga tabel lokal ClickHouse bernama local_table_test ada di node 192.XX.XX.1, 192.XX.XX.2, dan 192.XX.XX.3. Tabel terdistribusi ClickHouse bernama distributed_table_test dibuat berdasarkan tabel lokal ClickHouse.
Jika Anda ingin langsung menulis data dengan kunci yang sama ke tabel lokal ClickHouse yang sama pada node tertentu, jalankan pernyataan berikut:
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;Jika Anda tidak ingin secara manual menentukan node tempat data ditulis ke tabel lokal ClickHouse dalam parameter url, jalankan pernyataan berikut untuk memungkinkan sistem secara otomatis menyimpulkan node tabel lokal ClickHouse:
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- Atur parameter url ke JDBC URL node tempat tabel terdistribusi ClickHouse berada. 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', -- Atur parameter tableName ke nama tabel terdistribusi ClickHouse. 'shardWrite' = 'true', 'inferLocalTable' = 'true', -- Atur parameter inferLocalTable ke true. 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;