このトピックでは、Flinkによって処理されたデータをApsaraDB for ClickHouseクラスターにインポートする必要がある場合に、Java Database Connectivity (JDBC) コネクタを使用して、異なるバージョンのFlinkによって処理されたデータをApsaraDB for ClickHouseクラスターに書き込む方法について説明します。
背景情報
Flink 1.11.0では、JDBCコネクタが大規模なリファクタリングを受けました。
Flink 1.10.1以前では、パッケージ名はflink-jdbcです。
Flink 1.11.0以降の場合、パッケージ名はflink-connector-jdbcです。
リファクタリングの前後にClickHouse Sinkにデータを書き込むために使用できるメソッドを次の表に示します。
操作名 | flink-jdbc | flink-connector-jdbc |
DataStream | 非対応 | 対応 |
テーブルAPI (レガシー) | 対応 | 非対応 |
テーブルAPI (DDL) | 非対応 | 非対応 |
flink-connector-jdbcはTable API (Legacy) メソッドをサポートしていないため、DDLステートメントを実行してTable APIを呼び出す必要があります。 ただし、サポートされるJDBCドライバは、Table API (DDL) メソッド用にハードコードされています。 ClickHouseはサポートされていません。 以下のセクションでは、Flinkによって処理されたデータを、Flink-jdbcのflink 1.10.1とFlink-connector-jdbcのflink 1.11.0の両方のApsaraDB for ClickHouseクラスターに書き込む方法について説明します。
Flink 1.10.1 + flink-jdbc
Flink 1.10.1以前の場合、flink-jdbcおよびTable APIメソッドを使用して、ApsaraDB For ClickHouseクラスターにデータを書き込む必要があります。 次の例では、MavenとFlink 1.10.1を使用します。
mvn archetype:generateコマンドを実行して、プロジェクトを作成します。 このプロセスでは、グループidやアーティファクトidなどの情報を入力する必要があります。
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.10.1を編集します。Edit the
<dependencies />pom.xmlファイルのセクションで依存関係を追加します。// Add the dependencies of Flink Table API. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> // Add the dependencies of Flink JDBC and ClickHouse JDBC Driver. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency>データを書き込むプログラムファイルを作成します。
サンプルコードでは、
CsvTableSourceを使用してCSVファイルにデータを読み取り、テーブルソースを生成します。JDBCAppendTableSinkは、ClickHouse Sinkにデータを書き込むために使用されます。説明ClickHouseは、各挿入操作のレイテンシが高くなります。 したがって、
BatchSizeを設定してデータをバッチで挿入し、パフォーマンスを向上させる必要があります。JDBCAppendTableSink実装では、最後のバッチのデータサイズが
BatchSize値未満の場合、残りのデータは挿入されません。
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.common.typeinfo.TypeInformation object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // Set the batch size. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // Write data to ClickHouse Sink. val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(CkJdbcUrl) .setUsername(CkUsername) .setPassword(CkPassword) .setQuery(insertIntoCkSql) .setBatchSize(BatchSize) .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT) .build() tEnv.registerTableSink( "sink", Array("name", "grade", "rate"), Array(Types.STRING, Types.LONG, Types.FLOAT), sink ) tEnv.insertInto(resultTable, "sink") env.execute("Flink Table API to ClickHouse Example") } }パラメーターの説明:
SourceCsvPath: ソースCSVファイルのパス。CkJdbcUrl: ターゲットApsaraDB for ClickHouseクラスターのエンドポイント。CkUsername: ターゲットApsaraDB for ClickHouseクラスターのユーザー名。CkPassword: ターゲットApsaraDB for ClickHouseクラスターのパスワード。
ファイルをコンパイルして実行します。
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
Flink 1.11.0 + flink-connector-jdbc
Flink 1.11.0以降の場合、flink-connector-jdbcおよびDataStreamを使用して、ApsaraDB For ClickHouseクラスターにデータを書き込む必要があります。 次の例では、MavenとFlink 1.11.0を使用します。
mvn archetype:generateコマンドを実行して、プロジェクトを作成します。 このプロセスでは、グループidやアーティファクトidなどの情報を入力する必要があります。
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.11.0を編集します。Edit the
<dependencies />pom.xmlファイルのセクションで依存関係を追加します。// Add the dependencies of Flink Table API. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> // Add the dependencies of Flink JDBC Connector and ClickHouse JDBC Driver. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency>データを書き込むプログラムファイルを作成します。
サンプルコードでは、
CsvTableSourceを使用してCSVファイルにデータを読み取り、テーブルソースを生成します。TableEnvironment.toAppendStreamは、テーブルをDataStreamに変換するために使用されます。JdbcSinkは、ApsaraDB for ClickHouseクラスターにデータを書き込むために使用されます。説明ClickHouseは、各挿入操作のレイテンシが高くなります。 したがって、
BatchSizeを設定してデータをバッチで挿入し、パフォーマンスを向上させる必要があります。flink-connector-jdbcの場合、JdbcSinkがScala APIを使用して呼び出されると、ラムダ関数でシリアル化が行われます。 手動インターフェイス実装を使用して、
クラスCkSinkBuilderのJDBCステートメントビルド関数を渡す必要があります。class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } }
package org.myorg.example import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.sources._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api._ import org.apache.flink.types.Row import org.apache.flink.table.api.{ TableEnvironment, TableSchema, Types, ValidationException } import org.apache.flink.connector.jdbc._ import java.sql.PreparedStatement // Use manual interface implementation to pass in the JDBC Statement build function. class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] { def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = { ps.setString(1, v._1) ps.setLong(2, v._2) ps.setFloat(3, v._3) } } object StreamingJob { def main(args: Array[String]) { val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" val CkUsername = "<YOUR-USERNAME>" val CkPassword = "<YOUR-PASSWORD>" val BatchSize = 500 // Set the batch size. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val csvTableSource = CsvTableSource .builder() .path(SourceCsvPath) .ignoreFirstLine() .fieldDelimiter(",") .field("name", Types.STRING) .field("age", Types.LONG) .field("sex", Types.STRING) .field("grade", Types.LONG) .field("rate", Types.FLOAT) .build() tEnv.registerTableSource("source", csvTableSource) val resultTable = tEnv.scan("source").select("name, grade, rate") // Convert the table to DataStream. val resultDataStream = tEnv.toAppendStream[(String, Long, Float)](resultTable) val insertIntoCkSql = """ | INSERT INTO sink_table ( | name, grade, rate | ) VALUES ( | ?, ?, ? | ) """.stripMargin // Write data to ClickHouse JDBC Sink. resultDataStream.addSink( JdbcSink.sink[(String, Long, Float)]( insertIntoCkSql, new CkSinkBuilder, new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .withUrl(CkJdbcUrl) .withUsername(CkUsername) .withPassword(CkPassword) .build() ) ) env.execute("Flink DataStream to ClickHouse Example") } }パラメーターの説明:
SourceCsvPath: ソースCSVファイルのパス。CkJdbcUrl: ターゲットApsaraDB for ClickHouseクラスターのエンドポイント。CkUsername: ターゲットApsaraDB for ClickHouseクラスターのユーザー名。CkPassword: ターゲットApsaraDB for ClickHouseクラスターのパスワード。
ファイルをコンパイルして実行します。
$ mvn clean package $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar