Use the Java Database Connectivity (JDBC) connector to write data from Apache Flink to ApsaraDB for ClickHouse. The steps differ depending on your Flink version.
How the connector versions differ
Flink refactored its JDBC connector in version 1.11.0, renaming flink-jdbc to flink-connector-jdbc and changing which APIs it supports:
| API | flink-jdbc (Flink ≤ 1.10.1) | flink-connector-jdbc (Flink ≥ 1.11.0) |
|---|---|---|
| DataStream | Not supported | Supported |
| Table API (Legacy) | Supported | Not supported |
| Table API (DDL) | Not supported | Supported, but not for ClickHouse |
flink-connector-jdbc drops the legacy Table API entirely. It does support Table DDL, but the DDL path hard-codes the JDBC dialects it accepts, and ClickHouse is not among them, so use the DataStream API instead. The sketch below illustrates the limitation.
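For illustration only, here is a hypothetical Flink SQL DDL for a JDBC sink. In flink-connector-jdbc 1.11.0 the connector derives a SQL dialect from the JDBC URL and only ships dialects for Derby, MySQL, and PostgreSQL, so a ClickHouse URL like the one below is rejected at validation time:

```sql
-- Hypothetical DDL, shown only to illustrate the limitation:
-- validation fails because no built-in JDBC dialect matches a ClickHouse URL.
CREATE TABLE sink_table (
    name  STRING,
    grade BIGINT,
    rate  FLOAT
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:clickhouse://<clickhouse-host>:<port>/<database>',
    'table-name' = 'sink_table'
);
```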
Prerequisites
Before you begin, make sure you have:
A running ApsaraDB for ClickHouse cluster
Maven installed
The ClickHouse cluster host, port, database name, username, and password
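Optionally, verify connectivity and create the target table before running either job. The sketch below assumes the `sink_table` name and schema used by the examples in this topic and a plain MergeTree engine; adjust both for your cluster (for example, if you use a distributed table):

```scala
import java.sql.DriverManager

// Quick smoke test: connect over JDBC and create the sink table used below.
// The table name, column types, and MergeTree engine are assumptions to adapt.
object CkSmokeTest {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver") // ensure the driver is registered
    val url = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val conn = DriverManager.getConnection(url, "<YOUR-USERNAME>", "<YOUR-PASSWORD>")
    try {
      val stmt = conn.createStatement()
      stmt.execute(
        """CREATE TABLE IF NOT EXISTS sink_table (
          |  name  String,
          |  grade Int64,
          |  rate  Float32
          |) ENGINE = MergeTree()
          |ORDER BY name""".stripMargin)
      println("Connection OK; sink_table is ready")
    } finally conn.close()
  }
}
```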
Flink 1.10.1 + flink-jdbc
In Flink 1.10.1 and earlier, write data to ClickHouse using flink-jdbc and the Table API.
How it works
The program reads a CSV file through CsvTableSource, registers it as a table source, selects the columns to write, and then pushes the data to ClickHouse through JDBCAppendTableSink.
ClickHouse has high latency for single inserts, so set `BatchSize` to write data in bulk. Note: if the final batch has fewer records than `BatchSize`, those records are not written.
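For reference, a `source.csv` matching the schema the example registers (comma-delimited, with the header line skipped via `ignoreFirstLine()`) could look like this; the rows are made-up sample data:

```csv
name,age,sex,grade,rate
alice,23,female,95,0.91
bob,31,male,82,0.83
carol,27,female,88,0.87
```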
Set up the project
Generate a Maven project:
```bash
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.10.1
```

Enter your `group-id`, `artifact-id`, and other details when prompted.

Add the following dependencies to the `<dependencies />` section of `pom.xml`:

```xml
<!-- Flink Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Flink JDBC and ClickHouse JDBC driver -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
```

Create the program file:
```scala
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" // Path to your source CSV file
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" // ClickHouse JDBC URL
    val CkUsername = "<YOUR-USERNAME>" // ClickHouse username
    val CkPassword = "<YOUR-PASSWORD>" // ClickHouse password
    val BatchSize = 500 // Records per bulk insert

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Register the CSV file as a table source
    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()
    tEnv.registerTableSource("source", csvTableSource)

    // Select the columns to write
    val resultTable = tEnv.scan("source").select("name, grade, rate")

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write data to the ClickHouse sink
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") // mandatory
      .setDBUrl(CkJdbcUrl)                                    // mandatory
      .setUsername(CkUsername)                                // mandatory
      .setPassword(CkPassword)                                // mandatory
      .setQuery(insertIntoCkSql)                              // mandatory
      .setBatchSize(BatchSize)                                // set to perform bulk inserts
      .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT) // mandatory
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("name", "grade", "rate"),
      Array(Types.STRING, Types.LONG, Types.FLOAT),
      sink
    )
    tEnv.insertInto(resultTable, "sink")

    env.execute("Flink Table API to ClickHouse Example")
  }
}
```

Replace the following placeholders:
| Placeholder | Description |
|---|---|
| `<YOUR-PATH-TO-TEST-CSV>` | Path to the source CSV file |
| `<clickhouse-host>` | Host of the ApsaraDB for ClickHouse cluster |
| `<port>` | Port of the ApsaraDB for ClickHouse cluster |
| `<database>` | Target database name |
| `<YOUR-USERNAME>` | Username for the ApsaraDB for ClickHouse cluster |
| `<YOUR-PASSWORD>` | Password for the ApsaraDB for ClickHouse cluster |

Build and run the program:
```bash
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
```
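To spot-check the write, query the target table, for example with `clickhouse-client` (assuming it is installed and can reach your cluster; replace the placeholders as above):

```bash
$ clickhouse-client --host <clickhouse-host> --user <YOUR-USERNAME> --password <YOUR-PASSWORD> \
    --query "SELECT count() FROM <database>.sink_table"
```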
Flink 1.11.0 + flink-connector-jdbc
In Flink 1.11.0 and later, write data to ClickHouse using flink-connector-jdbc and the DataStream API.
How it works
The program reads a CSV file through CsvTableSource, converts the table into a DataStream using StreamTableEnvironment.toAppendStream, and then writes each record to ClickHouse through JdbcSink.
ClickHouse has high latency for single inserts, so set `BatchSize` to write data in bulk. Note: if the final batch has fewer records than `BatchSize`, those records are not written.
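If the job's input can trickle in, flink-connector-jdbc can also flush partially filled batches on a timer via `JdbcExecutionOptions`. A minimal sketch; the interval and retry values are arbitrary examples:

```scala
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Flush when 500 records are buffered OR every 2 seconds, whichever
// comes first; retry a failed batch insert up to 3 times.
val execOptions = JdbcExecutionOptions.builder()
  .withBatchSize(500)
  .withBatchIntervalMs(2000L)
  .withMaxRetries(3)
  .build()
```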
flink-connector-jdbc 1.11.0 has a serialization issue with Scala lambda functions passed to JdbcSink. Manually implement the JdbcStatementBuilder interface instead of passing a lambda; the CkSinkBuilder class in the example below demonstrates this pattern.
Set up the project
Generate a Maven project:
```bash
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.11.0
```

Enter your `group-id`, `artifact-id`, and other details when prompted.

Add the following dependencies to the `<dependencies />` section of `pom.xml`:

```xml
<!-- Flink Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Flink JDBC connector and ClickHouse JDBC driver -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
```

Create the program file:
```scala
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.connector.jdbc._
import java.sql.PreparedStatement

// Manually implement JdbcStatementBuilder to work around the Scala lambda serialization issue
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv" // Path to your source CSV file
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>" // ClickHouse JDBC URL
    val CkUsername = "<YOUR-USERNAME>" // ClickHouse username
    val CkPassword = "<YOUR-PASSWORD>" // ClickHouse password
    val BatchSize = 500 // Records per bulk insert

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Register the CSV file as a table source
    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()
    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    // Convert the table into a DataStream
    val resultDataStream = tEnv.toAppendStream[(String, Long, Float)](resultTable)

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write data to the ClickHouse JDBC sink
    resultDataStream.addSink(
      JdbcSink.sink[(String, Long, Float)](
        insertIntoCkSql,           // mandatory
        new CkSinkBuilder,         // mandatory
        JdbcExecutionOptions.builder()
          .withBatchSize(BatchSize) // set to perform bulk inserts
          .build(),                // optional
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // mandatory
          .withUrl(CkJdbcUrl)       // mandatory
          .withUsername(CkUsername) // mandatory
          .withPassword(CkPassword) // mandatory
          .build()
      )
    )

    env.execute("Flink DataStream to ClickHouse Example")
  }
}
```

Replace the following placeholders:
| Placeholder | Description |
|---|---|
| `<YOUR-PATH-TO-TEST-CSV>` | Path to the source CSV file |
| `<clickhouse-host>` | Host of the ApsaraDB for ClickHouse cluster |
| `<port>` | Port of the ApsaraDB for ClickHouse cluster |
| `<database>` | Target database name |
| `<YOUR-USERNAME>` | Username for the ApsaraDB for ClickHouse cluster |
| `<YOUR-PASSWORD>` | Password for the ApsaraDB for ClickHouse cluster |

Build and run the program:
```bash
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
```