This topic describes how to use the JDBC connector to write Flink data to an ApsaraDB for ClickHouse cluster.

Background information

In Flink 1.11.0, the JDBC connector underwent major restructuring.

  • In Flink 1.10.1 and earlier, the package name is flink-jdbc.
  • In Flink 1.11.0 and later, the package name is flink-connector-jdbc.

The following table lists the methods that can be used to write data to a ClickHouse sink before and after the refactoring.

Operation name        flink-jdbc      flink-connector-jdbc
DataStream            Not supported   Supported
Table API (Legacy)    Supported       Not supported
Table API (DDL)       Not supported   Not supported

flink-connector-jdbc does not support the Table API (Legacy) method; you must execute DDL statements to use the Table API. However, for the Table API (DDL) method, the supported JDBC drivers are hard-coded, and ClickHouse is not among them.

The following sections describe how to write Flink data to an ApsaraDB for ClickHouse cluster for both Flink 1.10.1 with flink-jdbc and Flink 1.11.0 with flink-connector-jdbc.
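Both examples insert data into a ClickHouse table named sink_table with the columns name, grade, and rate. If this table does not exist in the destination database yet, the following is a minimal sketch of how it could be created with the clickhouse-jdbc driver. The CreateSinkTable object, the MergeTree engine, the ORDER BY key, and the column types (chosen to match the Scala tuple (String, Long, Float) used in the examples below) are assumptions; adjust them, and add ON CLUSTER or distributed tables if your cluster requires them.

    import java.sql.DriverManager

    object CreateSinkTable {
      def main(args: Array[String]): Unit = {
        // Replace the placeholders with the endpoint and credentials of your cluster.
        val url = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val conn = DriverManager.getConnection(url, "<your-username>", "<your-password>")
        try {
          conn.createStatement().execute(
            """CREATE TABLE IF NOT EXISTS sink_table (
              |  name  String,
              |  grade Int64,
              |  rate  Float32
              |) ENGINE = MergeTree()
              |ORDER BY name
              |""".stripMargin)
        } finally {
          conn.close()
        }
      }
    }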

Flink 1.10.1 + flink-jdbc

For Flink 1.10.1 and earlier, you must use flink-jdbc and the Table API method. Maven and Flink 1.10.1 are used in the following example.

  1. Run the mvn archetype:generate command to create a project. You must enter information such as group-id and artifact-id during this process.
    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. Edit the <dependencies /> section in the pom.xml file to add dependencies.
    <!-- Add the dependencies of the Flink Table API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add the dependencies of flink-jdbc and the ClickHouse JDBC driver. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
  3. Create a program file to write data.
    In the sample code, CsvTableSource is used to read the CSV file and generate the table source, and JDBCAppendTableSink is used to write the data to the ClickHouse sink.
    Note
    • ClickHouse has high latency for each insert operation, so you must set a BatchSize value to insert data in batches and improve write performance.
    • In the JDBCAppendTableSink implementation, if the size of the last batch is less than the BatchSize value, the remaining data is not inserted. A possible workaround is sketched after this note.
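    The following sketch, separate from the sample program below, shows one such workaround: count the data rows in the source CSV file (the header is skipped, matching ignoreFirstLine()) and choose a BatchSize that evenly divides the count. The countCsvDataRows helper is hypothetical and not part of the sample.
      import scala.io.Source

      // Hypothetical helper: count the data rows in the CSV file, skipping the header.
      // A BatchSize that evenly divides this count leaves no partial final batch.
      def countCsvDataRows(path: String): Int = {
        val src = Source.fromFile(path)
        try src.getLines().drop(1).size
        finally src.close()
      }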
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/your-path-to-test-csv/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://clickhouse-host>:port>/database>"
        val CkUsername = "your-username>"
        val CkPassword = "your-password>"
        val BatchSize = 500 // Set the batch size.
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Write data to ClickHouse Sink.
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API to ClickHouse Example")
      }
    }
    Parameters in the code:
    • SourceCsvPath: the path of the source CSV file.
    • CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.
    • CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.
    • CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.
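    The source CSV file must start with a header row, because ignoreFirstLine() is set, followed by the five columns declared on the CsvTableSource: name, age, sex, grade, and rate. As a sketch, the following hypothetical helper generates a small test file in that format; the sample rows are illustrative only.
      import java.io.PrintWriter

      // Hypothetical helper: generate a small source.csv that matches the declared schema.
      object GenerateSampleCsv {
        def main(args: Array[String]): Unit = {
          val writer = new PrintWriter("/your-path-to-test-csv/source.csv")
          try {
            writer.println("name,age,sex,grade,rate") // header row, skipped by ignoreFirstLine()
            writer.println("alice,23,female,5,0.8")
            writer.println("bob,31,male,3,0.5")
            writer.println("carol,27,female,4,0.9")
          } finally {
            writer.close()
          }
        }
      }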
  4. Compile and run the file.
    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

For Flink 1.11.0 and later, you must use flink-connector-jdbc and the DataStream method. Maven and Flink 1.11.0 are used in the following example.

  1. Run the mvn archetype:generate command to create a project. You must enter information such as group-id and artifact-id during this process.
    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. Edit the <dependencies /> section in the pom.xml file to add dependencies.
    <!-- Add the dependencies of the Flink Table API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add the dependencies of the Flink JDBC connector and the ClickHouse JDBC driver. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
  3. Create a program file to write data.
    In the sample code, CsvTableSource is used to read the CSV file and generate the table source, StreamTableEnvironment.toAppendStream is used to convert the table to a DataStream, and JdbcSink is used to write the data to the ApsaraDB for ClickHouse cluster.
    Note
    • ClickHouse has high latency for each insert operation, so you must set a BatchSize value to insert data in batches and improve write performance.
    • For flink-connector-jdbc, serialization problems occur when JdbcSink is invoked with a lambda function through the Scala API. You must manually implement the JdbcStatementBuilder interface to pass in the JDBC statement builder function, as shown in the CkSinkBuilder class:
      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // Manually implement the JdbcStatementBuilder interface to pass in the JDBC statement builder function.
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/your-path-to-test-csv>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://clickhouse-host>:port>/database>"
        val CkUsername = "your-username>"
        val CkPassword = "your-password>"
        val BatchSize = 500 // Set the batch size.
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
    // Convert the table to DataStream.
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Write data to ClickHouse JDBC Sink.
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }
    Parameters in the code:
    • SourceCsvPath: the path of the source CSV file.
    • CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.
    • CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.
    • CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.
  4. Compile and run the file.
    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
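After either job finishes, you can verify the result by querying the sink table directly. The following is a minimal sketch that uses the clickhouse-jdbc driver; the VerifySinkTable object is hypothetical, and the placeholders must be replaced with your cluster information.

    import java.sql.DriverManager

    object VerifySinkTable {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection(
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
          "<your-username>",
          "<your-password>")
        try {
          // Count the rows that the Flink job wrote to the sink table.
          val rs = conn.createStatement().executeQuery("SELECT count() FROM sink_table")
          while (rs.next()) {
            println(s"rows in sink_table: ${rs.getLong(1)}")
          }
        } finally {
          conn.close()
        }
      }
    }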