本トピックでは、Java Database Connectivity (JDBC) コネクタを用いて Flink から ClickHouse へデータを書き込む方法について説明します。記載する手順は、さまざまなバージョンの Flink に適用可能です。
背景情報
Flink は、バージョン 1.11.0 で JDBC コネクタを大幅にリファクタリングしました。
-
リファクタリング前(バージョン 1.10.1 以前)では、パッケージ名は flink-jdbc でした。
-
リファクタリング後(バージョン 1.11.0 以降)では、パッケージ名は flink-connector-jdbc です。
以下の表は、各パッケージが ClickHouse sink へのデータ書き込みをどの程度サポートしているかを示しています。
|
API 名 |
flink-jdbc |
flink-connector-jdbc |
|
DataStream |
非対応 |
対応 |
|
Table API(レガシ) |
サポート |
非対応 |
|
Table API(DDL) |
非対応 |
非対応 |
flink-connector-jdbc パッケージでは、レガシ Table API のサポートが完全に削除されています。Table API を利用するには、Data Definition Language(DDL)のみを使用する必要があります。ただし、Table DDL 方式では、サポートされる JDBC ドライバーがハードコードされており、ClickHouse は含まれていません。本トピックでは、Flink 1.10.1(flink-jdbc 使用)および Flink 1.11.0(flink-connector-jdbc 使用)を例として、Flink から ClickHouse へのデータ書き込み方法を解説します。
Flink 1.10.1 + flink-jdbc
Flink 1.10.1 およびそれ以前のバージョンでは、flink-jdbc パッケージと Table API を用いて ClickHouse へデータを書き込む必要があります。本セクションでは、Maven および Flink 1.10.1 を用いた例を示します。
-
mvn archetype:generate コマンドを使用してプロジェクトを作成します。作成プロセス中に、プロンプトに従って group-id、artifact-id などの情報を入力します。
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.10.1 -
pom.xml ファイル内の
<dependencies />セクションを編集し、必要な依存関係を追加します。<!--//Flink Table API の依存関係を追加 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!--//Flink JDBC および ClickHouse JDBC ドライバーの依存関係を追加 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> -
データ書き込み用のプログラムファイルを作成します。
この例では、
CsvTableSourceを使用して CSV ファイルを読み込み、テーブルソースを作成します。その後、JDBCAppendTableSinkを使用してデータを ClickHouse sink へ書き込みます。説明-
ClickHouse では単一の INSERT 操作に高いレイテンシーが発生するため、
BatchSizeを設定して一括挿入を行い、パフォーマンスを向上させることを推奨します。 -
JDBCAppendTableSink の実装では、最終バッチのレコード数が
BatchSize未満の場合、残りのデータは挿入されません。
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.common.typeinfo.TypeInformation object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // バッチサイズを設定してください val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // ClickHouse sink へデータを書き込みます val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(CkJdbcUrl) .setUsername(CkUsername) .setPassword(CkPassword) .setQuery(insertIntoCkSql) .setBatchSize(BatchSize) .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT) .build() tEnv.registerTableSink( "sink", Array("name", "grade", "rate"), Array(Types.STRING, Types.LONG, Types.FLOAT), sink ) tEnv.insertInto(resultTable, "sink") env.execute("Flink Table API から ClickHouse への書き込み(サンプル)") } }パラメーター:
-
SourceCsvPath:ソース CSV ファイルのパス。 -
CkJdbcUrl:送信先 ClickHouse クラスターのアドレス。 -
CkUsername:送信先 ClickHouse クラスターのユーザー名。 -
CkPassword:ApsaraDB for ClickHouse クラスターのパスワード。
-
-
プログラムをコンパイルして実行します。
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
Flink 1.11.0 + flink-connector-jdbc
Flink 1.11.0 およびそれ以降のバージョンでは、flink-connector-jdbc パッケージと DataStream API を用いて ClickHouse へデータを書き込む必要があります。本セクションでは、Maven および Flink 1.11.0 を用いた例を示します。
-
mvn archetype:generate コマンドを使用してプロジェクトを作成します。プロンプトに従って、group-id、artifact-id などの情報を入力します。
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.11.0 -
pom.xml ファイル内の
<dependencies />セクションを編集し、必要な依存関係を追加します。<!--//Flink Table API の依存関係を追加 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!--//Flink JDBC コネクタおよび ClickHouse JDBC ドライバーの依存関係を追加 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> -
データ書き込み用のプログラムファイルを作成します。
この例では、
CsvTableSourceを使用して CSV ファイルを読み込み、テーブルソースを作成します。次に、TableEnvironment.toAppendStreamを使用してテーブルを DataStream に変換します。最後に、JdbcSinkを使用してデータを ClickHouse へ書き込みます。説明-
ClickHouse では単一の INSERT 操作に高いレイテンシーが発生するため、
BatchSizeを設定して一括挿入を行い、パフォーマンスを向上させることを推奨します。 -
現在の flink-connector-jdbc バージョンでは、Scala API を使用して JdbcSink を呼び出す際に、ラムダ関数でシリアル化の問題が発生します。この問題を回避するため、JDBC ステートメントビルダー関数を渡すために、インターフェイスを手動で実装する必要があります。
CkSinkBuilderクラスは、そのような実装の例です。class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } }
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.connector.jdbc._ import java.sql.PreparedStatement // JDBC ステートメントビルダー関数を渡すために、インターフェイスを手動で実装します class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } } object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // バッチサイズを設定してください val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") // テーブルを DataStream に変換します val resultDataStream = tEnv.toAppendStream[(String, Long, Float)](resultTable) val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // ClickHouse JDBC sink へデータを書き込みます resultDataStream.addSink( JdbcSink.sink[(String, Long, Float)]( insertIntoCkSql, new CkSinkBuilder, new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .withUrl(CkJdbcUrl) .withUsername(CkUsername) .withPassword(CkPassword) .build() ) ) env.execute("Flink DataStream から ClickHouse への書き込み(サンプル)") } }パラメーター:
-
SourceCsvPath:ソース CSV ファイルのパス。 -
CkJdbcUrl:送信先 ClickHouse クラスターのアドレス。 -
CkUsername:送信先 ClickHouse クラスターのユーザー名。 -
CkPassword:ClickHouse クラスターのパスワード。
-
-
プログラムをコンパイルして実行します。
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar