Use a Spark program to batch-import CSV data into ApsaraDB for ClickHouse over a JDBC connection. This approach is suitable when you already have a Spark pipeline and want to write DataFrames directly to ClickHouse using the ClickHouse JDBC driver.
If you need full support for complex types (MAP, ARRAY, STRUCT) or prefer a more native integration, use the Spark-ClickHouse native connector instead. The JDBC approach described here does not support those types.
Prerequisites
Before you begin, ensure that you have:
- Added the IP address of your on-premises machine to the ApsaraDB for ClickHouse cluster whitelist. See Configure a whitelist.
- Created an ApsaraDB for ClickHouse table whose column data types match the data you want to import. See Create a table.
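The destination table's columns must line up with the columns you later select from the CSV. A minimal sketch of a matching table definition, assuming the illustrative columns colName1 through colName3 used later in this guide (the column names, types, and ORDER BY key are examples to adapt, not prescriptions; replicated or distributed clusters may additionally need an ON CLUSTER clause):

```sql
-- Hypothetical schema: adjust column names, types, and the sorting key
-- to match your CSV data before running this statement.
CREATE TABLE default.<yourTableName>
(
    colName1 String,
    colName2 UInt32,
    colName3 Float64
)
ENGINE = MergeTree()
ORDER BY colName1;
```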
How it works
The Spark program reads a CSV file into a DataFrame and writes it to an ApsaraDB for ClickHouse table using the ClickHouse JDBC driver. Rows are inserted in batches over a JDBC connection on port 8123.
Key connection parameters:
| Parameter | Value | Description |
|---|---|---|
| batchsize | 100000 | Number of rows per batch insert. |
| socket_timeout | 300000 | Socket timeout in milliseconds. |
| numPartitions | 8 | Number of parallel write partitions. Increase for larger datasets; decrease to reduce load on the cluster. |
| rewriteBatchedStatements | true | Rewrites batched inserts into a single multi-row statement for better throughput. |
Import data from CSV
Step 1: Set up the project structure
Create the following directory layout for your Spark project:
```shell
find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scala
```
Step 2: Add dependencies
Add the following content to build.sbt:
```scala
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
```
Step 3: Write the Spark program
Create src/main/scala/com/spark/test/WriteToCk.scala with the following content. Replace the placeholders listed in the table below before running.
```scala
package com.spark.test

import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  // JDBC connection properties for ApsaraDB for ClickHouse.
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "<yourUserName>")
  properties.put("password", "<yourPassword>")
  properties.put("batchsize", "100000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  val url = "jdbc:clickhouse://<yourUrl>:8123/default"
  val table = "<yourTableName>"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder()
      .master("local[*]")
      .config(sc)
      .appName("write-to-ck")
      .getOrCreate()

    // Read the CSV into a DataFrame and keep only the columns that exist
    // in the destination table.
    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("<yourFilePath>")
      .selectExpr(
        "colName1",
        "colName2",
        "colName3",
        ...
      )
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println("read done")

    // Append the DataFrame to the ClickHouse table in batches over JDBC.
    df.write
      .mode(SaveMode.Append)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000)
      .jdbc(url, table, properties)
    println("write done")

    df.unpersist(true)
  }
}
```
Replace the following placeholders with your actual values:
| Placeholder | Description | Required |
|---|---|---|
| <yourUserName> | Username of the database account in ApsaraDB for ClickHouse. | Yes |
| <yourPassword> | Password of the database account. | Yes |
| <yourUrl> | Endpoint of the ApsaraDB for ClickHouse cluster. | Yes |
| <yourTableName> | Name of the destination table in ApsaraDB for ClickHouse. | Yes |
| <yourFilePath> | Path to the CSV file to import, including the filename. | Yes |
| colName1,colName2,colName3 | Column names in the ApsaraDB for ClickHouse table to select from the DataFrame. | Yes |
Step 4: Build the package
Run the following command to compile and package the program:
```shell
sbt package
```
The output JAR is generated at target/scala-2.12/simple-project_2.12-1.0.jar.
Step 5: Submit the Spark job
Run the following command to submit the job. The command adds the ClickHouse JDBC driver to the classpath for both the driver and executor processes.
```shell
${SPARK_HOME}/bin/spark-submit \
  --class "com.spark.test.WriteToCk" \
  --master local[4] \
  --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  target/scala-2.12/simple-project_2.12-1.0.jar
```
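After the job completes, you can optionally read the table back over the same JDBC connection to confirm the import. A quick sketch that reuses the url, table, and properties values from WriteToCk (run it in the same Spark session; this convenience check is an assumption of this guide, not part of the import itself):

```scala
// Optional sanity check: read the destination table back over JDBC and
// print its row count; it should match the number of CSV rows imported.
val readBack = session.read.jdbc(url, table, properties)
println(s"rows now in ClickHouse table $table: ${readBack.count()}")
```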
Limitations
- No support for complex data types: The ClickHouse JDBC driver does not support Spark complex types such as MAP, ARRAY, or STRUCT. To use these types, switch to the Spark-ClickHouse native connector.
- Tables must exist before import: JDBC cannot auto-create the destination table. Create the table in ApsaraDB for ClickHouse before running the Spark job. See Create a table.