This topic describes how to use a Spark program to import data to ApsaraDB for ClickHouse.
Prerequisites
The IP address of the on-premises machine is added to the whitelist of the ApsaraDB for ClickHouse cluster. For more information, see Configure a whitelist.
An ApsaraDB for ClickHouse table is created. The data types of the table columns map to the data types of the data that you want to import. For more information, see Create a table.
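As a hedged illustration of this type mapping, the following Scala sketch creates a target table over JDBC. It is not part of the official procedure: the table name test_table, the column names, and the ClickHouse types are hypothetical examples that must be adapted to your data, and single-node syntax is assumed (a multi-node cluster may require ON CLUSTER and a distributed table).

// A minimal sketch, assuming a hypothetical table named test_table whose columns
// match the CSV columns that the Spark job selects. Replace <yourUrl>, <yourUserName>,
// and <yourPassword> with your own values.
import java.sql.DriverManager

object CreateCkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<yourUrl>:8123/default", "<yourUserName>", "<yourPassword>")
    try {
      val stmt = conn.createStatement()
      // The column types here are examples; they must map to the types of your data.
      stmt.execute(
        """CREATE TABLE IF NOT EXISTS default.test_table (
          |  colName1 String,
          |  colName2 UInt32,
          |  colName3 Float64
          |) ENGINE = MergeTree() ORDER BY colName1""".stripMargin)
      stmt.close()
    } finally conn.close()
  }
}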
Procedure
Prepare the directory structure of the Spark program.
find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scala

Add dependencies to the build.sbt configuration file.
name := "Simple Project" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"Create a file named WriteToCk.scala and write data to the file.
package com.spark.test

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "<yourUserName>")
  properties.put("password", "<yourPassword>")
  properties.put("batchsize", "100000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  val url = "jdbc:clickhouse://<yourUrl>:8123/default"
  val table = "<yourTableName>"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()

    // Read the CSV file, select the columns to import, and cache the result.
    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("<yourFilePath>")
      .selectExpr(
        "colName1",
        "colName2",
        "colName3",
        ...  // placeholder from the original: list the remaining columns here
      )
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(s"read done")

    // Append the cached DataFrame to the ClickHouse table in batches over JDBC.
    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    println(s"write done")

    df.unpersist(true)
  }
}

The following table describes the parameters. An optional verification sketch follows the table.
Parameter | Description
yourUserName | The username of the database account that is created in ApsaraDB for ClickHouse.
yourPassword | The password of the database account.
yourUrl | The endpoint of the cluster.
yourTableName | The name of the table that is created in ApsaraDB for ClickHouse.
yourFilePath | The path of the data file that you want to import. The path must include the file name.
colName1,colName2,colName3 | The names of the columns in the ApsaraDB for ClickHouse table.
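After the import completes, the result can be spot-checked by reading the table back over the same JDBC connection. The following is a minimal sketch, not part of the original procedure: the object name VerifyCk is illustrative, and the url, table, and credential values are the same placeholders used in WriteToCk.scala.

package com.spark.test

import java.util.Properties

import org.apache.spark.sql.SparkSession

object VerifyCk {
  def main(args: Array[String]): Unit = {
    // Reuse the same connection settings as WriteToCk.scala.
    val properties = new Properties()
    properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    properties.put("user", "<yourUserName>")
    properties.put("password", "<yourPassword>")
    val url = "jdbc:clickhouse://<yourUrl>:8123/default"
    val table = "<yourTableName>"

    val session = SparkSession.builder().master("local[*]").appName("verify-ck").getOrCreate()
    // Read the ClickHouse table back over JDBC and count the imported rows.
    val df = session.read.jdbc(url, table, properties)
    println(s"row count in $table: ${df.count()}")
    session.stop()
  }
}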
Compile and package the program.
sbt package

Run the program.
${SPARK_HOME}/bin/spark-submit \
  --class "com.spark.test.WriteToCk" \
  --master local[4] \
  --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  target/scala-2.12/simple-project_2.12-1.0.jar
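Note: the extraClassPath values above assume that the ClickHouse JDBC driver is cached in the local Maven repository. Depending on your sbt version, the jar may instead be cached under ~/.ivy2; in that case, the --jars option of spark-submit can be used in place of the two extraClassPath settings, because it adds the jar to both the driver and executor classpaths.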