All Products
Search
Document Center

ApsaraDB for ClickHouse:Impor menggunakan konektor JDBC

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara menggunakan konektor Java Database Connectivity (JDBC) untuk menulis data dari Flink ke ClickHouse. Metode yang dijelaskan berlaku untuk berbagai versi Flink.

Informasi latar belakang

Flink melakukan refaktorisasi signifikan terhadap konektor JDBC-nya pada versi 1.11.0:

  • Sebelum refaktorisasi, pada versi 1.10.1 dan sebelumnya, nama paketnya adalah flink-jdbc.

  • Setelah refaktorisasi, pada versi 1.11.0 dan seterusnya, nama paketnya adalah flink-connector-jdbc.

Tabel berikut menunjukkan bagaimana masing-masing paket mendukung penulisan data ke sink ClickHouse.

Nama API

flink-jdbc

flink-connector-jdbc

DataStream

Tidak didukung

Dukungan

Table API (Legacy)

Dukungan

Tidak didukung

Table API (DDL)

Tidak didukung

Tidak didukung

Paket flink-connector-jdbc sepenuhnya menghapus dukungan untuk Table API legacy. Anda hanya dapat memanggil Table API menggunakan Data Definition Language (DDL). Namun, metode Table DDL menyematkan driver JDBC yang didukung secara langsung dalam kodenya dan tidak mencakup dukungan untuk ClickHouse. Topik ini menggunakan contoh Flink 1.10.1 dengan flink-jdbc dan Flink 1.11.0 dengan flink-connector-jdbc untuk menunjukkan cara menulis data dari Flink ke ClickHouse.

Flink 1.10.1 + flink-jdbc

Pada Flink 1.10.1 dan versi sebelumnya, Anda harus menggunakan paket flink-jdbc dan Table API untuk menulis data ke ClickHouse. Bagian ini memberikan contoh yang menggunakan Maven dan Flink 1.10.1.

  1. Buat proyek menggunakan perintah mvn archetype:generate. Selama proses pembuatan, masukkan group-id, artifact-id, dan informasi lainnya saat diminta.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. Edit bagian <dependencies /> dalam file pom.xml untuk menambahkan dependensi yang diperlukan.

            <!--//Tambahkan dependensi untuk Flink Table API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    
            <!--//Tambahkan dependensi untuk Flink JDBC dan ClickHouse JDBC Driver -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
                            
  3. Buat file program untuk menulis data.

    Program contoh menggunakan CsvTableSource untuk membaca file CSV dan membuat sumber tabel. Program tersebut kemudian menggunakan JDBCAppendTableSink untuk menulis data ke sink ClickHouse.

    Catatan
    • Karena ClickHouse memiliki latensi tinggi untuk operasi insert tunggal, atur BatchSize untuk melakukan bulk insert dan meningkatkan performa.

    • Dalam implementasi JDBCAppendTableSink, jika jumlah catatan dalam batch terakhir kurang dari BatchSize, data yang tersisa tidak dimasukkan.

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Atur ukuran batch Anda
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Menulis data ke sink ClickHouse 
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API to ClickHouse Example")
      }
    }

    Parameter:

    • SourceCsvPath: Jalur file CSV sumber.

    • CkJdbcUrl: Alamat kluster ClickHouse tujuan.

    • CkUsername: Username untuk kluster ClickHouse tujuan.

    • CkPassword: Password untuk kluster ApsaraDB for ClickHouse tujuan.

  4. Kompilasi dan jalankan program.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

Pada Flink 1.11.0 dan versi selanjutnya, Anda harus menggunakan paket flink-connector-jdbc dan DataStream API untuk menulis data ke ClickHouse. Bagian ini memberikan contoh yang menggunakan Maven dan Flink 1.11.0.

  1. Buat proyek menggunakan perintah mvn archetype:generate. Saat diminta, masukkan group-id, artifact-id, dan informasi lain yang diperlukan.

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. Edit bagian <dependencies /> dalam file pom.xml untuk menambahkan dependensi yang diperlukan.

            <!--//Tambahkan dependensi untuk Flink Table API -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <!--//Tambahkan dependensi untuk Flink JDBC Connector dan ClickHouse JDBC Driver -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
  3. Buat file program untuk menulis data.

    Program contoh menggunakan CsvTableSource untuk membaca file CSV dan membuat sumber tabel. Program tersebut kemudian mengubah tabel menjadi DataStream menggunakan TableEnvironment.toAppendStream. Terakhir, program menggunakan JdbcSink untuk menulis data ke ClickHouse.

    Catatan
    • Karena ClickHouse memiliki latensi tinggi untuk operasi insert tunggal, atur BatchSize untuk melakukan bulk insert dan meningkatkan performa.

    • Pada versi saat ini dari flink-connector-jdbc, terjadi masalah serialisasi dengan fungsi lambda ketika Anda memanggil JdbcSink menggunakan Scala API. Untuk mengatasi masalah ini, Anda harus mengimplementasikan antarmuka secara manual untuk meneruskan fungsi pembuat pernyataan JDBC. Kelas CkSinkBuilder merupakan contoh implementasi tersebut.

      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // Implementasikan antarmuka secara manual untuk meneruskan fungsi pembuat pernyataan JDBC
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // Atur ukuran batch Anda
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
    // Mengubah tabel menjadi DataStream
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // Menulis data ke sink JDBC ClickHouse
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }

    Parameter:

    • SourceCsvPath: Jalur file CSV sumber.

    • CkJdbcUrl: Alamat kluster ClickHouse tujuan.

    • CkUsername: Username untuk kluster ClickHouse tujuan.

    • CkPassword: Password untuk kluster ClickHouse tujuan.

  4. Kompilasi dan jalankan program.

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar