Use the CKDataImporter tool to write data from a Spark job to an E-MapReduce (EMR) ClickHouse cluster over JDBC. The tool connects to ClickHouse using the JDBC URL format jdbc:clickhouse://<host>:<port>/<database> and appends data to an existing Distributed table.
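The URL format above can be illustrated with a small helper. This is a minimal Python sketch, not part of CKDataImporter. The function name is hypothetical, and the default port 8123 is an assumption: it is ClickHouse's default HTTP port, which HTTP-based ClickHouse JDBC drivers typically use, but the source does not state which port the tool connects to.

```python
def clickhouse_jdbc_url(host: str, database: str, port: int = 8123) -> str:
    """Build a URL in the jdbc:clickhouse://<host>:<port>/<database> format.

    Assumption: 8123 (ClickHouse's default HTTP port) is only a placeholder
    default; check which port CKDataImporter actually connects to.
    """
    if not host or not database:
        raise ValueError("host and database must not be empty")
    return f"jdbc:clickhouse://{host}:{port}/{database}"


# Hypothetical values: replace them with your master node IP and database name.
print(clickhouse_jdbc_url("192.168.0.10", "clickhouse_database_name"))
```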
Prerequisites
Before you begin, ensure that you have:
An EMR Hadoop cluster. For details, see Create a cluster.
An EMR ClickHouse cluster. For details, see Create a ClickHouse cluster.
Background information
For more information about Spark, see Overview.
How it works
EMR automatically provisions a ClickHouse cluster named cluster_emr and generates macros ({layer}, {shard}, {replica}) for distributed table configuration. To write data from Spark, you need two tables:
A local ReplicatedMergeTree table on each shard. The table name must end with _local.
A Distributed table that routes writes across the shards. The table name must end with _all.
The Spark job connects to the ClickHouse cluster over JDBC and writes to the Distributed table with SaveMode.Append, which adds rows to the existing table instead of overwriting it.
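To see how a Distributed table spreads rows across shards, the following Python sketch mimics the sharding rule that ClickHouse documents for the Distributed engine: the sharding key is divided by the total shard weight, and the remainder selects the shard. The Distributed table in Step 1 uses rand() as its sharding key, so the sketch draws a random 32-bit integer per row. The function names, the three-shard cluster, and the equal shard weights are hypothetical.

```python
import random
from collections import Counter


def pick_shard(sharding_key, shard_weights):
    """Return the index of the shard that receives a row.

    Follows the Distributed-engine rule: take the sharding key modulo the
    total weight, then pick the shard whose weight interval contains it.
    """
    remainder = sharding_key % sum(shard_weights)
    upper = 0
    for index, weight in enumerate(shard_weights):
        upper += weight
        if remainder < upper:
            return index
    raise AssertionError("unreachable: remainder is always below the total weight")


def simulate_rand_routing(num_rows, num_shards, seed=0):
    """Count rows per shard when each row's key is a random UInt32, like rand()."""
    rng = random.Random(seed)
    weights = [1] * num_shards  # assumption: every shard keeps the default weight of 1
    return Counter(pick_shard(rng.getrandbits(32), weights) for _ in range(num_rows))


# Hypothetical example: 100,000 rows inserted into a 3-shard cluster.
print(sorted(simulate_rand_routing(100_000, 3).items()))
```

With equal weights, each shard receives roughly the same share of rows, which is why rand() is a common choice when no column gives a natural sharding key.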
Step 1: Create ClickHouse tables
Log on to the ClickHouse cluster in SSH mode. For details, see Log on to a cluster.
Start the ClickHouse client. Replace core-1-1 with the name of a core node in your cluster. The -m option enables multiline queries.
clickhouse-client -h core-1-1 -m
Create a database on the cluster:
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
Replace clickhouse_database_name with your database name. cluster_emr is the cluster name that EMR generates automatically.
Create the local table. The table name must end with _local.
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr
(
    id UInt32,
    key1 String,
    value1 UInt8,
    key2 Int64,
    value2 Float64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
ORDER BY id;
The {layer}, {shard}, and {replica} placeholders are macros that EMR generates for your ClickHouse cluster. Use them as shown; each node substitutes its own macro values when the table is created.
Create the Distributed table. The table name must end with _all, and its columns must match the local table.
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr
(
    id UInt32,
    key1 String,
    value1 UInt8,
    key2 Int64,
    value2 Float64
)
ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
rand() is the sharding key, so each inserted row is sent to a randomly chosen shard.
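The following Python sketch shows what the macros do to the ReplicatedMergeTree path. Replicas that belong to the same shard expand to the same coordination path (in ZooKeeper or ClickHouse Keeper) and therefore replicate each other, while each keeps a distinct {replica} name; a different shard gets a different path. The macro values are hypothetical; the real values come from the macros section of each node's ClickHouse configuration.

```python
import re


def expand_macros(template, macros):
    """Replace each {name} placeholder with the node's macro value."""
    def substitute(match):
        name = match.group(1)
        if name not in macros:
            raise KeyError(f"macro {{{name}}} is not defined on this node")
        return macros[name]

    return re.sub(r"\{(\w+)\}", substitute, template)


ZK_PATH = ("/clickhouse/tables/{layer}-{shard}/"
           "clickhouse_database_name/clickhouse_table_name_local")

# Hypothetical macro values: two replicas of shard 1 and one replica of shard 2.
NODES = {
    "core-1-1": {"layer": "01", "shard": "1", "replica": "core-1-1"},
    "core-1-2": {"layer": "01", "shard": "1", "replica": "core-1-2"},
    "core-1-3": {"layer": "01", "shard": "2", "replica": "core-1-3"},
}

for node, macros in NODES.items():
    print(node, expand_macros(ZK_PATH, macros), expand_macros("{replica}", macros))
```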
Step 2: Compile and package the code
Download and decompress CKDataImporter.tgz to your on-premises machine.
In a terminal, go to the directory that contains pom.xml and run:
mvn clean package
The output JAR, CKDataImporter-1.0.0.jar, is generated in the target directory. By default, Maven names the JAR after the artifactId and version fields in pom.xml.
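Maven's default naming rule can be sketched as follows: unless pom.xml sets build/finalName, the packaged JAR is named <artifactId>-<version>.jar. This Python sketch uses a minimal, hypothetical pom.xml; it does not resolve versions inherited from a parent POM or ${...} properties.

```python
import xml.etree.ElementTree as ET

POM_NS = {"m": "http://maven.apache.org/POM/4.0.0"}


def default_jar_path(pom_xml):
    """Return target/<finalName>.jar, with finalName defaulting to artifactId-version.

    Simplification: versions inherited from a parent POM and ${...}
    properties are not resolved.
    """
    root = ET.fromstring(pom_xml)
    final_name = root.findtext("m:build/m:finalName", namespaces=POM_NS)
    if final_name is None:
        artifact_id = root.findtext("m:artifactId", namespaces=POM_NS)
        version = root.findtext("m:version", namespaces=POM_NS)
        final_name = f"{artifact_id}-{version}"
    return f"target/{final_name}.jar"


# Hypothetical minimal pom.xml; the real one ships in CKDataImporter.tgz.
SAMPLE_POM = """<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>CKDataImporter</artifactId>
  <version>1.0.0</version>
</project>"""

print(default_jar_path(SAMPLE_POM))  # target/CKDataImporter-1.0.0.jar
```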
Step 3: Submit a job
Log on to the Hadoop cluster in SSH mode. For details, see Log on to a cluster.
Upload CKDataImporter-1.0.0.jar to the Hadoop cluster. In this example, the JAR is uploaded to the root directory; adjust the path to match your environment.
Submit the Spark job:
spark-submit --master yarn \
  --class com.aliyun.emr.CKDataImporter \
  CKDataImporter-1.0.0.jar \
  --dbName clickhouse_database_name \
  --tableName clickhouse_table_name_all \
  --ckHost <clickhouse-master-node-ip> \
  --password <password>
The following table describes the parameters.
| Parameter | Description | Default |
|---|---|---|
| --dbName | Name of the ClickHouse database. | default |
| --tableName | Name of the Distributed table. The name must end with _all. | Required |
| --ckHost | Private or public IP address of the ClickHouse cluster master node. See Get the master node IP address. | Required |
| --user | Username for the ClickHouse cluster. | default |
| --password | Password for the ClickHouse cluster. Find this value in the users.default.password parameter on the Configure tab of the ClickHouse service page. | None |
| --local | Runs the Spark job in local mode instead of on YARN. | false |
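The following Python sketch assembles the same spark-submit command from the documented parameters and rejects a --tableName that does not end with _all. The helper is hypothetical and only builds a string; it does not reproduce CKDataImporter's own argument parsing. The --local flag is left out because the source does not say whether it takes a value.

```python
import shlex


def build_spark_submit(jar, table_name, ck_host, password=None,
                       db_name="default", user="default"):
    """Assemble the CKDataImporter spark-submit command as a shell-safe string.

    Defaults mirror the parameter table: --dbName and --user default to
    "default", and --password is omitted when no password is given.
    """
    if not table_name.endswith("_all"):
        raise ValueError("--tableName must be the Distributed table (name ends with _all)")
    if not ck_host:
        raise ValueError("--ckHost is required")
    args = [
        "spark-submit", "--master", "yarn",
        "--class", "com.aliyun.emr.CKDataImporter",
        jar,
        "--dbName", db_name,
        "--tableName", table_name,
        "--ckHost", ck_host,
        "--user", user,
    ]
    if password is not None:
        args += ["--password", password]
    return shlex.join(args)


# Hypothetical host and password.
print(build_spark_submit("CKDataImporter-1.0.0.jar", "clickhouse_table_name_all",
                         "192.168.0.10", password="my pass",
                         db_name="clickhouse_database_name"))
```

shlex.join quotes each argument, so a password that contains spaces or shell metacharacters reaches spark-submit as a single argument.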
JDBC connection parameters
The CKDataImporter tool uses the following default JDBC connection parameters. Adjust these values based on your data volume and cluster capacity.
| Parameter | Default | Description |
|---|---|---|
| batchsize | 1000 | Number of rows sent per JDBC batch insert. Increase for higher throughput on large datasets. |
| socket_timeout | 300000 | Socket timeout in milliseconds (5 minutes). Increase if you experience timeout errors on slow networks or large batches. |
| numPartitions | 8 | Number of parallel write partitions. Set this to match the number of shards in your ClickHouse cluster for optimal parallelism. |
| rewriteBatchedStatements | true | Rewrites batch inserts into a single multi-row statement for better throughput. Keep this enabled unless you encounter compatibility issues. |
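To estimate how batchsize and numPartitions interact, the following Python sketch computes the number of concurrent JDBC connections and INSERT batches for a write. It assumes rows are spread evenly across partitions and relies on Spark's JDBC writer behavior of coalescing a DataFrame down to numPartitions when it has more partitions than that (it never increases the partition count). The function name and example numbers are hypothetical.

```python
import math


def jdbc_write_plan(total_rows, input_partitions, num_partitions=8, batchsize=1000):
    """Estimate connections and INSERT batches for a Spark JDBC write.

    Assumes rows are spread evenly across partitions. Spark's JDBC writer
    coalesces to numPartitions only when the DataFrame has more partitions.
    """
    if total_rows < 0 or min(input_partitions, num_partitions, batchsize) <= 0:
        raise ValueError("partitions and batchsize must be positive")
    writers = min(input_partitions, num_partitions)  # one JDBC connection per partition
    rows_per_writer = math.ceil(total_rows / writers)
    batches_per_writer = math.ceil(rows_per_writer / batchsize)
    return {
        "connections": writers,
        "rows_per_writer": rows_per_writer,
        "batches_per_writer": batches_per_writer,
    }


# Hypothetical job: 10 million rows in a 200-partition DataFrame, default settings.
print(jdbc_write_plan(10_000_000, 200))
# {'connections': 8, 'rows_per_writer': 1250000, 'batches_per_writer': 1250}
```

With the defaults, this job opens 8 connections, and each sends 1,250 batches of 1,000 rows. Raising batchsize reduces the number of round trips per connection, while numPartitions caps how many connections write concurrently.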
Get the master node IP address
Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.
In the top navigation bar, select the region where your cluster resides and select a resource group.
On the EMR on ECS page, find your cluster and click Nodes in the Actions column.
On the Nodes tab, find and expand the master node group, and copy the IP address from the Public IP Address column.