
E-MapReduce: Import data from Spark to a ClickHouse cluster

Last Updated: Mar 26, 2026

Use the CKDataImporter tool to write data from a Spark job to an E-MapReduce (EMR) ClickHouse cluster over JDBC. The tool connects to ClickHouse using the JDBC URL format jdbc:clickhouse://<host>:<port>/<database> and appends data to an existing Distributed table.
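
As an illustration of that URL format, the following shell sketch assembles a JDBC URL from a host, a database, and a port. The function name ck_jdbc_url is hypothetical. The default port 8123 (the standard ClickHouse HTTP port, which HTTP-based ClickHouse JDBC drivers connect to) is also an assumption, so check which port your build of CKDataImporter actually uses.

```shell
#!/bin/sh
# Hypothetical helper: builds a JDBC URL of the form
#   jdbc:clickhouse://<host>:<port>/<database>
# Assumption: port defaults to 8123, the default ClickHouse HTTP port.
ck_jdbc_url() {
  host="$1"
  database="${2:-default}"   # same default as the --dbName parameter
  port="${3:-8123}"
  if [ -z "$host" ]; then
    echo "usage: ck_jdbc_url <host> [database] [port]" >&2
    return 1
  fi
  printf 'jdbc:clickhouse://%s:%s/%s\n' "$host" "$port" "$database"
}

ck_jdbc_url 192.168.0.10 clickhouse_database_name
# prints jdbc:clickhouse://192.168.0.10:8123/clickhouse_database_name
```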

Prerequisites

Before you begin, ensure that you have:

Background information

For more information about Spark, see Overview.

How it works

EMR automatically configures a ClickHouse cluster named cluster_emr (the name that ON CLUSTER clauses and the Distributed engine reference) and generates the macros {layer}, {shard}, and {replica} for distributed table configuration. To write data from Spark, you need two tables:

  • A local ReplicatedMergeTree table (name must end with _local) on each shard

  • A Distributed table (name must end with _all) that routes writes across shards

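The Spark job connects to ClickHouse over JDBC and inserts rows into the Distributed table, which forwards them to the local tables on the shards. All writes use SaveMode.Append, which adds rows to the existing table instead of replacing it, so create both tables before you submit the job.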

Step 1: Create ClickHouse tables

  1. Log on to the ClickHouse cluster in SSH mode. For details, see Log on to a cluster.

  2. Start the ClickHouse client. Replace core-1-1 with the name of a core node in your cluster.

    clickhouse-client -h core-1-1 -m
  3. Create a database on the cluster:

    CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

    Replace clickhouse_database_name with your database name. cluster_emr is the cluster name that EMR generates automatically.

  4. Create the local table. The table name must end with _local.

    CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
      id            UInt32,
      key1          String,
      value1        UInt8,
      key2          Int64,
      value2        Float64
    ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
    ORDER BY id;

    The {layer}, {shard}, and {replica} placeholders are macros that EMR generates for your ClickHouse cluster. Use them as shown.

  5. Create the Distributed table. The table name must end with _all, and its columns must match the local table.

    CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
      id            UInt32,
      key1          String,
      value1        UInt8,
      key2          Int64,
      value2        Float64
    ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
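
If you create several table pairs, generating the statements from one base name helps keep the _local and _all names, the replication path, and the Distributed engine arguments consistent. The following shell sketch is a hypothetical helper (gen_ck_ddl is not part of EMR or CKDataImporter). It prints the statements from this step for a given database and base table name, using the same example columns. Adapt the column list to your data.

```shell
#!/bin/sh
# Hypothetical helper: prints the Step 1 statements for a database and a base
# table name. It appends the _local and _all suffixes and leaves the EMR
# macros ({layer}, {shard}, {replica}) as literal text for ClickHouse to expand.
gen_ck_ddl() {
  db="$1"
  base="$2"
  if [ -z "$db" ] || [ -z "$base" ]; then
    echo "usage: gen_ck_ddl <database> <base_table_name>" >&2
    return 1
  fi
  local_tbl="${base}_local"
  dist_tbl="${base}_all"
  cat <<EOF
CREATE DATABASE IF NOT EXISTS ${db} ON CLUSTER cluster_emr;

CREATE TABLE ${db}.${local_tbl} ON CLUSTER cluster_emr (
  id     UInt32,
  key1   String,
  value1 UInt8,
  key2   Int64,
  value2 Float64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/${db}/${local_tbl}', '{replica}')
ORDER BY id;

CREATE TABLE ${db}.${dist_tbl} ON CLUSTER cluster_emr (
  id     UInt32,
  key1   String,
  value1 UInt8,
  key2   Int64,
  value2 Float64
) ENGINE = Distributed(cluster_emr, ${db}, ${local_tbl}, rand());
EOF
}
```

For example, gen_ck_ddl clickhouse_database_name clickhouse_table_name | clickhouse-client -h core-1-1 --multiquery would run all three statements in one call. Review the generated SQL before you run it.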

Step 2: Compile and package the code

  1. Download and decompress CKDataImporter.tgz to your on-premises machine.

  2. In a terminal, go to the directory that contains pom.xml and run:

    mvn clean package

    The build generates CKDataImporter-1.0.0.jar in the target directory. Maven derives the file name from the artifactId and version fields in pom.xml.

Step 3: Submit a job

  1. Log on to the Hadoop cluster in SSH mode. For details, see Log on to a cluster.

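  2. Upload CKDataImporter-1.0.0.jar to the Hadoop cluster, for example with scp. This topic uploads the JAR to the root directory and runs spark-submit from that directory. If you use a different location, adjust the JAR path in the spark-submit command.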

  3. Submit the Spark job:

    spark-submit --master yarn \
                 --class com.aliyun.emr.CKDataImporter \
                 CKDataImporter-1.0.0.jar \
                 --dbName clickhouse_database_name \
                 --tableName clickhouse_table_name_all \
                 --ckHost <clickhouse-master-node-ip> \
                 --password <password>
    The command accepts the following parameters:

    • --dbName: Name of the ClickHouse database. Default: default.
    • --tableName: Name of the Distributed table. The name must end with _all. Required.
    • --ckHost: Private or public IP address of the master node of the ClickHouse cluster. For details, see Get the master node IP address. Required.
    • --user: Username for the ClickHouse cluster. Default: default.
    • --password: Password for the ClickHouse cluster. You can find this value in the users.default.password parameter on the Configure tab of the ClickHouse service page. No default value.
    • --local: Runs the Spark job in local mode instead of on YARN. Default: false.
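
To catch mistakes in these parameters before YARN starts the job, you can wrap the command in a small script. The following shell sketch is hypothetical: submit_ck_import, DRY_RUN, and CK_JAR are names introduced here, not CKDataImporter options. It enforces the rules from the parameter list (--tableName and --ckHost are required, and the table name must end with _all). When DRY_RUN=1, it prints the command with the password masked instead of running it.

```shell
#!/bin/sh
# Hypothetical wrapper around the spark-submit command in Step 3.
# Usage: submit_ck_import <dbName> <tableName> <ckHost> <password> [user]
# DRY_RUN=1 prints the command (password masked) instead of running it.
# CK_JAR overrides the JAR path (default: ./CKDataImporter-1.0.0.jar).
submit_ck_import() {
  db="${1:-default}"
  table="$2"
  host="$3"
  password="$4"
  user="${5:-default}"
  jar="${CK_JAR:-CKDataImporter-1.0.0.jar}"

  # --tableName and --ckHost have no default values.
  if [ -z "$table" ] || [ -z "$host" ]; then
    echo "error: tableName and ckHost are required" >&2
    return 1
  fi
  # The target must be the Distributed table, whose name ends with _all.
  case "$table" in
    *_all) ;;
    *) echo "error: tableName must end with _all: $table" >&2; return 1 ;;
  esac

  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "spark-submit --master yarn --class com.aliyun.emr.CKDataImporter $jar" \
         "--dbName $db --tableName $table --ckHost $host --user $user --password ****"
    return 0
  fi

  spark-submit --master yarn \
    --class com.aliyun.emr.CKDataImporter \
    "$jar" \
    --dbName "$db" \
    --tableName "$table" \
    --ckHost "$host" \
    --user "$user" \
    --password "$password"
}
```

The sketch does not pass --local, because the expected form of that option (a bare flag or a true/false value) is not stated in the parameter list.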

JDBC connection parameters

The CKDataImporter tool uses the following default JDBC connection parameters. Adjust these values based on your data volume and cluster capacity.

  • batchsize (default: 1000): Number of rows sent in each JDBC batch insert. Increase this value for higher throughput on large datasets.

  • socket_timeout (default: 300000): Socket timeout in milliseconds (300000 ms is 5 minutes). Increase this value if timeout errors occur on slow networks or with large batches.

  • numPartitions (default: 8): Maximum number of partitions, and therefore concurrent JDBC connections, that Spark uses to write the data. If the DataFrame has more partitions, Spark coalesces it to this number. Consider setting it to the number of shards in your ClickHouse cluster.

  • rewriteBatchedStatements (default: true): Rewrites batch inserts into a single multi-row statement for better throughput. Keep this setting enabled unless you encounter compatibility issues.
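
To see how batchsize and numPartitions interact, you can estimate how many batch inserts each write partition sends. The following shell sketch is a rough, hypothetical estimate (estimate_batches is not part of the tool). It assumes that the DataFrame has at least numPartitions partitions, so that Spark coalesces it to exactly numPartitions, and that rows are spread evenly across them.

```shell
#!/bin/sh
# Rough, hypothetical estimate of JDBC batch inserts per write partition.
# Assumes the DataFrame has at least num_partitions partitions (so Spark
# coalesces it to exactly num_partitions) and that rows are spread evenly.
#   rows_per_partition    = ceil(rows / num_partitions)
#   batches_per_partition = ceil(rows_per_partition / batch_size)
estimate_batches() {
  rows="$1"
  num_partitions="${2:-8}"   # numPartitions default
  batch_size="${3:-1000}"    # batchsize default
  rows_per_partition=$(( (rows + num_partitions - 1) / num_partitions ))
  echo $(( (rows_per_partition + batch_size - 1) / batch_size ))
}

estimate_batches 10000000   # prints 1250
```

With the defaults, 10,000,000 rows produce about 1,250 batch inserts per partition. Doubling batchsize halves that number but makes each request larger, which may in turn require a longer socket_timeout.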

Get the master node IP address

  1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.

  2. In the top navigation bar, select the region where your cluster resides and select a resource group.

  3. On the EMR on ECS page, find your cluster and click Nodes in the Actions column.

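  4. On the Nodes tab, find the master node group and click the expand icon next to it. Copy the IP address from the Public IP Address column, or from the Private IP Address column if the Hadoop cluster can reach the ClickHouse cluster over the private network (for example, if both clusters are in the same VPC).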
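
Before you submit the job, you can check from the Hadoop cluster that the address you copied is reachable. The following shell sketch is a hypothetical pre-flight check (ck_ping is not an EMR tool). It assumes that ClickHouse serves HTTP on its default port 8123, and it uses the server's /ping endpoint, which returns Ok. while the server is running.

```shell
#!/bin/sh
# Hypothetical pre-flight check: is the ClickHouse HTTP interface reachable?
# Assumption: ClickHouse listens for HTTP on the default port 8123, and its
# /ping endpoint returns "Ok." while the server is running.
ck_ping() {
  host="$1"
  port="${2:-8123}"
  # -s: no progress output; --max-time: give up after 5 seconds.
  if curl -s --max-time 5 "http://${host}:${port}/ping" | grep -qx 'Ok\.'; then
    echo "reachable: ${host}:${port}"
  else
    echo "unreachable: ${host}:${port}" >&2
    return 1
  fi
}
```

For example, run ck_ping <clickhouse-master-node-ip> on the Hadoop cluster. If the check fails, confirm that the security group rules of the ClickHouse cluster allow access to that port from the Hadoop cluster.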