ClickHouse コネクタを使用すると、Flink SQL のデータをバッチモードおよびストリーミングモードの両方で ClickHouse の結果テーブルに書き込むことができます。標準テーブル、分散テーブル、およびローカルテーブルへの直接書き込みをサポートします。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
ClickHouse テーブルが存在すること。詳細については、「新しいテーブルの作成」をご参照ください。
ClickHouse クラスターのホワイトリストが設定されていること:
ApsaraDB for ClickHouse クラスター:「ホワイトリストの設定」
Alibaba Cloud E-MapReduce (EMR) ClickHouse クラスター:「セキュリティグループの管理」
Elastic Compute Service (ECS) 上の自社管理型 ClickHouse:「概要」
その他のデプロイメント: Realtime Compute for Apache Flink で使用される vSwitch の CIDR ブロックを、ClickHouse クラスターを実行しているマシンの許可リストに追加します。CIDR ブロックを確認するには、「ワークスペースおよび名前空間の管理と操作に関するよくある質問」をご参照ください。
コネクタの機能
| 項目 | 説明 |
|---|---|
| テーブルの種類 | 結果テーブル |
| 実行モード | バッチモードおよびストリーミングモード |
| データフォーマット | 該当なし |
| メトリクス | numRecordsOut、numRecordsOutPerSecond、currentSendTime。詳細については、「メトリック」をご参照ください。 |
| API の種類 | SQL API |
| データの更新および削除 | DDL 文でプライマリキーが定義されており、ignoreDelete が false に設定されている場合にサポートされます。この機能を有効化すると、書き込みスループットが大幅に低下します。 |
制限事項
sink.parallelismパラメーターはサポートされていません。結果テーブルは、デフォルトで at-least-once セマンティクスをサポートします。
Ververica Runtime (VVR) 3.0.2 以降が必要です。
ignoreDeleteパラメーターは、VVR 3.0.3、VVR 4.0.7、またはそれ以降のマイナーバージョンを必要とします。NESTED データの型は、VVR 4.0.10 以降が必要です。
分散テーブルに対応する ClickHouse のローカルテーブルへの直接書き込みには、VVR 4.0.11 以降が必要です。これは、ApsaraDB for ClickHouse Community 互換エディションクラスターでのみサポートされます。
EMR ClickHouse クラスターにおける 1 回限りのセマンティクスは、VVR 4.0.11 以降が必要です。ただし、EMR ClickHouse の機能変更により、EMR V3.45.1 クラスターおよび EMR V5.11.1 より後のマイナーバージョンでは利用できなくなりました。
balance書き込みモードは、VVR 8.0.7 以降が必要です。
構文
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);WITH 句のパラメーター
必須パラメーター
| パラメーター | データの型 | 説明 |
|---|---|---|
connector | STRING | コネクタの種類。値を clickhouse に設定します。 |
url | STRING | ClickHouse の JDBC URL。形式は jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName> です。データベース名を省略した場合、default データベースが使用されます。分散テーブルに書き込む場合は、分散テーブルをホストするノードの JDBC URL を指定します。手動でノードを指定してローカルテーブルに直接書き込む場合は、カンマ区切りで複数のノードアドレスを列挙します(下記の例を参照)。 |
userName | STRING | ClickHouse のユーザー名。 |
password | STRING | ClickHouse のパスワード。 |
tableName | STRING | ClickHouse テーブルの名前。ローカルテーブルに直接書き込む場合(shardWrite=true)、inferLocalTable=true でない限り、この値をローカルテーブル名に設定します。この場合、分散テーブル名を指定します。 |
オプションパラメーター
| パラメーター | データの型 | デフォルト値 | 説明 |
|---|---|---|---|
maxRetryTimes | INT | 3 | 書き込み失敗時の最大再試行回数。 |
batchSize | INT | 100 | フラッシュ前にバッファーに保持されるレコード数。バッファー内のレコード数が batchSize に達した場合、または flushIntervalMs の時間間隔が経過した場合、いずれか早いタイミングでフラッシュが実行されます。 |
flushIntervalMs | LONG | 1000 | フラッシュ間の最大間隔(ミリ秒単位)。 |
ignoreDelete | BOOLEAN | true | DELETE メッセージを破棄するかどうかを指定します。この値を false に設定し、かつプライマリキーが定義されている場合、コネクタは一致する行を削除する ALTER 文を実行します。この値を false に設定すると、スループットが大幅に低下し、writeMode=partition との併用はできません。 |
shardWrite | BOOLEAN | false | 分散テーブルをバイパスして、基盤となるローカルテーブルに直接書き込むかどうかを指定します。true に設定すると、分散テーブルへの書き込みスループットが向上します。false の場合、tableName を分散テーブル名に設定します。 |
inferLocalTable | BOOLEAN | false | shardWrite=true の場合、分散テーブルのメタデータから Flink が自動的にローカルテーブルのノードを検出するかどうかを指定します。この機能を利用するには、shardWrite=true、tableName が分散テーブル名に設定され、url が分散テーブルをホストするノードを指す必要があります。非分散テーブルでは不要です。 |
writeMode | ENUM | default | ローカルテーブルへの書き込みディストリビューション戦略。ガイドについては、「書き込みモードの選択」をご参照ください。有効な値: default、partition、random、balance。 |
shardingKey | STRING | なし | writeMode=partition の場合に、レコードをルーティングするために使用するフィールド(またはカンマ区切りの複数フィールド)。同じキー値を持つレコードは、同じローカルテーブルノードに送信されます。 |
exactlyOnce | BOOLEAN | false | 1 回限りのセマンティクスを使用するかどうかを指定します。Alibaba Cloud EMR にデプロイされた ClickHouse クラスターでのみサポートされます。writeMode=partition とは併用できません。 |
書き込みモードの選択
writeMode パラメーターは、shardWrite=true で ClickHouse の分散テーブルに書き込む際、レコードをローカルテーブルノード間でどのように分散するかを制御します。
| 書き込みモード | 配布戦略 | 使用するケース |
|---|---|---|
default | すべての書き込みが最初のノードに送信される | シングルノードクラスターへの書き込み、またはテスト時 |
partition | 同じ shardingKey を持つレコードは常に同じノードに送信される | 関連するレコードを 1 つのノード上に配置する必要がある場合 |
random | 各レコードがランダムに選択されたノードに書き込まれる | キーによるアフィニティを必要とせず、単純な負荷分散を希望する場合 |
balance | レコードがラウンドロビン方式でノード間で分散される | キーによるアフィニティを必要とせず、均等な分散を希望する場合(VVR 8.0.7 以降が必要) |
writeMode=partitionはignoreDelete=trueを必要とし、exactlyOnce=trueとは併用できません。
データの型のマッピング
| Flink の型 | ClickHouse の型 | 対応状況 | 備考 |
|---|---|---|---|
| BOOLEAN | UInt8 / Boolean | はい | ClickHouse V21.12 以降ではネイティブの BOOLEAN 型がサポートされています。それ以前のバージョンでは UInt8 にマッピングされます。 |
| TINYINT | Int8 | はい | |
| SMALLINT | Int16 | はい | |
| INTEGER | Int32 | はい | |
| BIGINT | Int64 | はい | |
| BIGINT | UInt32 | はい | |
| FLOAT | Float32 | はい | |
| DOUBLE | Float64 | はい | |
| CHAR | FixedString | はい | |
| VARCHAR | STRING | はい | |
| BINARY | FixedString | はい | |
| VARBINARY | STRING | はい | |
| DATE | Date | はい | |
| TIMESTAMP(0) | DateTime | はい | 秒単位の精度。 |
| TIMESTAMP(x) | DateTime64(x) | はい | VVR 6.0.6 より前のバージョンでは、JDBC ドライバーの制限により、精度が秒単位に丸められます。VVR 6.0.6 以降では、DateTime64 を完全な精度で書き込みます。 |
| DECIMAL | DECIMAL | はい | |
| ARRAY | ARRAY | はい | |
| ARRAY(フィールドごと) | Nested | はい | ClickHouse の NESTED カラムの各サブフィールドを、Flink の個別の ARRAY カラムにマップします。下記の例を参照してください。 |
| TIME | — | なし | サポートされていません。 |
| MAP | — | 非対応 | サポートされていません。 |
| MULTISET | — | いいえ | サポートされていません。 |
| ROW | — | いいえ | サポートされていません。 |
NESTED カラムの使用方法
ClickHouse の NESTED カラムに書き込むには、ColumnName.FieldName の命名規則を使用して、各サブフィールドを Flink の個別の ARRAY カラムにマップします。
ClickHouse のテーブル定義:
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
);対応する Flink DDL:
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);サンプル
単一の ClickHouse テーブルへの書き込み
このサンプルでは、組み込みの datagen コネクタを使用してレコードを生成し、それを ClickHouse テーブルに書き込みます。
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;分散テーブルのローカルテーブルへの直接書き込み
ノード 192.XX.XX.1、192.XX.XX.2、および 192.XX.XX.3 上に、それぞれ local_table_test という名前のローカルテーブルが 3 つ存在し、これらを基盤として distributed_table_test という名前の分散テーブルが構築されていると仮定します。
ノードを手動で指定
url にすべてのローカルテーブルノードのアドレスを列挙し、tableName をローカルテーブル名に設定します。shardingKey の値が同じレコードは、同じノードにルーティングされます。
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'local_table_test',
'shardWrite' = 'true',
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;Flink によるローカルテーブルノードの自動検出
inferLocalTable=true を設定し、url を分散テーブルをホストする任意のノードに指定します。Flink は system.clusters をクエリして、ローカルテーブルノードを自動的に検出します。
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分散テーブルをホストする任意のノードの URL
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'distributed_table_test', -- 分散テーブルの名前
'shardWrite' = 'true',
'inferLocalTable' = 'true', -- ローカルテーブルノードを自動検出
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;よくある質問
ApsaraDB for ClickHouse の結果テーブルから更新済みデータを撤回(削除)できますか?
ignoreDelete=false を設定し、DDL 文でプライマリキーを定義します。その後、コネクタは撤回メッセージを受信した際に ALTER DELETE 文を発行します。ただし、この機能を有効化すると、書き込みスループットが大幅に低下し、writeMode=partition や exactlyOnce=true との併用はできません。詳細については、「ApsaraDB for ClickHouse の結果テーブルから更新済みデータを撤回できますか?」をご参照ください。
書き込んだデータは、ClickHouse でいつから表示可能になりますか?
ClickHouse は非同期のパートマージを使用するため、書き込み完了直後にはデータが即座に表示されない場合があります。遅延時間は、ClickHouse クラスターで設定されたマージ間隔によって異なります。詳細については、「ClickHouse コンソールで ClickHouse sink テーブルに書き込まれたデータをいつ確認できますか?」をご参照ください。