本文為您介紹如何將Flink中的資料匯入至ClickHouse叢集。
前提條件
已建立Flink叢集,詳情請參見建立叢集。
已建立ClickHouse叢集,詳情請參見建立ClickHouse叢集。
背景資訊
關於Flink的更多介紹,請參見Apache Flink。
程式碼範例
程式碼範例如下:
流處理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream} object StreamingJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { parse(args.toList) checkArguments() // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataStream(tableEnv.fromDataStream(data)) sink.emitDataStream(table.javaStream) // execute program env.execute("Flink Streaming Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName 設定ClickHouse資料庫的名稱,預設為default") println(" --tableName 設定ClickHouse庫中表的名稱") println(" --ckHost 設定ClickHouse地址") println(" --ckPort 設定ClickHouse連接埠,預設為8123") println(" --user 設定ClickHouse所使用的使用者名稱") println(" --password 設定ClickHouse使用者的密碼") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }批處理
package com.company.packageName import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import org.apache.flink.Utils import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet} object BatchJob { case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double) private var dbName: String = "default" private var tableName: String = "" private var ckHost: String = "" private var ckPort: String = "8123" private var user: String = "default" private var password: String = "" def main(args: Array[String]) { parse(args.toList) checkArguments() // set up the batch execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = BatchTableEnvironment.create(env) val insertIntoCkSql = s""" | INSERT INTO $tableName ( | id, key1, value1, key2, value2 | ) VALUES ( | ?, ?, ?, ?, ? | ) |""".stripMargin val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName" println(s"jdbc url: $jdbcUrl") println(s"insert sql: $insertIntoCkSql") val sink = JDBCAppendTableSink .builder() .setDrivername("ru.yandex.clickhouse.ClickHouseDriver") .setDBUrl(jdbcUrl) .setUsername(user) .setPassword(password) .setQuery(insertIntoCkSql) .setBatchSize(1000) .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE) .build() val data = env.fromCollection(1 to 1000).map(i => { val rand = ThreadLocalRandom.current() val randString = (0 until rand.nextInt(10, 20)) .map(_ => rand.nextLong()) .mkString("") Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()) }) val table = table2RowDataSet(tableEnv.fromDataSet(data)) sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table)) // execute program env.execute("Flink Batch Scala API Skeleton") } private def printUsageAndExit(exitCode: Int = 0): Unit = { println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]") println(" --dbName 設定ClickHouse資料庫的名稱,預設為default") println(" --tableName 設定ClickHouse庫中表的名稱") println(" --ckHost 設定ClickHouse地址") println(" --ckPort 設定ClickHouse連接埠,預設為8123") println(" --user 設定ClickHouse所使用的使用者名稱") println(" --password 設定ClickHouse使用者的密碼") System.exit(exitCode) } @tailrec private def parse(args: List[String]): Unit = args match { case ("--help" | "-h") :: _ => printUsageAndExit() case "--dbName" :: value :: tail => dbName = value parse(tail) case "--tableName" :: value :: tail => tableName = value parse(tail) case "--ckHost" :: value :: tail => ckHost = value parse(tail) case "--ckPort" :: value :: tail => ckPort = value parse(tail) case "--user" :: value :: tail => user = value parse(tail) case "--password" :: value :: tail => password = value parse(tail) case Nil => case _ => printUsageAndExit(1) } private def checkArguments(): Unit = { if ("".equals(tableName) || "".equals(ckHost)) { printUsageAndExit(2) } } }
操作流程
步驟一:建立ClickHouse表
使用SSH方式登入ClickHouse叢集,詳情請參見登入叢集。
執行如下命令,啟動ClickHouse用戶端。
clickhouse-client -h core-1-1 -m說明本樣本登入core-1-1節點,如果您有多個Core節點,可以登入任意一個節點。
建立ClickHouse資訊。
執行如下命令,建立資料庫clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;阿里雲EMR會為ClickHouse叢集自動產生一個名為cluster_emr的叢集。資料庫名您可以自訂。
執行如下命令,建立表clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}') ORDER BY id;說明表名您可以自訂,但請確保表名是以_local結尾。layer、shard和replica是阿里雲EMR為ClickHouse叢集自動產生的宏定義,可以直接使用。
執行如下命令,建立與表clickhouse_table_name_local欄位定義一致的表clickhouse_table_name_all。
說明表名您可以自訂,但請確保表名是以_all結尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步驟二:編譯並打包
下載並解壓flink-clickhouse-demo.tgz樣本到本地。
在CMD命令列中,進入到下載檔案中pom.xml所在的目錄下,執行如下命令打包檔案。
mvn clean package根據您pom.xml檔案中artifactId的資訊,下載檔案中的target目錄下會出現flink-clickhouse-demo-1.0.0.jar的JAR包。
步驟三:提交作業
使用SSH方式登入Flink叢集,詳情請參見登入叢集。
上傳打包好的flink-clickhouse-demo-1.0.0.jar至Flink叢集的根目錄下。
說明本文樣本中flink-clickhouse-demo-1.0.0.jar是上傳至root根目錄下,您也可以自訂上傳路徑。
執行如下命令提交作業。
程式碼範例如下:
流作業
flink run -m yarn-cluster \ -c com.aliyun.emr.StreamingJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host} \ --password ${password};批作業
flink run -m yarn-cluster \ -c com.aliyun.emr.BatchJob \ flink-clickhouse-demo-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host} \ --password ${password};
參數
說明
dbName
ClickHouse叢集資料庫的名稱,預設為default。本文樣本為clickhouse_database_name。
tableName
ClickHouse叢集資料庫中表的名稱。本文樣本為clickhouse_table_name_all。
ckHost
ClickHouse叢集的Master節點的內網IP地址或公網IP地址。ip地址擷取方式,請參見擷取主節點的IP地址。
password
ClickHouse使用者的密碼。
您可以在ClickHouse服務的配置頁面,通過查看users.default.password參數,擷取密碼。

擷取主節點的IP地址
進入節點管理頁面。
在頂部功能表列處,根據實際情況選擇地區和資源群組。
在EMR on ECS頁面,單擊目的地組群操作列的節點管理。
在節點管理頁面,單擊Master節點群組所在行的
表徵圖,複製公網IP列的IP地址。