すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Flink から ClickHouse クラスターにデータをインポートする

最終更新日:Jan 11, 2025

このトピックでは、Flink から ClickHouse クラスターにデータをインポートする方法について説明します。

前提条件

  • E-MapReduce(EMR)Flink クラスターが作成されていること。詳細については、クラスターの作成をご参照ください。

  • EMR ClickHouse クラスターが作成されていること。詳細については、ClickHouse クラスターの作成をご参照ください。

背景情報

Flink の詳細については、Apache Flink 公式 Web サイトをご覧ください。

サンプルコード

サンプルコード:

  • ストリーム処理

    package com.company.packageName
    
    import java.util.concurrent.ThreadLocalRandom
    
    import scala.annotation.tailrec
    
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream}
    
    object StreamingJob {
    
      case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
    
      private var dbName: String = "default"
      private var tableName: String = ""
      private var ckHost: String = ""
      private var ckPort: String = "8123"
      private var user: String = "default"
      private var password: String = ""
    
      def main(args: Array[String]) {
        // 引数を解析する
        parse(args.toList)
        // 引数を確認する
        checkArguments()
    
        // ストリーミング実行環境をセットアップする
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val insertIntoCkSql =
          s"""
            | INSERT INTO $tableName (
            |   id, key1, value1, key2, value2
            | ) VALUES (
            |   ?, ?, ?, ?, ?
            | )
            |""".stripMargin
    
        val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    
        println(s"jdbc url: $jdbcUrl")
        println(s"insert sql: $insertIntoCkSql")
    
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(jdbcUrl)
          .setUsername(user)
          .setPassword(password)
          .setQuery(insertIntoCkSql)
          .setBatchSize(1000)
          .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
          .build()
    
        val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => {
          val rand = ThreadLocalRandom.current()
          val randString = (0 until rand.nextInt(10, 20))
            .map(_ => rand.nextLong())
            .mkString("")
          Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
        })
    
        val table = table2RowDataStream(tableEnv.fromDataStream(data))
        sink.emitDataStream(table.javaStream)
    
        // プログラムを実行する
        env.execute("Flink Streaming Scala API Skeleton")
      }
    
      private def printUsageAndExit(exitCode: Int = 0): Unit = {
        // 使用方法を表示して終了する
        println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
        println("  --dbName      Specifies the name of the ClickHouse database. Default value: default.")
        println("  --tableName   Specifies the name of the table in the ClickHouse database.")
        println("  --ckHost      Specifies the IP address of the ClickHouse cluster.")
        println("  --ckPort      Specifies the port number of the ClickHouse cluster. Default value: 8123.")
        println("  --user        Specifies the username that is used to access the ClickHouse cluster.")
        println("  --password    Specifies the password that is used to access the ClickHouse cluster.")
        System.exit(exitCode)
      }
    
      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--help" | "-h") :: _ =>
          printUsageAndExit()
        case "--dbName" :: value :: tail =>
          dbName = value
          parse(tail)
        case "--tableName" :: value :: tail =>
          tableName = value
          parse(tail)
        case "--ckHost" :: value :: tail =>
          ckHost = value
          parse(tail)
        case "--ckPort" :: value :: tail =>
          ckPort = value
          parse(tail)
        case "--user" :: value :: tail =>
          user = value
          parse(tail)
        case "--password" :: value :: tail =>
          password = value
          parse(tail)
        case Nil =>
        case _ =>
          printUsageAndExit(1)
      }
    
      private def checkArguments(): Unit = {
        // 引数を確認する
        if ("".equals(tableName) || "".equals(ckHost)) {
          printUsageAndExit(2)
        }
      }
    }
  • バッチ処理

    package com.company.packageName
    
    import java.util.concurrent.ThreadLocalRandom
    
    import scala.annotation.tailrec
    
    import org.apache.flink.Utils
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet}
    
    object BatchJob {
    
      case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
    
      private var dbName: String = "default"
      private var tableName: String = ""
      private var ckHost: String = ""
      private var ckPort: String = "8123"
      private var user: String = "default"
      private var password: String = ""
    
      def main(args: Array[String]) {
        // 引数を解析する
        parse(args.toList)
        // 引数を確認する
        checkArguments()
    
    
        // バッチ実行環境をセットアップする
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = BatchTableEnvironment.create(env)
    
        val insertIntoCkSql =
          s"""
            | INSERT INTO $tableName (
            |   id, key1, value1, key2, value2
            | ) VALUES (
            |   ?, ?, ?, ?, ?
            | )
            |""".stripMargin
        val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    
        println(s"jdbc url: $jdbcUrl")
        println(s"insert sql: $insertIntoCkSql")
    
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(jdbcUrl)
          .setUsername(user)
          .setPassword(password)
          .setQuery(insertIntoCkSql)
          .setBatchSize(1000)
          .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
          .build()
    
        val data = env.fromCollection(1 to 1000).map(i => {
          val rand = ThreadLocalRandom.current()
          val randString = (0 until rand.nextInt(10, 20))
            .map(_ => rand.nextLong())
            .mkString("")
          Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
        })
    
        val table = table2RowDataSet(tableEnv.fromDataSet(data))
    
        sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table))
    
        // プログラムを実行する
        env.execute("Flink Batch Scala API Skeleton")
      }
    
      private def printUsageAndExit(exitCode: Int = 0): Unit = {
        // 使用方法を表示して終了する
        println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
        println("  --dbName      Specifies the name of the ClickHouse database. Default value: default.")
        println("  --tableName   Specifies the name of the table in the ClickHouse database.")
        println("  --ckHost      Specifies the IP address of the ClickHouse cluster.")
        println("  --ckPort      Specifies the port number of the ClickHouse cluster. Default value: 8123.")
        println("  --user        Specifies the username that is used to access the ClickHouse cluster.")
        println("  --password    Specifies the password that is used to access the ClickHouse cluster.")
        System.exit(exitCode)
      }
    
      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--help" | "-h") :: _ =>
          printUsageAndExit()
        case "--dbName" :: value :: tail =>
          dbName = value
          parse(tail)
        case "--tableName" :: value :: tail =>
          tableName = value
          parse(tail)
        case "--ckHost" :: value :: tail =>
          ckHost = value
          parse(tail)
        case "--ckPort" :: value :: tail =>
          ckPort = value
          parse(tail)
        case "--user" :: value :: tail =>
          user = value
          parse(tail)
        case "--password" :: value :: tail =>
          password = value
          parse(tail)
        case Nil =>
        case _ =>
          printUsageAndExit(1)
      }
    
      private def checkArguments(): Unit = {
        // 引数を確認する
        if ("".equals(tableName) || "".equals(ckHost)) {
          printUsageAndExit(2)
        }
      }
    }

手順

  1. 手順 1: ClickHouse テーブルを作成する

  2. 手順 2: コードをコンパイルしてパッケージ化する

  3. 手順 3: ジョブを送信する

手順 1: ClickHouse テーブルを作成する

  1. SSH モードで ClickHouse クラスターにログオンします。詳細については、クラスターへのログオンをご参照ください。

  2. 次のコマンドを実行して、ClickHouse クライアントを起動します。

    clickhouse-client -h core-1-1 -m
    説明

    サンプルコマンドでは、core-1-1 はログオンするコアノードの名前を示しています。複数のコアノードがある場合は、いずれかのノードにログオンできます。

  3. ClickHouse データベースと必要な ClickHouse テーブルを作成します。

    1. clickhouse_database_name という名前のデータベースを作成するには、次のコマンドを実行します。

      CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

      Alibaba Cloud EMR は、cluster_emr という名前の ClickHouse クラスターを自動的に生成します。データベースの名前はカスタマイズできます。

    2. clickhouse_table_name_local という名前のテーブルを作成するには、次のコマンドを実行します。

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
        id            UInt32,
        key1            String,
        value1        UInt8,
        key2            Int64,
        value2        Float64
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
      ORDER BY id;
      説明

      テーブル名はカスタマイズできますが、テーブル名の末尾は _local にする必要があります。layershardreplica パラメーターは、Alibaba Cloud EMR によって ClickHouse クラスター用に自動的に生成されるマクロであり、直接使用できます。

    3. clickhouse_table_name_all という名前のテーブルを作成するには、次のコマンドを実行します。このテーブルのフィールドは、clickhouse_table_name_local テーブルのフィールドと同じ方法で定義されています。

      説明

      テーブル名はカスタマイズできますが、テーブル名の末尾は _all にする必要があります。

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
        id                    UInt32,
        key1                  String,
        value1                UInt8,
        key2                  Int64,
        value2                Float64
      ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());

手順 2: コードをコンパイルしてパッケージ化する

  1. flink-clickhouse-demo.tgz ファイルをダウンロードして、オンプレミスマシンに解凍します。

  2. CLI で、pom.xml ファイルが格納されているパスに移動し、次のコマンドを実行してファイルをパッケージ化します。

    mvn clean package

    pom.xml ファイルの artifactId 情報に基づいて、flink-clickhouse-demo-1.0.0.jar という名前の JAR パッケージが target ディレクトリに表示されます。

手順 3: ジョブを送信する

  1. SSH モードで Flink クラスターにログオンします。詳細については、クラスターへのログオンをご参照ください。

  2. flink-clickhouse-demo-1.0.0.jar パッケージを Flink クラスターのルートディレクトリにアップロードします。

    説明

    この例では、flink-clickhouse-demo-1.0.0.jar パッケージはルートディレクトリにアップロードされます。ビジネス要件に基づいて、パッケージをディレクトリにアップロードできます。

  3. ジョブを送信します。

    サンプルコード:

    • ストリーム処理ジョブ

      flink run -m yarn-cluster \
                -c com.aliyun.emr.StreamingJob \
                flink-clickhouse-demo-1.0.0.jar \
                --dbName clickhouse_database_name \
                --tableName clickhouse_table_name_all \
                --ckHost ${clickhouse_host} \
                --password ${password};
    • バッチ処理ジョブ

      flink run -m yarn-cluster \
                -c com.aliyun.emr.BatchJob \
                flink-clickhouse-demo-1.0.0.jar \
                --dbName clickhouse_database_name \
                --tableName clickhouse_table_name_all \
                --ckHost ${clickhouse_host} \
                --password ${password};

    パラメーター

    説明

    dbName

    ClickHouse データベースの名前。デフォルト値: default。この例では、clickhouse_database_name という名前のデータベースが使用されます。

    tableName

    ClickHouse データベース内のテーブルの名前。この例では、clickhouse_table_name_all という名前のテーブルが使用されます。

    ckHost

    ClickHouse クラスターのマスターノードのプライベート IP アドレスまたはパブリック IP アドレス。IP アドレスの取得方法の詳細については、このトピックのマスターノードの IP アドレスを取得するセクションをご参照ください。

    password

    ClickHouse クラスターへのアクセスに使用するパスワード。

    ClickHouse サービスページの [構成] タブで、users.default.password パラメーターを表示してパスワードを取得できます。

    password

マスターノードの IP アドレスを取得する

  1. [ノード] タブに移動します。

    1. EMR コンソール にログオンします。左側のナビゲーションペインで、[ECS 上の EMR] をクリックします。

    2. 上部のナビゲーションバーで、クラスターが存在するリージョンとリソースグループをビジネス要件に基づいて選択します。

    3. [ECS 上の EMR] ページで、管理するクラスターを見つけ、[アクション] 列の [ノード] をクリックします。

  2. [ノード] タブで、マスターノードグループを見つけ、open アイコンをクリックし、[パブリック IP アドレス] 列の IP アドレスをコピーします。