このトピックでは、ClickHouse コネクタを使用して ClickHouse にデータを書き込む方法について説明します。
背景情報
ClickHouse は、オンライン分析処理 (OLAP) 向けの列指向データベース管理システムです。 詳細については、「ClickHouse とは」をご参照ください。
次の表に、ClickHouse コネクタの機能を示します。
|
カテゴリ |
詳細 |
|
サポートタイプ |
結果テーブルのみ |
|
実行モード |
バッチおよびストリーミングモード |
|
データ形式 |
該当なし |
|
コネクタ固有のメトリクス |
説明
これらのメトリクスの詳細については、「メトリクスのモニタリング」をご参照ください。 |
|
API タイプ |
SQL |
|
結果テーブル内のデータの更新または削除のサポート |
コネクタは、Flink 結果テーブルの DDL でプライマリキーを指定し、 |
機能
-
ClickHouse 分散テーブルのローカルテーブルにデータを直接書き込みます。
-
Alibaba Cloud E-MapReduce (EMR) 上の ClickHouse に書き込む際に、exactly-once セマンティクスを実現します。
前提条件
-
ClickHouse テーブルを作成します。 詳細については、「新しいテーブルの作成」をご参照ください。
-
ホワイトリストを設定します。
-
ApsaraDB for ClickHouse を使用する場合は、「ホワイトリストの設定」をご参照ください。
-
Alibaba Cloud E-MapReduce (EMR) 上の ClickHouse を使用する場合は、「セキュリティグループの管理」をご参照ください。
-
ECS インスタンス上のセルフマネージド ClickHouse クラスターを使用する場合は、「セキュリティグループの概要」をご参照ください。
-
その他のすべての設定では、ClickHouse がデプロイされているマシンでホワイトリストを設定し、Realtime Compute for Apache Flink デプロイメントからのアクセスを許可する必要があります。
説明Realtime Compute for Apache Flink の vSwitch ネットワークセグメントを表示するには、「ホワイトリストの設定方法」をご参照ください。
-
制限事項
-
sink.parallelismパラメータはサポートされていません。 -
デフォルトでは、ClickHouse シンクは at-least-once セマンティクスを提供します。
-
ClickHouse コネクタは、Ververica Runtime (VVR) 3.0.2 以降でのみサポートされています。
-
ignoreDeleteオプションは、VVR 3.0.3、VVR 4.0.7 以降のバージョンでのみサポートされています。 -
ClickHouse Nested データ型は、VVR 4.0.10 以降でのみサポートされています。
-
分散テーブルのローカルテーブルへの直接書き込みは、VVR 4.0.11 以降でのみサポートされています。
-
EMR 上の ClickHouse への書き込みにおける exactly-once セマンティクスは、VVR 4.0.11 以降で利用可能です。ただし、製品機能の変更により、このセマンティクスは、ClickHouse on EMR のバージョンが v3.45.1 または v5.11.1 より後の場合は利用できません。
-
ローカルテーブルノード間でデータを均等に分散する
balance書き込みモードは、VVR 8.0.7 以降でのみ利用可能です。 -
ClickHouse ローカルテーブルへの書き込みは、ApsaraDB for ClickHouse Community-compatible Edition でのみサポートされています。
-
ClickHouse カタログを登録する際、デフォルトのデータベース名にハイフン (
-) が含まれていると、JDBC URL の検証が失敗します。
構文
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH パラメータ
|
パラメータ |
説明 |
型 |
必須 |
デフォルト |
備考 |
|
connector |
結果テーブルのタイプ。 |
String |
はい |
N/A |
値を clickhouse に設定します。 |
|
url |
ClickHouse クラスターの JDBC URL。 |
String |
はい |
N/A |
URL 形式は 説明
ClickHouse 分散テーブルにデータを書き込む場合、 |
|
userName |
ClickHouse へのアクセスに使用するユーザー名。 |
String |
はい |
N/A |
N/A |
|
password |
ClickHouse へのアクセスに使用するパスワード。 |
String |
はい |
N/A |
N/A |
|
tableName |
ClickHouse テーブルの名前。 |
String |
はい |
N/A |
N/A |
|
maxRetryTimes |
データ挿入に失敗した場合の最大リトライ回数。 |
Int |
いいえ |
3 |
N/A |
|
batchSize |
1 回のバッチで書き込むレコード数。 |
Int |
いいえ |
100 |
キャッシュ内のデータエントリ数が batchSize パラメータの値に達するか、待機時間が flushIntervalMs を超えると、システムはキャッシュから ClickHouse テーブルにデータを自動的に書き込みます。 |
|
flushIntervalMs |
バッファをフラッシュする時間間隔 (ミリ秒単位)。 |
Long |
いいえ |
1000 |
単位はミリ秒です。 |
|
ignoreDelete |
削除メッセージを無視するかどうかを指定します。 |
Boolean |
いいえ |
true |
有効な値:
説明
|
|
shardWrite |
ClickHouse 分散テーブルの場合、基になるローカルテーブルにデータを直接書き込むかどうかを指定します。 |
Boolean |
いいえ |
false |
有効な値:
|
|
inferLocalTable |
ClickHouse 分散テーブルに書き込むときに、基になるローカルテーブルを自動的に検出し、それらに直接書き込むかどうかを指定します。 |
Boolean |
いいえ |
false |
有効な値:
説明
このパラメータは、非分散テーブルへの書き込み時には無視されます。 |
|
writeMode |
分散テーブルのローカルテーブルにデータを書き込むためのストラテジー。 |
Enum |
いいえ |
default |
有効な値:
説明
|
|
shardingKey |
ローカルテーブルノード間でデータをパーティション分割するために使用されるキー。 |
String |
いいえ |
N/A |
writeMode が 'partition' に設定されている場合、shardingKey パラメータは必須です。 コンマ (,) で区切られた複数のフィールドを含めることができます。 |
|
exactlyOnce |
exactly-once セマンティクスを有効にするかどうかを指定します。 |
Boolean |
いいえ |
false |
有効な値:
説明
|
データ型のマッピング
|
Flink 型 |
ClickHouse 型 |
|
BOOLEAN |
UInt8 / Boolean 説明
ClickHouse v21.12 以降は Boolean 型をサポートしています。 それ以前のバージョンでは、Flink の BOOLEAN 型は ClickHouse の UInt8 型にマッピングされます。 |
|
TINYINT |
Int8 |
|
SMALLINT |
Int16 |
|
INTEGER |
Int32 |
|
BIGINT |
Int64 |
|
BIGINT |
UInt32 |
|
FLOAT |
Float32 |
|
DOUBLE |
Float64 |
|
CHAR |
FixedString |
|
VARCHAR |
String |
|
BINARY |
FixedString |
|
VARBINARY |
String |
|
DATE |
Date |
|
TIMESTAMP(0) |
DateTime |
|
TIMESTAMP(x) |
Datetime64(x) |
|
DECIMAL |
DECIMAL |
|
ARRAY |
ARRAY |
|
Nested |
ClickHouse コネクタは、Flink の TIME、MAP、MULTISET、および ROW 型をサポートしていません。
ClickHouse の Nested データ型を使用するには、Flink の ARRAY 型にマッピングする必要があります。 例:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
次のように型をマッピングします。
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);
VVR 6.0.6 より前のバージョンでは、公式の ClickHouse JDBC ドライバーで DateTime64 データを書き込むと、値が秒単位に切り捨てられて精度が失われる問題がありました。 その結果、TIMESTAMP(0) のような秒単位の精度の TIMESTAMP データしか正常に書き込めませんでした。 VVR 6.0.6 以降ではこの問題が解決され、正確な DateTime64 データの書き込みが可能になりました。
例
-
例 1:シングルノードテーブルへの書き込み。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source; -
例 2:分散テーブルへの書き込み。
ノード 192.XX.XX.1、192.XX.XX.2、192.XX.XX.3 にある local_table_test という名前の 3 つのローカルテーブルから、distributed_table_test という名前の分散テーブルが作成されていると仮定します。
-
Flink でローカルテーブルにデータを直接書き込み、キーによってデータをパーティション分割するには、次の DDL を使用します。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source; -
URL で手動で指定する代わりに、Flink にローカルテーブルノードを自動的に検出させるには、次の DDL を使用します。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- クラスター内のノードの JDBC URL。 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', -- 分散テーブルの名前。 'shardWrite' = 'true', 'inferLocalTable' = 'true', -- inferLocalTable を true に設定します。 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
-