すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ClickHouseコネクタ

最終更新日:Jan 07, 2025

このトピックでは、ClickHouseコネクタの使用方法について説明します。

背景情報

ClickHouseは、オンライン分析処理(OLAP)に使用される列指向データベース管理システムです。詳細については、「What Is ClickHouse?」をご参照ください。

次の表に、ClickHouseコネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

結果テーブル

実行モード

バッチモードとストリーミングモード

データ形式

該当なし

メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

説明

メトリックの詳細については、「メトリック」をご参照ください。

APIタイプ

SQL API

結果テーブルのデータの更新または削除

Flink結果テーブルの作成に使用されるDDLステートメントで主キーが指定され、ignoreDeleteパラメータがfalseに設定されている場合、結果テーブルのデータを更新または削除できます。ただし、データ処理のパフォーマンスが大幅に低下します。

機能

  • ClickHouse分散テーブルの場合、データはClickHouse分散テーブルに対応するClickHouseローカルテーブルに直接書き込まれます。

  • Alibaba Cloud E-MapReduce(EMR)にデプロイされているClickHouseクラスタの場合、exactly-onceセマンティクスを使用できます。

前提条件

  • ClickHouseテーブルが作成されていること。詳細については、「Create a New table」をご参照ください。

  • ClickHouseクラスタのホワイトリストが構成されていること。

    • Alibaba Cloud ApsaraDB for ClickHouseクラスタを使用する場合は、「ホワイトリストの構成」の手順に従ってホワイトリストを構成します。

    • Alibaba Cloud EMRにデプロイされているClickHouseクラスタを使用する場合は、「セキュリティグループの管理」の手順に従ってホワイトリストを構成します。

    • Elastic Compute Service(ECS)インスタンスでホストされているセルフマネージドClickHouseクラスタを使用する場合は、「セキュリティグループの概要」の手順に従ってホワイトリストを構成します。

    • その他の場合、ClickHouseクラスタがデプロイされているマシンのホワイトリストを構成して、Realtime Compute for Apache FlinkがデプロイされているマシンからClickHouseクラスタにアクセスできるようにします。

    説明

    Realtime Compute for Apache Flink が属する vSwitch の CIDR ブロックを表示する方法の詳細については、「コンソール操作」をご参照ください。

制限事項

  • ClickHouseコネクタは、sink.parallelismパラメータをサポートしていません。

  • ClickHouse結果テーブルは、at-least-onceセマンティクスをサポートしています。

  • Ververica Runtime(VVR) 3.0.2以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouseコネクタをサポートしています。

  • VVR 3.0.3またはVVR 4.0.7、あるいはそれらの以降のマイナーバージョンを使用するRealtime Compute for Apache Flinkのみが、WITH句のignoreDeleteパラメータをサポートしています。

  • VVR 4.0.10以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouseのNESTEDデータ型をサポートしています。

  • VVR 4.0.11以降を使用するRealtime Compute for Apache Flinkのみが、ClickHouse分散テーブルに対応するClickHouseローカルテーブルにデータを書き込むことができます。

  • VVR 4.0.11以降を使用するRealtime Compute for Apache Flinkのみが、Alibaba Cloud EMRにデプロイされているClickHouseクラスタのテーブルにデータを書き込むためのexactly-onceセマンティクスを提供します。EMR ClickHouseの機能変更により、EMR V3.45.1以降のマイナーバージョン、またはEMR V5.11.1以降のマイナーバージョンのClickHouseクラスタのテーブルにデータを書き込む際に、exactly-onceセマンティクスは使用できなくなりました。

  • VVR 8.0.7以降を使用するRealtime Compute for Apache Flinkでのみ、writeModeパラメータをbalanceに設定して、ClickHouseローカルテーブルに均等にデータを書き込むことができます。

  • ApsaraDB for ClickHouse Community 互換エディションクラスタのみが、ClickHouseローカルテーブルへのデータ書き込みを許可します。

構文

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH句のパラメータ

パラメータ

説明

データ型

必須

デフォルト値

備考

connector

結果テーブルのタイプ。

STRING

はい

デフォルト値なし

値を clickhouse に設定します。

url

ClickHouseのJava Database Connectivity(JDBC) URL。

STRING

はい

デフォルト値なし

jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName> 形式で URL を指定する必要があります。 ClickHouse ローカルテーブルに直接データを書き込む場合は、select * from system.clusters ステートメントを実行して、ClickHouse ローカルテーブルにデータが書き込まれるノードの IP アドレスを取得できます。 データベース名を指定しない場合は、default という名前のデータベースが使用されます。

説明

ClickHouse分散テーブルにデータを書き込む場合は、ClickHouse分散テーブルが属するノードのJDBC URLを指定する必要があります。

userName

ClickHouseへのアクセスに使用するユーザー名。

STRING

はい

デフォルト値なし

該当なし。

password

ClickHouseへのアクセスに使用するパスワード。

STRING

はい

デフォルト値なし

該当なし。

tableName

ClickHouseテーブルの名前。

STRING

はい

デフォルト値なし

該当なし。

maxRetryTimes

結果テーブルへのデータ書き込みの最大再試行回数。

INT

いいえ

3

該当なし。

batchSize

一度に書き込むことができるデータレコードの数。

INT

いいえ

100

キャッシュ内のデータレコード数が batchSize パラメータの値に達した場合、またはキャッシュがクリアされる間隔が flushIntervalMs パラメータの値よりも大きい場合、システムはキャッシュされたデータをClickHouseテーブルに自動的に書き込みます。

flushIntervalMs

キャッシュがクリアされる間隔。

LONG

いいえ

1000

単位:ミリ秒。

ignoreDelete

削除メッセージを無視するかどうかを指定します。

BOOLEAN

いいえ

true

有効な値:

  • true:削除メッセージは無視されます。これはデフォルト値です。

  • false:削除メッセージは無視されません。

    このパラメータをfalseに設定し、DDLステートメントで主キーを指定すると、システムはALTERステートメントを実行してClickHouseテーブルからデータを削除します。

説明

ignoreDeleteパラメータをfalseに設定した場合、パーティション書き込みモードでは、ClickHouse分散テーブルに対応するClickHouseローカルテーブルにデータを書き込むことができません。この場合、writeModeパラメータをpartitionに設定することはできません。

shardWrite

現在のテーブルがClickHouse分散テーブルの場合、データをClickHouseローカルテーブルに直接書き込むかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • false:システムはデータをClickHouse分散テーブルに書き込み、次にClickHouse分散テーブルに対応するClickHouseローカルテーブルに書き込みます。これはデフォルト値です。この場合、shardWriteパラメータをfalseに設定する場合は、tableNameパラメータをClickHouse分散テーブルの名前に設定する必要があります。

  • true:システムはClickHouse分散テーブルをスキップし、ClickHouse分散テーブルに対応するClickHouseローカルテーブルに直接データを書き込みます。

    ClickHouse分散テーブルにデータを書き込むためのスループットを向上させる場合は、このパラメータをtrueに設定することをお勧めします。

    • urlパラメータで、ClickHouseローカルテーブルにデータが書き込まれるノードを手動で指定する場合は、tableNameパラメータをClickHouseローカルテーブルの名前に設定する必要があります。例:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • urlパラメータで、ClickHouseローカルテーブルにデータが書き込まれるノードを手動で指定したくない場合は、shardWriteパラメータとともにinferLocalTableパラメータを構成して、Realtime Compute for Apache FlinkがClickHouseローカルテーブルのノードを自動的に推測できるようにすることができます。この場合、tableNameパラメータをClickHouse分散テーブルの名前に設定し、urlパラメータをClickHouse分散テーブルが属するノードのJDBC URLに設定する必要があります。例:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // urlパラメータをClickHouse分散テーブルが属するノードのJDBC URLに設定します。
      'tableName' = 'distribute_table'

inferLocalTable

ClickHouse分散テーブルにデータを書き込み、ClickHouseローカルテーブルに直接データを書き込む場合、ClickHouse分散テーブルに対応するClickHouseローカルテーブルに関する情報を自動的に推測するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • false:ClickHouse分散テーブルにデータを書き込み、urlパラメータに1つのノードのみを指定した場合、システムはClickHouse分散テーブルに対応するClickHouseローカルテーブルに関する情報を自動的に推測しません。システムはデータをClickHouse分散テーブルに書き込み、次にClickHouseローカルテーブルに書き込みます。これはデフォルト値です。

  • true:システムはClickHouse分散テーブルに対応するClickHouseローカルテーブルに関する情報を自動的に推測し、ClickHouseローカルテーブルに直接データを書き込みます。この場合、inferLocalTableパラメータをtrueに設定する場合は、shardWriteパラメータをtrueに設定し、tableNameパラメータをClickHouse分散テーブルの名前に設定し、urlパラメータをClickHouse分散テーブルが属するノードのJDBC URLに設定する必要があります。

説明

ClickHouse非分散テーブルにデータを書き込む場合は、このパラメータを構成する必要はありません。

writeMode

ClickHouseローカルテーブルにデータを書き込むポリシー。

ENUM

いいえ

default

有効な値:

  • default:データはClickHouseクラスタの最初のノードにあるClickHouseローカルテーブルに書き込まれます。これはデフォルト値です。

  • partition:同じキーを持つデータは、特定のノードにある同じClickHouseローカルテーブルに書き込まれます。

  • random:データはノードにあるClickHouseローカルテーブルにランダムに書き込まれます。

  • balance:ラウンドロビンアルゴリズムを使用して、ノードにあるClickHouseローカルテーブルに均等にデータを書き込みます。

説明

writeModeパラメータをpartitionに設定する場合は、ignoreDeleteパラメータがtrueに設定されていることを確認してください。

shardingKey

特定のノードにある同じClickHouseローカルテーブルにデータを書き込むためのキー。

default

いいえ

デフォルト値なし

writeMode パラメータをpartitionに設定する場合は、 shardingKey パラメータを構成する必要があります。 shardingKeyパラメータの値には複数のフィールドを含めることができます。複数のフィールドはコンマ(,)で区切ります。

exactlyOnce

exactly-onceセマンティクスを使用するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:exactly-onceセマンティクスが使用されます。

  • false:exactly-onceセマンティクスは使用されません。これはデフォルト値です。

説明
  • exactly-onceセマンティクスを使用して、Alibaba Cloud EMRにデプロイされているClickHouseクラスタにのみデータを書き込むことができます。したがって、Alibaba Cloud EMRにデプロイされているClickHouseクラスタにデータを書き込む場合にのみ、このパラメータをtrueに設定できます。

  • writeModeパラメータをpartitionに設定し、ClickHouseローカルテーブルにデータを書き込む場合、exactly-onceセマンティクスを使用することはできません。したがって、exactlyOnceパラメータをtrueに設定する場合は、writeMode パラメータをpartitionに設定することはできません。

データ型マッピング

Realtime Compute for Apache Flinkのデータ型

ClickHouseのデータ型

BOOLEAN

UInt8 / Boolean

説明

ClickHouse V21.12以降はBOOLEANデータ型をサポートしています。使用しているClickHouseのバージョンがV21.12より前の場合、Realtime Compute for Apache FlinkのBOOLEANデータ型はClickHouseのUINT8データ型に対応します。

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

STRING

BINARY

FixedString

VARBINARY

STRING

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

説明

ClickHouseは、Realtime Compute for Apache FlinkのTIME、MAP、MULTISET、ROWのデータ型をサポートしていません。

ClickHouseのNESTEDデータ型を使用するには、このデータ型をRealtime Compute for Apache FlinkのARRAYデータ型にマッピングする必要があります。サンプルコード:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

ClickHouseのNESTEDデータ型をRealtime Compute for Apache FlinkのARRAYデータ型にマッピングします。

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
説明

ClickHouseのDATETIMEデータ型は秒単位まで正確にすることができ、DATETIME64データ型はナノ秒単位まで正確にすることができます。VVR 6.0.6より前のバージョンのVVRを使用するRealtime Compute for Apache Flinkの場合、ClickHouseが提供するJDBCドライバがDATETIME64データ型のデータを書き込むと、精度の低下が発生し、データは秒単位までしか正確になりません。したがって、Realtime Compute for Apache FlinkはTIMESTAMPデータ型のデータを秒単位でのみ書き込むことができます。値はTIMESTAMP(0)形式で表示されます。VVR 6.0.6以降を使用するRealtime Compute for Apache Flinkでは、精度の低下に関する問題は解決されています。Realtime Compute for Apache FlinkはDATETIME64データ型のデータを想定どおりに書き込むことができます。

  • 例 1:データはノードのClickHouseローカルテーブルに書き込まれます。

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 例 2:データはClickHouse分散テーブルに書き込まれます。

    local_table_testという名前の3つのClickHouseローカルテーブルが、192.XX.XX.1、192.XX.XX.2、192.XX.XX.3ノードに存在します。ClickHouseローカルテーブルに基づいて、distributed_table_testという名前のClickHouse分散テーブルが作成されます。

    • 同じキーを持つデータを特定のノードにある同じClickHouseローカルテーブルに直接書き込む場合は、次のステートメントを実行します。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • urlパラメータで、ClickHouseローカルテーブルにデータが書き込まれるノードを手動で指定したくない場合は、次のステートメントを実行して、システムがClickHouseローカルテーブルのノードを自動的に推測できるようにします。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- urlパラメータをClickHouse分散テーブルが属するノードのJDBC URLに設定します。
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', -- tableNameパラメータをClickHouse分散テーブルの名前に設定します。
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', -- inferLocalTableパラメータをtrueに設定します。
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

FAQ