ApsaraDB for ClickHouse:Use a JDBC connector to write data to an ApsaraDB for ClickHouse cluster

Last Updated: Jan 26, 2024

This topic describes how to use a Java Database Connectivity (JDBC) connector to write data processed by different versions of Flink to an ApsaraDB for ClickHouse cluster.

Background information

In Flink 1.11.0, the JDBC connector underwent major refactoring.

  • In Flink 1.10.1 and earlier, the package name is flink-jdbc.

  • In Flink 1.11.0 and later, the package name is flink-connector-jdbc.
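
The rename also changes the classes that you import. Both import paths are shown side by side in the following comparison; each of them appears in the full examples later in this topic:

    // flink-jdbc (Flink 1.10.1 and earlier): legacy Table API sink
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink

    // flink-connector-jdbc (Flink 1.11.0 and later): DataStream sink
    import org.apache.flink.connector.jdbc.JdbcSink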

The following table lists the methods that can be used to write data to a ClickHouse sink before and after the refactoring.

Method                 flink-jdbc         flink-connector-jdbc
DataStream             Not supported      Supported
Table API (Legacy)     Supported          Not supported
Table API (DDL)        Not supported      Not supported

flink-connector-jdbc drops support for the Table API (Legacy) method, so the Table API can be used only by executing DDL statements. However, the JDBC dialects supported by the Table API (DDL) method are hard-coded, and ClickHouse is not among them. Therefore, with flink-connector-jdbc you must use the DataStream method to write data to an ApsaraDB for ClickHouse cluster. The following sections describe how to write data processed by Flink to an ApsaraDB for ClickHouse cluster, first for Flink 1.10.1 with flink-jdbc and then for Flink 1.11.0 with flink-connector-jdbc.

Flink 1.10.1 + flink-jdbc

For Flink 1.10.1 and earlier, you must use flink-jdbc and the Table API method to write data to an ApsaraDB for ClickHouse cluster. Maven and Flink 1.10.1 are used in the following example.

  1. Run the mvn archetype:generate command to create a project. You are prompted to enter information such as the groupId and artifactId during this process.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. Edit the <dependencies /> section in the pom.xml file to add dependencies.

    <!-- Add the dependencies of the Flink Table API. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    <!-- Add the dependencies of Flink JDBC and the ClickHouse JDBC driver. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
                            
  3. Create a program file to write data.

    In the sample code, CsvTableSource reads data from the CSV file to generate the table source, and JDBCAppendTableSink writes the data to the ClickHouse sink.

    Note
    • ClickHouse has high latency for each individual insert operation. Therefore, set BatchSize so that data is inserted in batches, which improves write performance.

    • In the JDBCAppendTableSink implementation, if the number of rows in the last batch is less than the BatchSize value, the remaining rows are not inserted.

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Set the batch size.
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Write data to ClickHouse Sink. 
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API to ClickHouse Example")
      }
    }

    Parameter description:

    • SourceCsvPath: the path of the source CSV file.

    • CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.

    • CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.

    • CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.
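
    The CsvTableSource in the preceding code skips the first line of the file (ignoreFirstLine()) and expects comma-separated values in the declared field order: name, age, sex, grade, and rate. A hypothetical source.csv could therefore look as follows. The rows are placeholder values:

    name,age,sex,grade,rate
    Alice,24,female,3,95.5
    Bob,32,male,2,87.0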

  4. Compile and run the file.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
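
    After the job finishes, you can check that the rows arrived by querying the cluster with any JDBC client. The following minimal sketch (the VerifySink name is arbitrary) assumes that the clickhouse-jdbc driver declared in the pom.xml above is on the classpath and that the destination table sink_table already exists with the columns name, grade, and rate. The connection placeholders match the ones used in the job:

    import java.sql.DriverManager

    object VerifySink {
      def main(args: Array[String]): Unit = {
        // Same connection settings as in the Flink job above.
        val url = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val conn = DriverManager.getConnection(url, "<YOUR-USERNAME>", "<YOUR-PASSWORD>")
        try {
          // Count the rows that the job wrote to the sink table.
          val rs = conn.createStatement().executeQuery("SELECT count() FROM sink_table")
          while (rs.next()) println(s"rows in sink_table: ${rs.getLong(1)}")
        } finally {
          conn.close()
        }
      }
    }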

Flink 1.11.0 + flink-connector-jdbc

For Flink 1.11.0 and later, you must use flink-connector-jdbc and DataStream to write data to an ApsaraDB for ClickHouse cluster. Maven and Flink 1.11.0 are used in the following example.

  1. Run the mvn archetype:generate command to create a project. You are prompted to enter information such as the groupId and artifactId during this process.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. Edit the <dependencies /> section in the pom.xml file to add dependencies.

    <!-- Add the dependencies of the Flink Table API. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    <!-- Add the dependencies of the Flink JDBC connector and the ClickHouse JDBC driver. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
  3. Create a program file to write data.

    In the sample code, CsvTableSource reads data from the CSV file to generate the table source, StreamTableEnvironment.toAppendStream converts the table to a DataStream, and JdbcSink writes the data to the ApsaraDB for ClickHouse cluster.

    Note
    • ClickHouse has high latency for each individual insert operation. Therefore, set BatchSize so that data is inserted in batches, which improves write performance.

    • When JdbcSink is invoked by using the Scala API, the lambda function that builds the JDBC statement must be serialized, which does not work for Scala lambdas. Therefore, you must implement the JdbcStatementBuilder interface manually and pass in the JDBC statement build function as the CkSinkBuilder class:

      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // Use manual interface implementation to pass in the JDBC Statement build function.
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Set the batch size.
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
    // Convert the table to DataStream.
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Write data to ClickHouse JDBC Sink.
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }

    Parameter description:

    • SourceCsvPath: the path of the source CSV file.

    • CkJdbcUrl: the endpoint of the destination ApsaraDB for ClickHouse cluster.

    • CkUsername: the username of the destination ApsaraDB for ClickHouse cluster.

    • CkPassword: the password of the destination ApsaraDB for ClickHouse cluster.

  4. Compile and run the file.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
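
    When the stream is slow or the total number of rows is not a multiple of BatchSize, rows can stay buffered in the sink until the batch fills. In flink-connector-jdbc, JdbcExecutionOptions also accepts a flush interval and a retry count. The following minimal sketch shows how the execution options could be built with these settings; the interval and retry values are illustrative only:

    import org.apache.flink.connector.jdbc.JdbcExecutionOptions

    object ExecutionOptionsSketch {
      // Size-based batching plus a time-based flush and retries for transient failures.
      val execOptions: JdbcExecutionOptions = JdbcExecutionOptions
        .builder()
        .withBatchSize(500)          // flush once 500 rows are buffered ...
        .withBatchIntervalMs(2000L)  // ... or at the latest every 2 seconds
        .withMaxRetries(3)           // retry transient JDBC write failures
        .build()
    }

    In the program above, execOptions can be passed as the third argument of JdbcSink.sink in place of the new JdbcExecutionOptions.Builder() expression.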