E-MapReduce: Import data from Spark to a ClickHouse cluster

Last Updated: Mar 12, 2024

This topic describes how to import data from Spark to a ClickHouse cluster.

Prerequisites

An E-MapReduce ClickHouse cluster and an E-MapReduce Hadoop cluster are created.

Background information

For more information about Spark, see Overview.

Sample code

The following sample code is provided:

package com.company.packageName

import java.util.Properties
import java.util.concurrent.ThreadLocalRandom

import scala.annotation.tailrec

import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)

object CKDataImporter extends Logging {

  // Command-line options and their default values.
  private var dbName: String = "default"
  private var tableName: String = ""
  private var ckHost: String = ""
  private var ckPort: String = "8123"
  private var user: String = "default"
  private var password: String = ""
  private var local: Boolean = false

  def main(args: Array[String]): Unit = {
    parse(args.toList)
    checkArguments()

    val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"

    logInfo(s"Use jdbc: $jdbcUrl")
    logInfo(s"Use table: $tableName")

    val spark = getSparkSession

    // generate test data
    val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
      val rand = ThreadLocalRandom.current()
      val randString = (0 until rand.nextInt(10, 20))
        .map(_ => rand.nextLong())
        .mkString("")

      Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
    })

    val df = spark.createDataFrame(rdd)

    // Write the test data to the ClickHouse table through JDBC.
    df.write
      .mode(SaveMode.Append)
      .jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
  }

  private def printUsageAndExit(exitCode: Int = 0): Unit = {
    logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
    logError("  --dbName      Specifies the name of the ClickHouse database. Default value: default.")
    logError("  --tableName   Specifies the name of the table in the ClickHouse database.")
    logError("  --ckHost      Specifies the IP address of the ClickHouse cluster.")
    logError("  --ckPort      Specifies the port number of the ClickHouse cluster. Default value: 8123.")
    logError("  --user        Specifies the username that is used to access the ClickHouse cluster.")
    logError("  --password    Specifies the password that is used to access the ClickHouse cluster.")
    logError("  --local       Specifies whether the sample code runs in Spark local mode.")
    System.exit(exitCode)
  }

  // Parses command-line arguments; tail-recursive over the argument list.
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--help" | "-h") :: _ =>
      printUsageAndExit()
    case "--dbName" :: value :: tail =>
      dbName = value
      parse(tail)
    case "--tableName" :: value :: tail =>
      tableName = value
      parse(tail)
    case "--ckHost" :: value :: tail =>
      ckHost = value
      parse(tail)
    case "--ckPort" :: value :: tail =>
      ckPort = value
      parse(tail)
    case "--user" :: value :: tail =>
      user = value
      parse(tail)
    case "--password" :: value :: tail =>
      password = value
      parse(tail)
    case "--local" :: tail =>
      local = true
      parse(tail)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }

  private def checkArguments(): Unit = {
    // --tableName and --ckHost are required arguments.
    if (tableName.isEmpty || ckHost.isEmpty) {
      printUsageAndExit(2)
    }
  }

  // Builds the JDBC connection properties for the ClickHouse driver,
  // including batch size, socket timeout, and write parallelism.
  private def getCKJdbcProperties(
      user: String,
      password: String,
      batchSize: String = "1000",
      socketTimeout: String = "300000",
      numPartitions: String = "8",
      rewriteBatchedStatements: String = "true"): Properties = {
    val kvMap = ImmutableMap.builder[String, String]()
      .put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .put("user", user)
      .put("password", password)
      .put("batchsize", batchSize)
      .put("socket_timeout", socketTimeout)
      .put("numPartitions", numPartitions)
      .put("rewriteBatchedStatements", rewriteBatchedStatements)
      .build()
    val properties = new Properties
    properties.putAll(kvMap)
    properties
  }

  // Creates a SparkSession; runs in local mode if --local is specified.
  private def getSparkSession: SparkSession = {
    val builder = SparkSession.builder()
    if (local) {
      builder.master("local[*]")
    }
    builder.appName("ClickHouse-Data-Importer")
    builder.getOrCreate()
  }
}
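
Before you submit the job to YARN, you can smoke-test the importer in Spark local mode by using the --local flag that the sample code defines. The following invocation is a sketch that assumes the JAR built in Step 2 and a reachable ClickHouse endpoint; each run appends 1,000 randomly generated rows:

    spark-submit --master local[*] \
                 --class com.company.packageName.CKDataImporter \
                 CKDataImporter-1.0.0.jar \
                 --dbName clickhouse_database_name \
                 --tableName clickhouse_table_name_all \
                 --ckHost ${clickhouse_host} \
                 --password ${password} \
                 --local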

Procedure

  1. Step 1: Create ClickHouse tables

  2. Step 2: Compile and package the code

  3. Step 3: Submit a job

Step 1: Create ClickHouse tables

  1. Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start the ClickHouse client:

    clickhouse-client -h core-1-1 -m

    Note

    In the sample command, core-1-1 indicates the name of the core node that you log on to. If you have multiple core nodes, you can log on to one of the nodes.

  3. Create a ClickHouse database and the required ClickHouse tables.

    1. Run the following command to create a database named clickhouse_database_name:

      CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

      Alibaba Cloud EMR automatically generates a ClickHouse cluster named cluster_emr. You can customize the name of the database.

    2. Run the following command to create a table named clickhouse_table_name_local:

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
        id        UInt32,
        key1      String,
        value1    UInt8,
        key2      Int64,
        value2    Float64
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
      ORDER BY id;

      Note

      You can customize the name of the table, but the table name must end with _local. The layer, shard, and replica parameters are macros that Alibaba Cloud EMR automatically generates for ClickHouse clusters and that you can use directly. You can view the macro values by running the queries shown after this step.

    3. Run the following command to create a table named clickhouse_table_name_all. The fields in this table are defined in the same way as the fields in the clickhouse_table_name_local table.

      Note

      You can customize the name of the table, but the table name must end with _all.

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
        id                    UInt32,
        key1                  String,
        value1                UInt8,
        key2                  Int64,
        value2                Float64
      ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
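
    To confirm that the database, tables, and macros exist as expected, you can run queries such as the following in the clickhouse-client session. system.macros and system.clusters are standard ClickHouse system tables:

      SHOW TABLES FROM clickhouse_database_name;
      SELECT * FROM system.macros;
      SELECT cluster, shard_num, replica_num, host_name FROM system.clusters WHERE cluster = 'cluster_emr';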

Step 2: Compile and package the code

  1. Download and decompress the CKDataImporter file to your on-premises machine.

  2. In the CLI, go to the directory that contains the pom.xml file and run the following command to package the code:

    mvn clean package

    The JAR package CKDataImporter-1.0.0.jar is generated in the target directory. The package name is derived from the artifactId and version in the pom.xml file.
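
    If you build the project from the sample code instead of the downloaded archive, the pom.xml must declare the libraries that the code imports: Spark SQL, the JDBC driver that provides ru.yandex.clickhouse.ClickHouseDriver, and Guava. The following is a minimal sketch; the version numbers are illustrative and should be aligned with the Spark and Scala versions on your cluster:

      <dependencies>
        <!-- Spark SQL; provided by the cluster at run time (version is an assumption) -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>3.2.1</version>
          <scope>provided</scope>
        </dependency>
        <!-- JDBC driver that contains ru.yandex.clickhouse.ClickHouseDriver -->
        <dependency>
          <groupId>ru.yandex.clickhouse</groupId>
          <artifactId>clickhouse-jdbc</artifactId>
          <version>0.3.2</version>
        </dependency>
        <!-- Guava, used by the sample code for ImmutableMap -->
        <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>31.1-jre</version>
        </dependency>
      </dependencies>

    You also need a Scala compiler plugin such as scala-maven-plugin, and the ClickHouse driver must be available to the executors, for example by building a fat JAR.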

Step 3: Submit a job

  1. Log on to the Hadoop cluster in SSH mode. For more information, see Log on to a cluster.

  2. Upload the CKDataImporter-1.0.0.jar package to the root directory of the Hadoop cluster.

    Note

    In this example, the CKDataImporter-1.0.0.jar package is uploaded to the root directory. You can upload the package to a directory based on your business requirements.

  3. Run the following command to submit the job:

    spark-submit --master yarn \
                 --class com.company.packageName.CKDataImporter \
                 CKDataImporter-1.0.0.jar \
                 --dbName clickhouse_database_name \
                 --tableName clickhouse_table_name_all \
                 --ckHost ${clickhouse_host} \
                 --password ${password};

    The following list describes the parameters in the command:

    dbName: The name of the ClickHouse database. Default value: default. In this example, the clickhouse_database_name database is used.

    tableName: The name of the table in the ClickHouse database. In this example, the clickhouse_table_name_all table is used.

    ckHost: The private or public IP address of the master node of the ClickHouse cluster. For more information about how to obtain the IP address, see Obtain the IP address of the master node.

    password: The password that is used to access the ClickHouse cluster. This is the value of the users.default.password parameter, which you can view on the Configure tab of the ClickHouse service page.
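
    After the job completes, you can verify the import on the ClickHouse cluster. The sample code appends 1,000 rows per run, so a count query against the distributed table is a quick check:

      clickhouse-client -h core-1-1 -m
      SELECT COUNT(*) FROM clickhouse_database_name.clickhouse_table_name_all;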

Obtain the IP address of the master node

  1. Go to the Nodes tab.

    1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.

    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.

    3. On the EMR on ECS page, find the cluster that you want to manage and click Nodes in the Actions column.

  2. On the Nodes tab, find the master node group, click the expand icon, and then copy the IP address in the Public IP Address column.