このトピックでは、ClickHouseコネクタの使用方法について説明します。
背景情報
ClickHouseは、オンライン分析処理(OLAP)に使用される列指向データベース管理システムです。詳細については、「What Is ClickHouse?」をご参照ください。
次の表に、ClickHouseコネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | 結果テーブル |
実行モード | バッチモードとストリーミングモード |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
APIタイプ | SQL API |
結果テーブルのデータの更新または削除 | Flink結果テーブルの作成に使用されるDDLステートメントで主キーが指定され、ignoreDeleteパラメータがfalseに設定されている場合、結果テーブルのデータを更新または削除できます。ただし、データ処理のパフォーマンスが大幅に低下します。 |
機能
ClickHouse分散テーブルの場合、データはClickHouse分散テーブルに対応するClickHouseローカルテーブルに直接書き込まれます。
Alibaba Cloud E-MapReduce(EMR)にデプロイされているClickHouseクラスタの場合、exactly-onceセマンティクスを使用できます。
前提条件
ClickHouseテーブルが作成されていること。詳細については、「Create a New table」をご参照ください。
ClickHouseクラスタのホワイトリストが構成されていること。
Alibaba Cloud ApsaraDB for ClickHouseクラスタを使用する場合は、「ホワイトリストの構成」の手順に従ってホワイトリストを構成します。
Alibaba Cloud EMRにデプロイされているClickHouseクラスタを使用する場合は、「セキュリティグループの管理」の手順に従ってホワイトリストを構成します。
Elastic Compute Service(ECS)インスタンスでホストされているセルフマネージドClickHouseクラスタを使用する場合は、「セキュリティグループの概要」の手順に従ってホワイトリストを構成します。
その他の場合、ClickHouseクラスタがデプロイされているマシンのホワイトリストを構成して、Realtime Compute for Apache FlinkがデプロイされているマシンからClickHouseクラスタにアクセスできるようにします。
説明Realtime Compute for Apache Flink が属する vSwitch の CIDR ブロックを表示する方法の詳細については、「コンソール操作」をご参照ください。
制限事項
ClickHouseコネクタは、sink.parallelismパラメータをサポートしていません。
ClickHouse結果テーブルは、at-least-onceセマンティクスをサポートしています。
Ververica Runtime(VVR) 3.0.2以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouseコネクタをサポートしています。
VVR 3.0.3またはVVR 4.0.7、あるいはそれらの以降のマイナーバージョンを使用するRealtime Compute for Apache Flinkのみが、WITH句のignoreDeleteパラメータをサポートしています。
VVR 4.0.10以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouseのNESTEDデータ型をサポートしています。
VVR 4.0.11以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouse分散テーブルに対応するClickHouseローカルテーブルにデータを書き込むことができます。
VVR 4.0.11以降を使用するRealtime Compute for Apache Flinkのみが、Alibaba Cloud EMRにデプロイされているClickHouseクラスタのテーブルにデータを書き込むためのexactly-onceセマンティクスを提供します。EMR ClickHouseの機能変更により、EMR V3.45.1以降のマイナーバージョン、またはEMR V5.11.1以降のマイナーバージョンのClickHouseクラスタのテーブルにデータを書き込む際に、exactly-onceセマンティクスは使用できなくなりました。
VVR 8.0.7以降を使用するRealtime Compute for Apache Flinkでのみ、writeModeパラメータをbalanceに設定して、ClickHouseローカルテーブルに均等にデータを書き込むことができます。
ApsaraDB for ClickHouse Community 互換エディションクラスタのみが、ClickHouseローカルテーブルへのデータ書き込みを許可します。
構文
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);WITH句のパラメータ
パラメータ | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | 結果テーブルのタイプ。 | STRING | はい | デフォルト値なし | 値を clickhouse に設定します。 |
url | ClickHouseのJava Database Connectivity(JDBC) URL。 | STRING | はい | デフォルト値なし |
説明 ClickHouse分散テーブルにデータを書き込む場合は、ClickHouse分散テーブルが属するノードのJDBC URLを指定する必要があります。 |
userName | ClickHouseへのアクセスに使用するユーザー名。 | STRING | はい | デフォルト値なし | 該当なし。 |
password | ClickHouseへのアクセスに使用するパスワード。 | STRING | はい | デフォルト値なし | 該当なし。 |
tableName | ClickHouseテーブルの名前。 | STRING | はい | デフォルト値なし | 該当なし。 |
maxRetryTimes | 結果テーブルへのデータ書き込みの最大再試行回数。 | INT | いいえ | 3 | 該当なし。 |
batchSize | 一度に書き込むことができるデータレコードの数。 | INT | いいえ | 100 | キャッシュ内のデータレコード数が batchSize パラメータの値に達した場合、またはキャッシュがクリアされる間隔が flushIntervalMs パラメータの値よりも大きい場合、システムはキャッシュされたデータをClickHouseテーブルに自動的に書き込みます。 |
flushIntervalMs | キャッシュがクリアされる間隔。 | LONG | いいえ | 1000 | 単位:ミリ秒。 |
ignoreDelete | 削除メッセージを無視するかどうかを指定します。 | BOOLEAN | いいえ | true | 有効な値:
説明 ignoreDeleteパラメータをfalseに設定した場合、パーティション書き込みモードでは、ClickHouse分散テーブルに対応するClickHouseローカルテーブルにデータを書き込むことができません。この場合、writeModeパラメータをpartitionに設定することはできません。 |
shardWrite | 現在のテーブルがClickHouse分散テーブルの場合、データをClickHouseローカルテーブルに直接書き込むかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
inferLocalTable | ClickHouse分散テーブルにデータを書き込み、ClickHouseローカルテーブルに直接データを書き込む場合、ClickHouse分散テーブルに対応するClickHouseローカルテーブルに関する情報を自動的に推測するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
説明 ClickHouse非分散テーブルにデータを書き込む場合は、このパラメータを構成する必要はありません。 |
writeMode | ClickHouseローカルテーブルにデータを書き込むポリシー。 | ENUM | いいえ | default | 有効な値:
説明 writeModeパラメータをpartitionに設定する場合は、ignoreDeleteパラメータがtrueに設定されていることを確認してください。 |
shardingKey | 特定のノードにある同じClickHouseローカルテーブルにデータを書き込むためのキー。 | default | いいえ | デフォルト値なし | writeMode パラメータをpartitionに設定する場合は、 shardingKey パラメータを構成する必要があります。 shardingKeyパラメータの値には複数のフィールドを含めることができます。複数のフィールドはコンマ(,)で区切ります。 |
exactlyOnce | exactly-onceセマンティクスを使用するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
説明
|
データ型マッピング
Realtime Compute for Apache Flinkのデータ型 | ClickHouseのデータ型 |
BOOLEAN | UInt8 / Boolean 説明 ClickHouse V21.12以降はBOOLEANデータ型をサポートしています。使用しているClickHouseのバージョンがV21.12より前の場合、Realtime Compute for Apache FlinkのBOOLEANデータ型はClickHouseのUINT8データ型に対応します。 |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
BIGINT | UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
CHAR | FixedString |
VARCHAR | STRING |
BINARY | FixedString |
VARBINARY | STRING |
DATE | Date |
TIMESTAMP(0) | DateTime |
TIMESTAMP(x) | Datetime64(x) |
DECIMAL | DECIMAL |
ARRAY | ARRAY |
Nested |
ClickHouseは、Realtime Compute for Apache FlinkのTIME、MAP、MULTISET、ROWのデータ型をサポートしていません。
ClickHouseのNESTEDデータ型を使用するには、このデータ型をRealtime Compute for Apache FlinkのARRAYデータ型にマッピングする必要があります。サンプルコード:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);ClickHouseのNESTEDデータ型をRealtime Compute for Apache FlinkのARRAYデータ型にマッピングします。
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);ClickHouseのDATETIMEデータ型は秒単位まで正確にすることができ、DATETIME64データ型はナノ秒単位まで正確にすることができます。VVR 6.0.6より前のバージョンのVVRを使用するRealtime Compute for Apache Flinkの場合、ClickHouseが提供するJDBCドライバがDATETIME64データ型のデータを書き込むと、精度の低下が発生し、データは秒単位までしか正確になりません。したがって、Realtime Compute for Apache FlinkはTIMESTAMPデータ型のデータを秒単位でのみ書き込むことができます。値はTIMESTAMP(0)形式で表示されます。VVR 6.0.6以降を使用するRealtime Compute for Apache Flinkでは、精度の低下に関する問題は解決されています。Realtime Compute for Apache FlinkはDATETIME64データ型のデータを想定どおりに書き込むことができます。
例
例 1:データはノードのClickHouseローカルテーブルに書き込まれます。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;例 2:データはClickHouse分散テーブルに書き込まれます。
local_table_testという名前の3つのClickHouseローカルテーブルが、192.XX.XX.1、192.XX.XX.2、192.XX.XX.3ノードに存在します。ClickHouseローカルテーブルに基づいて、distributed_table_testという名前のClickHouse分散テーブルが作成されます。
同じキーを持つデータを特定のノードにある同じClickHouseローカルテーブルに直接書き込む場合は、次のステートメントを実行します。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;urlパラメータで、ClickHouseローカルテーブルにデータが書き込まれるノードを手動で指定したくない場合は、次のステートメントを実行して、システムがClickHouseローカルテーブルのノードを自動的に推測できるようにします。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- urlパラメータをClickHouse分散テーブルが属するノードのJDBC URLに設定します。 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', -- tableNameパラメータをClickHouse分散テーブルの名前に設定します。 'shardWrite' = 'true', 'inferLocalTable' = 'true', -- inferLocalTableパラメータをtrueに設定します。 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;