すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for ClickHouse:JDBC コネクタを使用したインポート

最終更新日:Mar 10, 2026

本トピックでは、Java Database Connectivity (JDBC) コネクタを用いて Flink から ClickHouse へデータを書き込む方法について説明します。記載する手順は、さまざまなバージョンの Flink に適用可能です。

背景情報

Flink は、バージョン 1.11.0 で JDBC コネクタを大幅にリファクタリングしました。

  • リファクタリング前(バージョン 1.10.1 以前)では、パッケージ名は flink-jdbc でした。

  • リファクタリング後(バージョン 1.11.0 以降)では、パッケージ名は flink-connector-jdbc です。

以下の表は、各パッケージが ClickHouse sink へのデータ書き込みをどの程度サポートしているかを示しています。

API 名

flink-jdbc

flink-connector-jdbc

DataStream

非対応

対応

Table API(レガシ)

サポート

非対応

Table API(DDL)

非対応

非対応

flink-connector-jdbc パッケージでは、レガシ Table API のサポートが完全に削除されています。Table API を利用するには、Data Definition Language(DDL)のみを使用する必要があります。ただし、Table DDL 方式では、サポートされる JDBC ドライバーがハードコードされており、ClickHouse は含まれていません。本トピックでは、Flink 1.10.1(flink-jdbc 使用)および Flink 1.11.0(flink-connector-jdbc 使用)を例として、Flink から ClickHouse へのデータ書き込み方法を解説します。

Flink 1.10.1 + flink-jdbc

Flink 1.10.1 およびそれ以前のバージョンでは、flink-jdbc パッケージと Table API を用いて ClickHouse へデータを書き込む必要があります。本セクションでは、Maven および Flink 1.10.1 を用いた例を示します。

  1. mvn archetype:generate コマンドを使用してプロジェクトを作成します。作成プロセス中に、プロンプトに従って group-id、artifact-id などの情報を入力します。

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. pom.xml ファイル内の <dependencies /> セクションを編集し、必要な依存関係を追加します。

            <!--//Flink Table API の依存関係を追加 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    
            <!--//Flink JDBC および ClickHouse JDBC ドライバーの依存関係を追加 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
                            
  3. データ書き込み用のプログラムファイルを作成します。

    この例では、CsvTableSource を使用して CSV ファイルを読み込み、テーブルソースを作成します。その後、JDBCAppendTableSink を使用してデータを ClickHouse sink へ書き込みます。

    説明
    • ClickHouse では単一の INSERT 操作に高いレイテンシーが発生するため、BatchSize を設定して一括挿入を行い、パフォーマンスを向上させることを推奨します。

    • JDBCAppendTableSink の実装では、最終バッチのレコード数が BatchSize 未満の場合、残りのデータは挿入されません。

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // バッチサイズを設定してください
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // ClickHouse sink へデータを書き込みます 
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API から ClickHouse への書き込み(サンプル)")
      }
    }

    パラメーター:

    • SourceCsvPath:ソース CSV ファイルのパス。

    • CkJdbcUrl:送信先 ClickHouse クラスターのアドレス。

    • CkUsername:送信先 ClickHouse クラスターのユーザー名。

    • CkPassword:ApsaraDB for ClickHouse クラスターのパスワード。

  4. プログラムをコンパイルして実行します。

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

Flink 1.11.0 およびそれ以降のバージョンでは、flink-connector-jdbc パッケージと DataStream API を用いて ClickHouse へデータを書き込む必要があります。本セクションでは、Maven および Flink 1.11.0 を用いた例を示します。

  1. mvn archetype:generate コマンドを使用してプロジェクトを作成します。プロンプトに従って、group-id、artifact-id などの情報を入力します。

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. pom.xml ファイル内の <dependencies /> セクションを編集し、必要な依存関係を追加します。

            <!--//Flink Table API の依存関係を追加 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <!--//Flink JDBC コネクタおよび ClickHouse JDBC ドライバーの依存関係を追加 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
  3. データ書き込み用のプログラムファイルを作成します。

    この例では、CsvTableSource を使用して CSV ファイルを読み込み、テーブルソースを作成します。次に、TableEnvironment.toAppendStream を使用してテーブルを DataStream に変換します。最後に、JdbcSink を使用してデータを ClickHouse へ書き込みます。

    説明
    • ClickHouse では単一の INSERT 操作に高いレイテンシーが発生するため、BatchSize を設定して一括挿入を行い、パフォーマンスを向上させることを推奨します。

    • 現在の flink-connector-jdbc バージョンでは、Scala API を使用して JdbcSink を呼び出す際に、ラムダ関数でシリアル化の問題が発生します。この問題を回避するため、JDBC ステートメントビルダー関数を渡すために、インターフェイスを手動で実装する必要があります。CkSinkBuilder クラスは、そのような実装の例です。

      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    // JDBC ステートメントビルダー関数を渡すために、インターフェイスを手動で実装します
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // バッチサイズを設定してください
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
    // テーブルを DataStream に変換します
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    // ClickHouse JDBC sink へデータを書き込みます
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream から ClickHouse への書き込み(サンプル)")
      }
    }

    パラメーター:

    • SourceCsvPath:ソース CSV ファイルのパス。

    • CkJdbcUrl:送信先 ClickHouse クラスターのアドレス。

    • CkUsername:送信先 ClickHouse クラスターのユーザー名。

    • CkPassword:ClickHouse クラスターのパスワード。

  4. プログラムをコンパイルして実行します。

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar