This topic describes how to import data from Spark to a ClickHouse cluster.
Prerequisites
An E-MapReduce (EMR) Hadoop cluster is created. For more information, see Create a cluster.
An EMR ClickHouse cluster is created. For more information, see Create a ClickHouse cluster.
Background information
For more information about Spark, see Overview.
Sample code
The following sample code is provided:
package com.company.packageName

import java.util.Properties
import java.util.concurrent.ThreadLocalRandom

import scala.annotation.tailrec

import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)

object CKDataImporter extends Logging {
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  private var local: Boolean = false

  def main(args: Array[String]): Unit = {
    parse(args.toList)
    checkArguments()
    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    logInfo(s"Use jdbc: $jdbcUrl")
    logInfo(s"Use table: $tableName")

    val spark = getSparkSession

    // Generate 1,000 rows of random test data.
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      // Build a random string by concatenating 10 to 19 random long values.
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")
      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })

    // Write the DataFrame to the ClickHouse table over JDBC in append mode.
    val df = spark.createDataFrame(rdd)
    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
  }

  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
    logError("  --dbName     Specifies the name of the ClickHouse database. Default value: default.")
    logError("  --tableName  Specifies the name of the table in the ClickHouse database.")
    logError("  --ckHost     Specifies the IP address of the ClickHouse cluster.")
    logError("  --ckPort     Specifies the port number of the ClickHouse cluster. Default value: 8123.")
    logError("  --user       Specifies the username that is used to access the ClickHouse cluster.")
    logError("  --password   Specifies the password that is used to access the ClickHouse cluster.")
    logError("  --local      Specifies whether the sample code runs in Spark local mode.")
    System.exit(exitCode)
  }

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case "--local" :: tail =>
      local = true
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }

  private def checkArguments(): Unit = {
    if ("".equals(tableName) || "".equals(ckHost)) {
      printUsageAndExit(2)
    }
  }

  private def getCKJdbcProperties(
      user: String,
      password: String,
      batchSize: String = "1000",
      socketTimeout: String = "300000",
      numPartitions: String = "8",
      rewriteBatchedStatements: String = "true"): Properties = {
    // Collect the JDBC connection options, then copy them into a Properties object.
    val kvMap = ImmutableMap.builder[String, String]()
      .put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .put("user", user)
      .put("password", password)
      .put("batchsize", batchSize)
      .put("socket_timeout", socketTimeout)
      .put("numPartitions", numPartitions)
      .put("rewriteBatchedStatements", rewriteBatchedStatements)
      .build()
    val properties = new Properties
    properties.putAll(kvMap)
    properties
  }

  private def getSparkSession: SparkSession = {
    val builder = SparkSession.builder()
    // In local mode, run Spark in-process instead of on YARN.
    if (local) {
      builder.master("local[*]")
    }
    builder.appName("ClickHouse-Data-Importer")
    builder.getOrCreate()
  }
}
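The Properties-based call above is how the sample project configures the JDBC sink. The same write can also be expressed with explicit DataFrameWriter options. The following variant is a sketch rather than part of the sample project; it assumes the df, jdbcUrl, tableName, user, and password values defined in the code above:

// Equivalent write with inline DataFrameWriter options instead of a
// Properties object. Assumes df, jdbcUrl, tableName, user, and password
// from the sample above.
df.write
  .format("jdbc")
  .mode(SaveMode.Append)
  .option("url", jdbcUrl)
  .option("dbtable", tableName)
  .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  .option("user", user)
  .option("password", password)
  .option("batchsize", "1000")
  .option("isolationLevel", "NONE") // ClickHouse does not support transactions
  .save()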
Procedure
Step 1: Create ClickHouse tables
Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the ClickHouse client:
clickhouse-client -h core-1-1 -m
Note: In the sample command, core-1-1 indicates the name of the core node that you log on to. If you have multiple core nodes, you can log on to any one of them.
Create a ClickHouse database and the required ClickHouse tables.
Run the following command to create a database named clickhouse_database_name:
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
Alibaba Cloud EMR automatically generates a ClickHouse cluster named cluster_emr. You can customize the name of the database.
Run the following command to create a table named clickhouse_table_name_local:
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
    id UInt32,
    key1 String,
    value1 UInt8,
    key2 Int64,
    value2 Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id;
Note: You can customize the name of the table, but the table name must end with _local. The layer, shard, and replica parameters are macros that are automatically generated by Alibaba Cloud EMR for ClickHouse clusters and can be used directly.
Run the following command to create a table named clickhouse_table_name_all. The fields in this table are defined in the same way as the fields in the clickhouse_table_name_local table.
Note: You can customize the name of the table, but the table name must end with _all.
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
    id UInt32,
    key1 String,
    value1 UInt8,
    key2 Int64,
    value2 Float64
) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
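The column types above mirror the fields of the Test case class in the sample code: Int corresponds to UInt32, String to String, Boolean to UInt8, Long to Int64, and Double to Float64. If you want to confirm the mapping before you run the import, you can read the still-empty distributed table back through Spark JDBC. The following is a minimal sketch that is not part of the sample project; the host and credentials are placeholders that you must replace with your own values:

package com.company.packageName

import java.util.Properties

import org.apache.spark.sql.SparkSession

// Minimal sketch: read the (still empty) distributed table through JDBC
// and print its schema. <clickhouse_host> and the credentials are
// placeholders; replace them with your own values.
object CKSchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ClickHouse-Schema-Check")
      .getOrCreate()

    val props = new Properties()
    props.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    props.setProperty("user", "default")
    props.setProperty("password", "") // the users.default.password value

    spark.read
      .jdbc("jdbc:clickhouse://<clickhouse_host>:8123/clickhouse_database_name",
        "clickhouse_table_name_all", props)
      .printSchema() // expect the columns id, key1, value1, key2, value2

    spark.stop()
  }
}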
Step 2: Compile and package the code
Download and decompress the CKDataImporter file to your on-premises machine.
In the CLI, go to the path that stores the pom.xml file and run the following command to package the file:
mvn clean package
The JAR package CKDataImporter-1.0.0.jar is generated in the target directory based on the artifactId information in the pom.xml file.
Step 3: Submit a job
Log on to the Hadoop cluster in SSH mode. For more information, see Log on to a cluster.
Upload the CKDataImporter-1.0.0.jar package to the root directory of the Hadoop cluster.
Note: In this example, the CKDataImporter-1.0.0.jar package is uploaded to the root directory. You can upload the package to a different directory based on your business requirements.
Run the following command to submit the job:
spark-submit --master yarn \
  --class com.company.packageName.CKDataImporter \
  CKDataImporter-1.0.0.jar \
  --dbName clickhouse_database_name \
  --tableName clickhouse_table_name_all \
  --ckHost ${clickhouse_host} \
  --password ${password}
The following table describes the parameters in the command.
Parameter	Description
dbName	The name of the ClickHouse database. Default value: default. In this example, the clickhouse_database_name database is used.
tableName	The name of the table in the ClickHouse database. In this example, the clickhouse_table_name_all table is used.
ckHost	The private or public IP address of the master node of the ClickHouse cluster. For more information about how to obtain the IP address, see Obtain the IP address of the master node.
password	The password that is used to access the ClickHouse cluster. The value of the users.default.password parameter is this password. You can view the value of the parameter on the Configure tab of the ClickHouse service page.
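After the job completes, you can verify the import by reading the distributed table back into Spark and counting the rows. The following is a minimal sketch that you can paste into spark-shell on the Hadoop cluster, assuming the ClickHouse JDBC driver is on the classpath (for example, via --jars) and that <clickhouse_host> is replaced with your ClickHouse node address:

// Run in spark-shell: count the rows that the import job wrote.
val props = new java.util.Properties()
props.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver")
props.setProperty("user", "default")
props.setProperty("password", "") // the users.default.password value

val imported = spark.read.jdbc(
  "jdbc:clickhouse://<clickhouse_host>:8123/clickhouse_database_name",
  "clickhouse_table_name_all", props)
println(s"Imported rows: ${imported.count()}") // expect 1000 for the sample job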
Obtain the IP address of the master node
Go to the Nodes tab.
Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
On the EMR on ECS page, find the cluster that you want to manage and click Nodes in the Actions column.
On the Nodes tab, find the master node group, click the + icon to expand it, and then copy the IP address in the Public IP Address column.