
ApsaraDB for ClickHouse: Import using JDBC connector

Last Updated: Mar 28, 2026

Use the Java Database Connectivity (JDBC) connector to write data from Apache Flink to ApsaraDB for ClickHouse. The steps differ depending on your Flink version.

How the connector versions differ

Flink refactored its JDBC connector in version 1.11.0, renaming the module from flink-jdbc to flink-connector-jdbc and changing which APIs it supports:

API                  flink-jdbc (Flink ≤ 1.10.1)   flink-connector-jdbc (Flink ≥ 1.11.0)
DataStream           Not supported                 Supported
Table API (Legacy)   Supported                     Not supported
Table API (DDL)      Not supported                 Supported

flink-connector-jdbc dropped the legacy Table API. Although it supports Table DDL, the DDL implementation hard-codes its supported JDBC dialects (MySQL, PostgreSQL, and Derby), which do not include ClickHouse. Use the DataStream API instead.
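
To illustrate the limitation, the sketch below registers a JDBC table through DDL and then tries to insert into it. This is a hypothetical, non-working example: the table name ddl_sink and the literal row are illustrative only, and the INSERT fails during planning because flink-connector-jdbc 1.11 resolves a JDBC dialect from the URL and ships dialects only for MySQL, PostgreSQL, and Derby.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object DdlLimitation {
      def main(args: Array[String]): Unit = {
        val tEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
        // Registering the table succeeds; the URL is not validated yet.
        tEnv.executeSql(
          """CREATE TABLE ddl_sink (
            |  name  STRING,
            |  grade BIGINT,
            |  rate  FLOAT
            |) WITH (
            |  'connector' = 'jdbc',
            |  'url' = 'jdbc:clickhouse://<clickhouse-host>:<port>/<database>',
            |  'table-name' = 'sink_table'
            |)""".stripMargin)
        // Fails here: no built-in JDBC dialect matches a clickhouse:// URL.
        tEnv.executeSql("INSERT INTO ddl_sink VALUES ('a', 1, 0.5)")
      }
    }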

Prerequisites

Before you begin, make sure you have:

  • A running ApsaraDB for ClickHouse cluster

  • Maven installed

  • The ClickHouse cluster host, port, database name, username, and password

Flink 1.10.1 + flink-jdbc

In Flink 1.10.1 and earlier, write data to ClickHouse using flink-jdbc and the Table API.

How it works

The program reads a CSV file through CsvTableSource, registers it as a table source, selects the columns to write, and then pushes the data to ClickHouse through JDBCAppendTableSink.

ClickHouse has high latency for single inserts. Set BatchSize to write data in bulk. If the final batch has fewer records than BatchSize, those records are not written.
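
The examples in both sections write to a table named sink_table, which must already exist in ClickHouse. If you have not created it, the following one-off sketch creates a plausible schema through the ClickHouse JDBC driver; the column types mirror the Flink fields, but the table name, types, engine, and ORDER BY key are assumptions to adapt to your setup (a multi-node cluster typically also needs a distributed table, which this sketch omits).

    import java.sql.DriverManager

    object CreateSinkTable {
      def main(args: Array[String]): Unit = {
        // Same connection details as the Flink jobs below.
        val conn = DriverManager.getConnection(
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
          "<YOUR-USERNAME>",
          "<YOUR-PASSWORD>")
        try {
          conn.createStatement().execute(
            """CREATE TABLE IF NOT EXISTS sink_table (
              |  name  String,
              |  grade Int64,
              |  rate  Float32
              |) ENGINE = MergeTree()
              |ORDER BY name""".stripMargin)
        } finally conn.close()
      }
    }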

Set up the project

  1. Generate a Maven project:

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1

    Enter your group-id, artifact-id, and other details when prompted.

  2. Add the following dependencies to the <dependencies /> section of pom.xml:

    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- Flink JDBC and ClickHouse JDBC driver -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
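
    The ${flink.version} and ${scala.binary.version} properties referenced above are normally generated by the quickstart archetype in the <properties /> section of pom.xml. If your project is missing them, add them; the values below are what the 1.10.1 Scala archetype defaults to (adjust if yours differs):

    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>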
  3. Create the program file:

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"     // Path to your source CSV file
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"  // ClickHouse JDBC URL
        val CkUsername = "<YOUR-USERNAME>"           // ClickHouse username
        val CkPassword = "<YOUR-PASSWORD>"           // ClickHouse password
        val BatchSize = 500                          // Records per bulk insert
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
        // Write data to the ClickHouse sink
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")  // mandatory
          .setDBUrl(CkJdbcUrl)                                      // mandatory
          .setUsername(CkUsername)                                  // mandatory
          .setPassword(CkPassword)                                  // mandatory
          .setQuery(insertIntoCkSql)                                // mandatory
          .setBatchSize(BatchSize)                                  // set to perform bulk inserts
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT) // mandatory
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        // insertInto is buffered in Flink 1.10; execution must be triggered
        // through the TableEnvironment so the buffered INSERT is translated.
        tEnv.execute("Flink Table API to ClickHouse Example")
      }
    }

    Replace the following placeholders:

    Placeholder                Description
    <YOUR-PATH-TO-TEST-CSV>    Path to the source CSV file
    <clickhouse-host>          Host of the ApsaraDB for ClickHouse cluster
    <port>                     Port of the ApsaraDB for ClickHouse cluster
    <database>                 Target database name
    <YOUR-USERNAME>            Username for the ApsaraDB for ClickHouse cluster
    <YOUR-PASSWORD>            Password for the ApsaraDB for ClickHouse cluster
  4. Build and run the program:

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

In Flink 1.11.0 and later, write data to ClickHouse using flink-connector-jdbc and the DataStream API.

How it works

The program reads a CSV file through CsvTableSource, converts the table into a DataStream using StreamTableEnvironment.toAppendStream, and then writes each record to ClickHouse through JdbcSink.

ClickHouse has high latency for single inserts. Set BatchSize to write data in bulk. If the final batch has fewer records than BatchSize, those records are not written.
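
If losing a trailing partial batch is a concern, flink-connector-jdbc also supports a time-based flush trigger via JdbcExecutionOptions.Builder.withBatchIntervalMs, which can be combined with the size trigger. A minimal sketch (the 1000 ms interval is an arbitrary example value):

    // Flush buffered records when either BatchSize rows accumulate
    // or 1000 ms elapse, whichever happens first.
    val execOptions = JdbcExecutionOptions.builder()
      .withBatchSize(BatchSize)
      .withBatchIntervalMs(1000L)
      .build()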
Important

In flink-connector-jdbc 1.11.0, calling JdbcSink from the Scala API with a lambda fails with a serialization error. Manually implement the JdbcStatementBuilder interface instead of passing a lambda. The CkSinkBuilder class in the example below demonstrates this pattern.

Set up the project

  1. Generate a Maven project:

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0

    Enter your group-id, artifact-id, and other details when prompted.

  2. Add the following dependencies to the <dependencies /> section of pom.xml:

    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- Flink JDBC connector and ClickHouse JDBC driver -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
  3. Create the program file:

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // Manually implement JdbcStatementBuilder to work around the Scala lambda serialization issue
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"     // Path to your source CSV file
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"  // ClickHouse JDBC URL
        val CkUsername = "<YOUR-USERNAME>"           // ClickHouse username
        val CkPassword = "<YOUR-PASSWORD>"           // ClickHouse password
        val BatchSize = 500                          // Records per bulk insert
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        // Convert the table into a DataStream
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
        // Write data to the ClickHouse JDBC sink
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,                           // mandatory
            new CkSinkBuilder,                         // mandatory
            new JdbcExecutionOptions.Builder()
              .withBatchSize(BatchSize)                // set to perform bulk inserts
              .build(),                                // optional
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")  // mandatory
              .withUrl(CkJdbcUrl)                      // mandatory
              .withUsername(CkUsername)                // mandatory
              .withPassword(CkPassword)                // mandatory
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }

    Replace the following placeholders:

    Placeholder                Description
    <YOUR-PATH-TO-TEST-CSV>    Path to the source CSV file
    <clickhouse-host>          Host of the ApsaraDB for ClickHouse cluster
    <port>                     Port of the ApsaraDB for ClickHouse cluster
    <database>                 Target database name
    <YOUR-USERNAME>            Username for the ApsaraDB for ClickHouse cluster
    <YOUR-PASSWORD>            Password for the ApsaraDB for ClickHouse cluster
  4. Build and run the program:

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
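
After the job finishes, you can check that the rows arrived from any JDBC client. A minimal verification sketch using the same ClickHouse driver and the same placeholders as above:

    import java.sql.DriverManager

    object VerifySink {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection(
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
          "<YOUR-USERNAME>",
          "<YOUR-PASSWORD>")
        try {
          // count() is ClickHouse's row-count aggregate.
          val rs = conn.createStatement().executeQuery("SELECT count() FROM sink_table")
          while (rs.next()) println(s"rows in sink_table: ${rs.getLong(1)}")
        } finally conn.close()
      }
    }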