このトピックでは、Flink から ClickHouse クラスターにデータをインポートする方法について説明します。
前提条件
E-MapReduce(EMR)Flink クラスターが作成されていること。詳細については、クラスターの作成をご参照ください。
EMR ClickHouse クラスターが作成されていること。詳細については、ClickHouse クラスターの作成をご参照ください。
背景情報
Flink の詳細については、Apache Flink 公式 Web サイトをご覧ください。
サンプルコード
サンプルコード:
ストリーム処理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream} object StreamingJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { // 引数を解析する parse(args.toList) // 引数を確認する checkArguments() // ストリーミング実行環境をセットアップする val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataStream(tableEnv.fromDataStream(data)) sink.emitDataStream(table.javaStream) // プログラムを実行する env.execute("Flink Streaming Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { // 使用方法を表示して終了する println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName Specifies the name of the ClickHouse database. Default value: default.") println(" --tableName Specifies the name of the table in the ClickHouse database.") println(" --ckHost Specifies the IP address of the ClickHouse cluster.") println(" --ckPort Specifies the port number of the ClickHouse cluster. Default value: 8123.") println(" --user Specifies the username that is used to access the ClickHouse cluster.") println(" --password Specifies the password that is used to access the ClickHouse cluster.") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { // 引数を確認する if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }
バッチ処理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.Utils import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet} object BatchJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { // 引数を解析する parse(args.toList) // 引数を確認する checkArguments() // バッチ実行環境をセットアップする val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = BatchTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataSet(tableEnv.fromDataSet(data)) sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table)) // プログラムを実行する env.execute("Flink Batch Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { // 使用方法を表示して終了する println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName Specifies the name of the ClickHouse database. Default value: default.") println(" --tableName Specifies the name of the table in the ClickHouse database.") println(" --ckHost Specifies the IP address of the ClickHouse cluster.") println(" --ckPort Specifies the port number of the ClickHouse cluster. Default value: 8123.") println(" --user Specifies the username that is used to access the ClickHouse cluster.") println(" --password Specifies the password that is used to access the ClickHouse cluster.") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { // 引数を確認する if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }
手順
手順 1: ClickHouse テーブルを作成する
SSH モードで ClickHouse クラスターにログオンします。詳細については、クラスターへのログオンをご参照ください。
次のコマンドを実行して、ClickHouse クライアントを起動します。
clickhouse-client -h core-1-1 -m
説明サンプルコマンドでは、core-1-1 はログオンするコアノードの名前を示しています。複数のコアノードがある場合は、いずれかのノードにログオンできます。
ClickHouse データベースと必要な ClickHouse テーブルを作成します。
clickhouse_database_name という名前のデータベースを作成するには、次のコマンドを実行します。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
Alibaba Cloud EMR は、cluster_emr という名前の ClickHouse クラスターを自動的に生成します。データベースの名前はカスタマイズできます。
clickhouse_table_name_local という名前のテーブルを作成するには、次のコマンドを実行します。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}') ORDER BY id;
説明テーブル名はカスタマイズできますが、テーブル名の末尾は _local にする必要があります。layer、shard、replica パラメーターは、Alibaba Cloud EMR によって ClickHouse クラスター用に自動的に生成されるマクロであり、直接使用できます。
clickhouse_table_name_all という名前のテーブルを作成するには、次のコマンドを実行します。このテーブルのフィールドは、clickhouse_table_name_local テーブルのフィールドと同じ方法で定義されています。
説明テーブル名はカスタマイズできますが、テーブル名の末尾は _all にする必要があります。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
手順 2: コードをコンパイルしてパッケージ化する
flink-clickhouse-demo.tgz ファイルをダウンロードして、オンプレミスマシンに解凍します。
CLI で、pom.xml ファイルが格納されているパスに移動し、次のコマンドを実行してファイルをパッケージ化します。
mvn clean package
pom.xml ファイルの artifactId 情報に基づいて、flink-clickhouse-demo-1.0.0.jar という名前の JAR パッケージが target ディレクトリに表示されます。
手順 3: ジョブを送信する
SSH モードで Flink クラスターにログオンします。詳細については、クラスターへのログオンをご参照ください。
flink-clickhouse-demo-1.0.0.jar パッケージを Flink クラスターのルートディレクトリにアップロードします。
説明この例では、flink-clickhouse-demo-1.0.0.jar パッケージはルートディレクトリにアップロードされます。ビジネス要件に基づいて、パッケージをディレクトリにアップロードできます。
ジョブを送信します。
サンプルコード:
ストリーム処理ジョブ
flink run -m yarn-cluster \ -c com.aliyun.emr.StreamingJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host} \ --password ${password};
バッチ処理ジョブ
flink run -m yarn-cluster \ -c com.aliyun.emr.BatchJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host} \ --password ${password};
パラメーター
説明
dbName
ClickHouse データベースの名前。デフォルト値: default。この例では、clickhouse_database_name という名前のデータベースが使用されます。
tableName
ClickHouse データベース内のテーブルの名前。この例では、clickhouse_table_name_all という名前のテーブルが使用されます。
ckHost
ClickHouse クラスターのマスターノードのプライベート IP アドレスまたはパブリック IP アドレス。IP アドレスの取得方法の詳細については、このトピックのマスターノードの IP アドレスを取得するセクションをご参照ください。
password
ClickHouse クラスターへのアクセスに使用するパスワード。
ClickHouse サービスページの [構成] タブで、users.default.password パラメーターを表示してパスワードを取得できます。
マスターノードの IP アドレスを取得する
[ノード] タブに移動します。
EMR コンソール にログオンします。左側のナビゲーションペインで、[ECS 上の EMR] をクリックします。
上部のナビゲーションバーで、クラスターが存在するリージョンとリソースグループをビジネス要件に基づいて選択します。
[ECS 上の EMR] ページで、管理するクラスターを見つけ、[アクション] 列の [ノード] をクリックします。
[ノード] タブで、マスターノードグループを見つけ、
アイコンをクリックし、[パブリック IP アドレス] 列の IP アドレスをコピーします。