すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ClickHouse コネクタ

最終更新日:May 23, 2026

このトピックでは、ClickHouse コネクタを使用して ClickHouse にデータを書き込む方法について説明します。

背景情報

ClickHouse は、オンライン分析処理 (OLAP) 向けの列指向データベース管理システムです。 詳細については、「ClickHouse とは」をご参照ください。

次の表に、ClickHouse コネクタの機能を示します。

カテゴリ

詳細

サポートタイプ

結果テーブルのみ

実行モード

バッチおよびストリーミングモード

データ形式

該当なし

コネクタ固有のメトリクス

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

説明

これらのメトリクスの詳細については、「メトリクスのモニタリング」をご参照ください。

API タイプ

SQL

結果テーブル内のデータの更新または削除のサポート

コネクタは、Flink 結果テーブルの DDL でプライマリキーを指定し、ignoreDelete パラメータを false に設定した場合に更新と削除をサポートします。 ただし、このオプションはパフォーマンスを大幅に低下させます。

機能

  • ClickHouse 分散テーブルのローカルテーブルにデータを直接書き込みます。

  • Alibaba Cloud E-MapReduce (EMR) 上の ClickHouse に書き込む際に、exactly-once セマンティクスを実現します。

前提条件

  • ClickHouse テーブルを作成します。 詳細については、「新しいテーブルの作成」をご参照ください。

  • ホワイトリストを設定します。

    • ApsaraDB for ClickHouse を使用する場合は、「ホワイトリストの設定」をご参照ください。

    • Alibaba Cloud E-MapReduce (EMR) 上の ClickHouse を使用する場合は、「セキュリティグループの管理」をご参照ください。

    • ECS インスタンス上のセルフマネージド ClickHouse クラスターを使用する場合は、「セキュリティグループの概要」をご参照ください。

    • その他のすべての設定では、ClickHouse がデプロイされているマシンでホワイトリストを設定し、Realtime Compute for Apache Flink デプロイメントからのアクセスを許可する必要があります。

    説明

    Realtime Compute for Apache Flink の vSwitch ネットワークセグメントを表示するには、「ホワイトリストの設定方法」をご参照ください。

制限事項

  • sink.parallelism パラメータはサポートされていません。

  • デフォルトでは、ClickHouse シンクは at-least-once セマンティクスを提供します。

  • ClickHouse コネクタは、Ververica Runtime (VVR) 3.0.2 以降でのみサポートされています。

  • ignoreDelete オプションは、VVR 3.0.3、VVR 4.0.7 以降のバージョンでのみサポートされています。

  • ClickHouse Nested データ型は、VVR 4.0.10 以降でのみサポートされています。

  • 分散テーブルのローカルテーブルへの直接書き込みは、VVR 4.0.11 以降でのみサポートされています。

  • EMR 上の ClickHouse への書き込みにおける exactly-once セマンティクスは、VVR 4.0.11 以降で利用可能です。ただし、製品機能の変更により、このセマンティクスは、ClickHouse on EMR のバージョンが v3.45.1 または v5.11.1 より後の場合は利用できません。

  • ローカルテーブルノード間でデータを均等に分散する balance 書き込みモードは、VVR 8.0.7 以降でのみ利用可能です。

  • ClickHouse ローカルテーブルへの書き込みは、ApsaraDB for ClickHouse Community-compatible Edition でのみサポートされています。

  • ClickHouse カタログを登録する際、デフォルトのデータベース名にハイフン (-) が含まれていると、JDBC URL の検証が失敗します。

構文

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH パラメータ

パラメータ

説明

必須

デフォルト

備考

connector

結果テーブルのタイプ。

String

はい

N/A

値を clickhouse に設定します。

url

ClickHouse クラスターの JDBC URL。

String

はい

N/A

URL 形式は jdbc:clickhouse://<yourNetworkAddress>:<port>/<yourDatabaseName> です。 ローカルテーブルに直接書き込む場合、ClickHouse で select * from system.clusters を実行してノードの IP アドレスを取得できます。 データベース名を指定しない場合は、default データベースが使用されます。

説明

ClickHouse 分散テーブルにデータを書き込む場合、url は分散テーブルをホストするクラスター内のノードの JDBC URL である必要があります。

userName

ClickHouse へのアクセスに使用するユーザー名。

String

はい

N/A

N/A

password

ClickHouse へのアクセスに使用するパスワード。

String

はい

N/A

N/A

tableName

ClickHouse テーブルの名前。

String

はい

N/A

N/A

maxRetryTimes

データ挿入に失敗した場合の最大リトライ回数。

Int

いいえ

3

N/A

batchSize

1 回のバッチで書き込むレコード数。

Int

いいえ

100

キャッシュ内のデータエントリ数が batchSize パラメータの値に達するか、待機時間が flushIntervalMs を超えると、システムはキャッシュから ClickHouse テーブルにデータを自動的に書き込みます。

flushIntervalMs

バッファをフラッシュする時間間隔 (ミリ秒単位)。

Long

いいえ

1000

単位はミリ秒です。

ignoreDelete

削除メッセージを無視するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true (デフォルト):削除メッセージを無視します。

  • false:削除メッセージを無視しません。

    このオプションを false に設定し、DDL でプライマリキーが定義されている場合、コネクタは ALTER ステートメントを使用して ClickHouse のデータを削除します。

説明

ignoreDeletefalse に設定した場合、partition 書き込みモードは使用できません。

shardWrite

ClickHouse 分散テーブルの場合、基になるローカルテーブルにデータを直接書き込むかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false (デフォルト):コネクタは分散テーブルにデータを書き込み、分散テーブルが対応するローカルテーブルにデータをルーティングします。 tableName パラメータは分散テーブルの名前である必要があります。

  • true:コネクタは分散テーブルをバイパスして、ローカルテーブルにデータを直接書き込みます。

    書き込みスループットを向上させるために推奨されます。

    • url で書き込み先のローカルテーブルを手動で指定する場合、tableName にはローカルテーブルの名前を指定する必要があります。 例:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • ノードを手動で指定したくない場合は、inferLocalTable パラメータを true に設定して、Flink にローカルテーブルノードを自動的に検出させることができます。 この場合、tableName には分散テーブルの名前を、url にはクラスター内のノードの JDBC URL を指定する必要があります。 例:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // クラスター内のノードの JDBC URL。
      'tableName' = 'distribute_table'

inferLocalTable

ClickHouse 分散テーブルに書き込むときに、基になるローカルテーブルを自動的に検出し、それらに直接書き込むかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false (デフォルト):分散テーブルに書き込み、url パラメータでノードを 1 つだけ指定した場合、コネクタはローカルテーブルの検出を試みません。 分散テーブルに書き込み、そのテーブルがデータをローカルテーブルにルーティングします。

  • true:Flink はローカルテーブルの検出を試み、それらに直接書き込みます。 この機能を使用するには、shardWritetrue に設定し、tableName には分散テーブルの名前を、url にはクラスター内のノードの JDBC URL を指定する必要があります。

説明

このパラメータは、非分散テーブルへの書き込み時には無視されます。

writeMode

分散テーブルのローカルテーブルにデータを書き込むためのストラテジー。

Enum

いいえ

default

有効な値:

  • default (デフォルト):URL で指定された最初のノードのローカルテーブルに常に書き込みます。

  • partition:シャーディングキーに基づいてデータを分散し、同じキーを持つレコードが同じローカルテーブルノードに書き込まれるようにします。

  • random:ランダムに選択されたローカルテーブルノードにデータを書き込みます。

  • balance:ラウンドロビンストラテジーを使用して、すべてのローカルテーブルノードにデータを均等に分散します。

説明

writeModepartition に設定した場合は、ignoreDeletetrue に設定する必要があります。

shardingKey

ローカルテーブルノード間でデータをパーティション分割するために使用されるキー。

String

いいえ

N/A

writeMode が 'partition' に設定されている場合、shardingKey パラメータは必須です。 コンマ (,) で区切られた複数のフィールドを含めることができます。

exactlyOnce

exactly-once セマンティクスを有効にするかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true:exactly-once セマンティクスを有効にします。

  • false (デフォルト):exactly-once セマンティクスを無効にします。

説明
  • exactly-once セマンティクスは、EMR 上の ClickHouse への書き込みでのみサポートされています。 シンクが EMR 上の ClickHouse クラスターである場合にのみ、このパラメータを true に設定してください。

  • パーティションストラテジーを使用した ClickHouse ローカルテーブルへの書き込みでは、exactly-once セマンティクスはサポートされていません。 したがって、exactlyOncetrue に設定されている場合、writeModepartition に設定することはできません。

データ型のマッピング

Flink 型

ClickHouse 型

BOOLEAN

UInt8 / Boolean

説明

ClickHouse v21.12 以降は Boolean 型をサポートしています。 それ以前のバージョンでは、Flink の BOOLEAN 型は ClickHouse の UInt8 型にマッピングされます。

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

String

BINARY

FixedString

VARBINARY

String

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

説明

ClickHouse コネクタは、Flink の TIME、MAP、MULTISET、および ROW 型をサポートしていません。

ClickHouse の Nested データ型を使用するには、Flink の ARRAY 型にマッピングする必要があります。 例:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

次のように型をマッピングします。

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
説明

VVR 6.0.6 より前のバージョンでは、公式の ClickHouse JDBC ドライバーで DateTime64 データを書き込むと、値が秒単位に切り捨てられて精度が失われる問題がありました。 その結果、TIMESTAMP(0) のような秒単位の精度の TIMESTAMP データしか正常に書き込めませんでした。 VVR 6.0.6 以降ではこの問題が解決され、正確な DateTime64 データの書き込みが可能になりました。

  • 例 1:シングルノードテーブルへの書き込み。

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 例 2:分散テーブルへの書き込み。

    ノード 192.XX.XX.1、192.XX.XX.2、192.XX.XX.3 にある local_table_test という名前の 3 つのローカルテーブルから、distributed_table_test という名前の分散テーブルが作成されていると仮定します。

    • Flink でローカルテーブルにデータを直接書き込み、キーによってデータをパーティション分割するには、次の DDL を使用します。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • URL で手動で指定する代わりに、Flink にローカルテーブルノードを自動的に検出させるには、次の DDL を使用します。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- クラスター内のノードの JDBC URL。
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', -- 分散テーブルの名前。
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', -- inferLocalTable を true に設定します。
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

よくある質問