全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor ClickHouse

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan Konektor ClickHouse.

Informasi latar belakang

ClickHouse adalah sistem manajemen basis data berorientasi kolom yang digunakan untuk Pemrosesan Analitik Online (OLAP). Untuk informasi lebih lanjut, lihat Apa itu ClickHouse?.

Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor ClickHouse.

Item

Deskripsi

Jenis tabel

Tabel hasil

Mode operasi

Mode batch dan mode streaming

Format data

Tidak tersedia

Metrik

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang metrik, lihat Metrics.

Jenis API

SQL API

Pembaruan atau penghapusan data dalam tabel hasil

Jika kunci utama ditentukan dalam pernyataan DDL yang digunakan untuk membuat tabel hasil Flink dan parameter ignoreDelete diatur ke false, data dalam tabel hasil dapat diperbarui atau dihapus. Namun, kinerja pemrosesan data akan berkurang secara signifikan.

Fitur

  • Untuk tabel terdistribusi ClickHouse, data langsung ditulis ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi tersebut.

  • Untuk kluster ClickHouse yang diterapkan di Alibaba Cloud E-MapReduce (EMR), Anda dapat menggunakan semantik exactly-once.

Prasyarat

  • Tabel ClickHouse telah dibuat. Untuk informasi lebih lanjut, lihat Buat Tabel Baru.

  • Daftar putih dikonfigurasi untuk kluster ClickHouse.

    • Jika Anda menggunakan kluster ApsaraDB for ClickHouse dari Alibaba Cloud, konfigurasikan daftar putih sesuai petunjuk dalam Konfigurasikan Daftar Putih.

    • Jika Anda menggunakan kluster ClickHouse yang diterapkan di Alibaba Cloud EMR, konfigurasikan daftar putih sesuai petunjuk dalam Kelola Grup Keamanan.

    • Jika Anda menggunakan kluster ClickHouse mandiri yang di-hosting pada instance Elastic Compute Service (ECS), konfigurasikan daftar putih sesuai petunjuk dalam Ikhtisar.

    • Dalam kasus lain, konfigurasikan daftar putih mesin tempat kluster ClickHouse diterapkan untuk memastikan bahwa kluster ClickHouse dapat diakses oleh mesin tempat Realtime Compute for Apache Flink diterapkan.

    Catatan

    Untuk informasi lebih lanjut tentang cara melihat blok CIDR dari vSwitch tempat Realtime Compute for Apache Flink berada, lihat FAQ tentang manajemen dan operasi ruang kerja serta namespace.

Batasan

  • Konektor ClickHouse tidak mendukung parameter sink.parallelism.

  • Tabel hasil ClickHouse mendukung semantik at-least-once.

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 3.0.2 atau versi lebih baru yang mendukung Konektor ClickHouse.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 3.0.3 atau VVR 4.0.7, atau versi minor lebih baru yang mendukung parameter ignoreDelete dalam klausa WITH.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.10 atau versi lebih baru yang mendukung tipe data NESTED dari ClickHouse.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.11 atau versi lebih baru yang memungkinkan Anda menulis data ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.11 atau versi lebih baru yang menyediakan semantik exactly-once untuk menulis data ke tabel kluster ClickHouse yang diterapkan di Alibaba Cloud EMR. Semantik exactly-once tidak lagi dapat digunakan untuk menulis data ke tabel kluster ClickHouse EMR V3.45.1, atau versi minor lebih baru dari EMR V5.11.1 karena perubahan kemampuan EMR ClickHouse.

  • Anda dapat mengatur parameter writeMode ke balance untuk menulis data secara merata ke tabel lokal ClickHouse hanya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau versi lebih baru.

  • Hanya kluster ApsaraDB for ClickHouse Community-compatible Edition yang memungkinkan Anda menulis data ke tabel lokal ClickHouse.

Sintaksis

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

Parameter dalam klausa WITH

Parameter

Deskripsi

Tipe Data

Wajib

Nilai Default

Catatan

connector

Tipe tabel hasil.

STRING

Ya

Tidak ada nilai default

Atur nilainya ke clickhouse.

url

URL Java Database Connectivity (JDBC) dari ClickHouse.

STRING

Ya

Tidak ada nilai default

Anda harus menentukan URL dalam format jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>. Jika Anda ingin langsung menulis data ke tabel lokal ClickHouse, Anda dapat menjalankan pernyataan select * from system.clusters untuk mendapatkan alamat IP node tempat data ditulis ke tabel lokal ClickHouse. Jika Anda tidak menentukan nama database, database bernama default akan digunakan.

Catatan

Jika Anda ingin menulis data ke tabel terdistribusi ClickHouse, Anda harus menentukan URL ke JDBC URL node tempat tabel terdistribusi ClickHouse berada.

userName

Nama pengguna yang digunakan untuk mengakses ClickHouse.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

password

Kata sandi yang digunakan untuk mengakses ClickHouse.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

tableName

Nama tabel ClickHouse.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

maxRetryTimes

Jumlah maksimum percobaan ulang untuk menulis data ke tabel hasil.

INT

Tidak

3

Tidak tersedia.

batchSize

Jumlah catatan data yang dapat ditulis sekaligus.

INT

Tidak

100

Jika jumlah catatan data dalam cache mencapai nilai parameter batchSize atau interval pembersihan cache lebih besar dari nilai parameter flushIntervalMs, sistem secara otomatis menulis data yang di-cache ke tabel ClickHouse.

flushIntervalMs

Interval pembersihan cache.

LONG

Tidak

1000

Satuan: milidetik.

ignoreDelete

Menentukan apakah akan mengabaikan pesan penghapusan.

BOOLEAN

Tidak

true

Nilai valid:

  • true: Pesan penghapusan diabaikan. Ini adalah nilai default.

  • false: Pesan penghapusan tidak diabaikan.

    Jika Anda mengatur parameter ini ke false dan menentukan kunci utama dalam pernyataan DDL, sistem akan menjalankan pernyataan ALTER untuk menghapus data dari tabel ClickHouse.

Catatan

Jika Anda mengatur parameter ignoreDelete ke false, data tidak dapat ditulis ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse dalam mode penulisan partisi. Dalam hal ini, Anda tidak dapat mengatur parameter writeMode ke partition.

shardWrite

Menentukan apakah akan langsung menulis data ke tabel lokal ClickHouse jika tabel saat ini adalah tabel terdistribusi ClickHouse.

BOOLEAN

Tidak

false

Nilai valid:

  • false: Sistem menulis data ke tabel terdistribusi ClickHouse dan kemudian ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse. Ini adalah nilai default. Dalam hal ini, jika Anda mengatur parameter shardWrite ke false, Anda harus mengatur parameter tableName ke nama tabel terdistribusi ClickHouse.

  • true: Sistem melewati tabel terdistribusi ClickHouse dan langsung menulis data ke tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse.

    Jika Anda ingin meningkatkan throughput untuk menulis data ke tabel terdistribusi ClickHouse, kami sarankan Anda mengatur parameter ini ke true.

    • Jika Anda ingin secara manual menentukan node tempat data ditulis ke tabel lokal ClickHouse dalam parameter url, Anda harus mengatur parameter tableName ke nama tabel lokal ClickHouse. Contoh:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • Jika Anda tidak ingin secara manual menentukan node tempat data ditulis ke tabel lokal ClickHouse dalam parameter url, Anda dapat mengonfigurasi parameter inferLocalTable bersama dengan parameter shardWrite untuk memungkinkan Realtime Compute for Apache Flink secara otomatis menyimpulkan node tabel lokal ClickHouse. Dalam hal ini, Anda harus mengatur parameter tableName ke nama tabel terdistribusi ClickHouse dan parameter url ke JDBC URL node tempat tabel terdistribusi ClickHouse berada. Contoh:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // Atur parameter url ke JDBC URL node tempat tabel terdistribusi ClickHouse berada.
      'tableName' = 'distribute_table'

inferLocalTable

Menentukan apakah akan secara otomatis menyimpulkan informasi tentang tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse jika Anda ingin menulis data ke tabel terdistribusi ClickHouse dan langsung menulis data ke tabel lokal ClickHouse.

BOOLEAN

Tidak

false

Nilai valid:

  • false: Jika Anda ingin menulis data ke tabel terdistribusi ClickHouse dan hanya menentukan satu node dalam parameter url, sistem tidak secara otomatis menyimpulkan informasi tentang tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse. Sistem menulis data ke tabel terdistribusi ClickHouse dan kemudian ke tabel lokal ClickHouse. Ini adalah nilai default.

  • true: Sistem secara otomatis menyimpulkan informasi tentang tabel lokal ClickHouse yang sesuai dengan tabel terdistribusi ClickHouse dan langsung menulis data ke tabel lokal ClickHouse. Dalam hal ini, jika Anda mengatur parameter inferLocalTable ke true, Anda harus mengatur parameter shardWrite ke true, parameter tableName ke nama tabel terdistribusi ClickHouse, dan parameter url ke JDBC URL node tempat tabel terdistribusi ClickHouse berada.

Catatan

Jika Anda ingin menulis data ke tabel non-distribusi ClickHouse, Anda tidak perlu mengonfigurasi parameter ini.

writeMode

Kebijakan berdasarkan mana data ditulis ke tabel lokal ClickHouse.

ENUM

Tidak

default

Nilai valid:

  • default: Data ditulis ke tabel lokal ClickHouse pada node pertama kluster ClickHouse. Ini adalah nilai default.

  • partition: Data dengan kunci yang sama ditulis ke tabel lokal ClickHouse yang sama pada node tertentu.

  • random: Data ditulis secara acak ke tabel lokal ClickHouse pada node.

  • balance: Algoritma round-robin digunakan untuk menulis data secara merata ke tabel lokal ClickHouse pada node.

Catatan

Jika Anda mengatur parameter writeMode ke partition, pastikan parameter ignoreDelete diatur ke true.

shardingKey

Kunci berdasarkan mana data ditulis ke tabel lokal ClickHouse yang sama pada node tertentu.

default

Tidak

Tidak ada nilai default

Jika Anda mengatur parameter writeMode ke partition, Anda harus mengonfigurasi parameter shardingKey. Nilai parameter shardingKey dapat berisi beberapa bidang. Pisahkan beberapa bidang dengan koma (,).

exactlyOnce

Menentukan apakah akan menggunakan semantik exactly-once.

BOOLEAN

Tidak

false

Nilai valid:

  • true: Semantik exactly-once digunakan.

  • false: Semantik exactly-once tidak digunakan. Ini adalah nilai default.

Catatan
  • Anda dapat menggunakan semantik exactly-once untuk menulis data hanya ke kluster ClickHouse yang diterapkan di Alibaba Cloud EMR. Oleh karena itu, Anda dapat mengatur parameter ini ke true hanya jika Anda ingin menulis data ke kluster ClickHouse yang diterapkan di Alibaba Cloud EMR.

  • Jika Anda mengatur parameter writeMode ke partition dan Anda ingin menulis data ke tabel lokal ClickHouse, Anda tidak dapat menggunakan semantik exactly-once. Oleh karena itu, jika Anda mengatur parameter exactlyOnce ke true, Anda tidak dapat mengatur parameter writeMode ke partition.

Pemetaan tipe data

Tipe data Realtime Compute for Apache Flink

Tipe data ClickHouse

BOOLEAN

UInt8 / Boolean

Catatan

ClickHouse V21.12 dan versi lebih baru mendukung tipe data BOOLEAN.Jika versi ClickHouse yang Anda gunakan lebih lama dari V21.12, tipe data BOOLEAN dari Realtime Compute for Apache Flink sesuai dengan tipe data UINT8 dari ClickHouse.

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

STRING

BINARY

FixedString

VARBINARY

STRING

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

Catatan

ClickHouse tidak mendukung tipe data berikut dari Realtime Compute for Apache Flink: TIME, MAP, MULTISET, dan ROW.

Untuk menggunakan tipe data NESTED dari ClickHouse, Anda harus memetakan tipe data ini ke tipe data ARRAY dari Realtime Compute for Apache Flink. Contoh kode:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

Petakan tipe data NESTED dari ClickHouse ke tipe data ARRAY dari Realtime Compute for Apache Flink.

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
Catatan

Tipe data DATETIME dari ClickHouse dapat akurat hingga detik, dan tipe data DATETIME64 dapat akurat hingga nanodetik. Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 6.0.6, ketika driver JDBC yang disediakan oleh ClickHouse menulis data tipe DATETIME64, terjadi kehilangan presisi dan data hanya dapat akurat hingga detik. Oleh karena itu, Realtime Compute for Apache Flink hanya dapat menulis data tipe TIMESTAMP dalam detik. Nilainya ditampilkan dalam format TIMESTAMP(0). Untuk Realtime Compute for Apache Flink yang menggunakan VVR 6.0.6 atau versi lebih baru, masalah kehilangan presisi telah diselesaikan. Realtime Compute for Apache Flink dapat menulis data tipe DATETIME64 sesuai harapan.

Contoh

  • Contoh 1: Data ditulis ke tabel lokal ClickHouse pada node.

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • Contoh 2: Data ditulis ke tabel terdistribusi ClickHouse.

    Tiga tabel lokal ClickHouse bernama local_table_test ada di node 192.XX.XX.1, 192.XX.XX.2, dan 192.XX.XX.3. Tabel terdistribusi ClickHouse bernama distributed_table_test dibuat berdasarkan tabel lokal ClickHouse.

    • Jika Anda ingin langsung menulis data dengan kunci yang sama ke tabel lokal ClickHouse yang sama pada node tertentu, jalankan pernyataan berikut:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • Jika Anda tidak ingin secara manual menentukan node tempat data ditulis ke tabel lokal ClickHouse dalam parameter url, jalankan pernyataan berikut untuk memungkinkan sistem secara otomatis menyimpulkan node tabel lokal ClickHouse:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- Atur parameter url ke JDBC URL node tempat tabel terdistribusi ClickHouse berada. 
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', -- Atur parameter tableName ke nama tabel terdistribusi ClickHouse. 
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', -- Atur parameter inferLocalTable ke true. 
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

FAQ