This topic describes how to use a Spark program to write data to an ApsaraDB for ClickHouse cluster.

Procedure

  1. Prepare the directory structure of the Spark program.
     find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/com
    ./src/main/scala/com/spark
    ./src/main/scala/com/spark/test
    ./src/main/scala/com/spark/test/WriteToCk.scala
  2. Add dependencies to the build.sbt configuration file.
    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
    
    libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
  3. Create a WriteToCk.scala file that writes the data.
    package com.spark.test
    
    import java.util.Properties
    
    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    object WriteToCk {
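      // Connection and write options passed to the Spark JDBC data source.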
      val properties = new Properties()
      properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      properties.put("user", "<your-user-name>")
      properties.put("password", "<your-password>")
      properties.put("batchsize","100000")
      properties.put("socket_timeout","300000")
      properties.put("numPartitions","8")
      properties.put("rewriteBatchedStatements","true")
    
      val url = "jdbc:clickhouse://<you-url>:8123/default"
      val table = "<your-table-name>"
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkConf()
        sc.set("spark.driver.memory", "1G")
        sc.set("spark.driver.cores", "4")
        sc.set("spark.executor.memory", "1G")
        sc.set("spark.executor.cores", "2")
    
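        // Create a local SparkSession with the configuration above.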
        val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()
    
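        // Read the CSV file, select the required columns, and cache the DataFrame.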
        val df = session.read.format("csv")
          .option("header", "true")
          .option("sep", ",")
          .option("inferSchema", "true")
          .load("</your/path/to/test/data/a.txt>")
          .selectExpr(
            "Year",
            "Quarter",
            "Month"
          )
          .persist(StorageLevel.MEMORY_ONLY_SER_2)
        println(s"read done")
    
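        // Append the DataFrame to the ClickHouse table over JDBC in batches of 100,000 rows.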
        df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
        println(s"write done")
    
        df.unpersist(true)
      }
    }

    Parameters in the file:

    • your-user-name: the name of the database account created in the destination ApsaraDB for ClickHouse cluster.
    • your-password: the password of the database account.
    • your-url: the endpoint of the destination ApsaraDB for ClickHouse cluster.
    • /your/path/to/test/data/a.txt: the path of the data file to be imported, including the file name.
      Note The columns and data types in the file must be consistent with the schema of the destination table in the ApsaraDB for ClickHouse cluster.
    • your-table-name: the name of the destination table in the ApsaraDB for ClickHouse cluster.
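
    (Optional) To verify the write, you can read the table back through Spark's JDBC reader. The following lines are a minimal sketch rather than part of the original program: they assume the session, url, table, and properties values defined in WriteToCk.scala and would be placed in main after the write completes.

    // Optional verification sketch: read the destination table back over JDBC and
    // print how many rows it now contains (assumes the values defined above).
    val written = session.read.jdbc(url, table, properties)
    println(s"rows in $table: ${written.count()}")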
  4. Compile and package the project.
    sbt package
  5. Run the program.
    ${SPARK_HOME}/bin/spark-submit \
      --class "com.spark.test.WriteToCk" \
      --master local[4] \
      --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
      --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
      target/scala-2.12/simple-project_2.12-1.0.jar