すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ClickHouse コネクタ

最終更新日:Mar 26, 2026

ClickHouse コネクタを使用すると、Flink SQL のデータをバッチモードおよびストリーミングモードの両方で ClickHouse の結果テーブルに書き込むことができます。標準テーブル、分散テーブル、およびローカルテーブルへの直接書き込みをサポートします。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

コネクタの機能

項目説明
テーブルの種類結果テーブル
実行モードバッチモードおよびストリーミングモード
データフォーマット該当なし
メトリクスnumRecordsOutnumRecordsOutPerSecondcurrentSendTime。詳細については、「メトリック」をご参照ください。
API の種類SQL API
データの更新および削除DDL 文でプライマリキーが定義されており、ignoreDeletefalse に設定されている場合にサポートされます。この機能を有効化すると、書き込みスループットが大幅に低下します。

制限事項

  • sink.parallelism パラメーターはサポートされていません。

  • 結果テーブルは、デフォルトで at-least-once セマンティクスをサポートします。

  • Ververica Runtime (VVR) 3.0.2 以降が必要です。

  • ignoreDelete パラメーターは、VVR 3.0.3、VVR 4.0.7、またはそれ以降のマイナーバージョンを必要とします。

  • NESTED データの型は、VVR 4.0.10 以降が必要です。

  • 分散テーブルに対応する ClickHouse のローカルテーブルへの直接書き込みには、VVR 4.0.11 以降が必要です。これは、ApsaraDB for ClickHouse Community 互換エディションクラスターでのみサポートされます。

  • EMR ClickHouse クラスターにおける 1 回限りのセマンティクスは、VVR 4.0.11 以降が必要です。ただし、EMR ClickHouse の機能変更により、EMR V3.45.1 クラスターおよび EMR V5.11.1 より後のマイナーバージョンでは利用できなくなりました。

  • balance 書き込みモードは、VVR 8.0.7 以降が必要です。

構文

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH 句のパラメーター

必須パラメーター

パラメーターデータの型説明
connectorSTRINGコネクタの種類。値を clickhouse に設定します。
urlSTRINGClickHouse の JDBC URL。形式は jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName> です。データベース名を省略した場合、default データベースが使用されます。分散テーブルに書き込む場合は、分散テーブルをホストするノードの JDBC URL を指定します。手動でノードを指定してローカルテーブルに直接書き込む場合は、カンマ区切りで複数のノードアドレスを列挙します(下記の例を参照)。
userNameSTRINGClickHouse のユーザー名。
passwordSTRINGClickHouse のパスワード。
tableNameSTRINGClickHouse テーブルの名前。ローカルテーブルに直接書き込む場合(shardWrite=true)、inferLocalTable=true でない限り、この値をローカルテーブル名に設定します。この場合、分散テーブル名を指定します。

オプションパラメーター

パラメーターデータの型デフォルト値説明
maxRetryTimesINT3書き込み失敗時の最大再試行回数。
batchSizeINT100フラッシュ前にバッファーに保持されるレコード数。バッファー内のレコード数が batchSize に達した場合、または flushIntervalMs の時間間隔が経過した場合、いずれか早いタイミングでフラッシュが実行されます。
flushIntervalMsLONG1000フラッシュ間の最大間隔(ミリ秒単位)。
ignoreDeleteBOOLEANtrueDELETE メッセージを破棄するかどうかを指定します。この値を false に設定し、かつプライマリキーが定義されている場合、コネクタは一致する行を削除する ALTER 文を実行します。この値を false に設定すると、スループットが大幅に低下し、writeMode=partition との併用はできません。
shardWriteBOOLEANfalse分散テーブルをバイパスして、基盤となるローカルテーブルに直接書き込むかどうかを指定します。true に設定すると、分散テーブルへの書き込みスループットが向上します。false の場合、tableName を分散テーブル名に設定します。
inferLocalTableBOOLEANfalseshardWrite=true の場合、分散テーブルのメタデータから Flink が自動的にローカルテーブルのノードを検出するかどうかを指定します。この機能を利用するには、shardWrite=truetableName が分散テーブル名に設定され、url が分散テーブルをホストするノードを指す必要があります。非分散テーブルでは不要です。
writeModeENUMdefaultローカルテーブルへの書き込みディストリビューション戦略。ガイドについては、「書き込みモードの選択」をご参照ください。有効な値: defaultpartitionrandombalance
shardingKeySTRINGなしwriteMode=partition の場合に、レコードをルーティングするために使用するフィールド(またはカンマ区切りの複数フィールド)。同じキー値を持つレコードは、同じローカルテーブルノードに送信されます。
exactlyOnceBOOLEANfalse1 回限りのセマンティクスを使用するかどうかを指定します。Alibaba Cloud EMR にデプロイされた ClickHouse クラスターでのみサポートされます。writeMode=partition とは併用できません。

書き込みモードの選択

writeMode パラメーターは、shardWrite=true で ClickHouse の分散テーブルに書き込む際、レコードをローカルテーブルノード間でどのように分散するかを制御します。

書き込みモード配布戦略使用するケース
defaultすべての書き込みが最初のノードに送信されるシングルノードクラスターへの書き込み、またはテスト時
partition同じ shardingKey を持つレコードは常に同じノードに送信される関連するレコードを 1 つのノード上に配置する必要がある場合
random各レコードがランダムに選択されたノードに書き込まれるキーによるアフィニティを必要とせず、単純な負荷分散を希望する場合
balanceレコードがラウンドロビン方式でノード間で分散されるキーによるアフィニティを必要とせず、均等な分散を希望する場合(VVR 8.0.7 以降が必要)
writeMode=partitionignoreDelete=true を必要とし、exactlyOnce=true とは併用できません。

データの型のマッピング

Flink の型ClickHouse の型対応状況備考
BOOLEANUInt8 / BooleanはいClickHouse V21.12 以降ではネイティブの BOOLEAN 型がサポートされています。それ以前のバージョンでは UInt8 にマッピングされます。
TINYINTInt8はい
SMALLINTInt16はい
INTEGERInt32はい
BIGINTInt64はい
BIGINTUInt32はい
FLOATFloat32はい
DOUBLEFloat64はい
CHARFixedStringはい
VARCHARSTRINGはい
BINARYFixedStringはい
VARBINARYSTRINGはい
DATEDateはい
TIMESTAMP(0)DateTimeはい秒単位の精度。
TIMESTAMP(x)DateTime64(x)はいVVR 6.0.6 より前のバージョンでは、JDBC ドライバーの制限により、精度が秒単位に丸められます。VVR 6.0.6 以降では、DateTime64 を完全な精度で書き込みます。
DECIMALDECIMALはい
ARRAYARRAYはい
ARRAY(フィールドごと)NestedはいClickHouse の NESTED カラムの各サブフィールドを、Flink の個別の ARRAY カラムにマップします。下記の例を参照してください。
TIMEなしサポートされていません。
MAP非対応サポートされていません。
MULTISETいいえサポートされていません。
ROWいいえサポートされていません。

NESTED カラムの使用方法

ClickHouse の NESTED カラムに書き込むには、ColumnName.FieldName の命名規則を使用して、各サブフィールドを Flink の個別の ARRAY カラムにマップします。

ClickHouse のテーブル定義:

CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
);

対応する Flink DDL:

CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID`      ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);

サンプル

単一の ClickHouse テーブルへの書き込み

このサンプルでは、組み込みの datagen コネクタを使用してレコードを生成し、それを ClickHouse テーブルに書き込みます。

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url'       = '<yourUrl>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

分散テーブルのローカルテーブルへの直接書き込み

ノード 192.XX.XX.1192.XX.XX.2、および 192.XX.XX.3 上に、それぞれ local_table_test という名前のローカルテーブルが 3 つ存在し、これらを基盤として distributed_table_test という名前の分散テーブルが構築されていると仮定します。

ノードを手動で指定

url にすべてのローカルテーブルノードのアドレスを列挙し、tableName をローカルテーブル名に設定します。shardingKey の値が同じレコードは、同じノードにルーティングされます。

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'  = 'clickhouse',
  'url'        = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  'tableName'  = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode'  = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

Flink によるローカルテーブルノードの自動検出

inferLocalTable=true を設定し、url を分散テーブルをホストする任意のノードに指定します。Flink は system.clusters をクエリして、ローカルテーブルノードを自動的に検出します。

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'clickhouse',
  'url'            = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分散テーブルをホストする任意のノードの URL
  'userName'       = '<yourUsername>',
  'password'       = '<yourPassword>',
  'tableName'      = 'distributed_table_test',                    -- 分散テーブルの名前
  'shardWrite'     = 'true',
  'inferLocalTable' = 'true',                                     -- ローカルテーブルノードを自動検出
  'writeMode'      = 'partition',
  'shardingKey'    = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

よくある質問

ApsaraDB for ClickHouse の結果テーブルから更新済みデータを撤回(削除)できますか?

ignoreDelete=false を設定し、DDL 文でプライマリキーを定義します。その後、コネクタは撤回メッセージを受信した際に ALTER DELETE 文を発行します。ただし、この機能を有効化すると、書き込みスループットが大幅に低下し、writeMode=partitionexactlyOnce=true との併用はできません。詳細については、「ApsaraDB for ClickHouse の結果テーブルから更新済みデータを撤回できますか?」をご参照ください。

書き込んだデータは、ClickHouse でいつから表示可能になりますか?

ClickHouse は非同期のパートマージを使用するため、書き込み完了直後にはデータが即座に表示されない場合があります。遅延時間は、ClickHouse クラスターで設定されたマージ間隔によって異なります。詳細については、「ClickHouse コンソールで ClickHouse sink テーブルに書き込まれたデータをいつ確認できますか?」をご参照ください。