Gunakan program Spark untuk mengimpor data CSV secara batch ke ApsaraDB for ClickHouse melalui koneksi JDBC. Pendekatan ini cocok jika Anda telah memiliki pipeline Spark dan ingin menulis DataFrames langsung ke ClickHouse menggunakan driver JDBC ClickHouse.
Jika Anda memerlukan dukungan penuh untuk tipe kompleks (MAP, ARRAY, STRUCT) atau lebih memilih integrasi yang lebih native, gunakan Spark-ClickHouse native connector sebagai gantinya. Pendekatan JDBC yang dijelaskan di sini tidak mendukung tipe-tipe tersebut.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Menambahkan alamat IP mesin on-premises Anda ke daftar putih kluster ApsaraDB for ClickHouse. Lihat Konfigurasi daftar putih.
Membuat tabel ApsaraDB for ClickHouse dengan tipe data kolom yang sesuai dengan data yang akan diimpor. Lihat Buat tabel.
Cara kerja
Program Spark membaca file CSV ke dalam DataFrame dan menuliskannya ke tabel ApsaraDB for ClickHouse menggunakan driver JDBC ClickHouse. Baris-baris dimasukkan secara batch melalui koneksi JDBC pada port 8123.
Parameter koneksi utama:
| Parameter | Nilai | Deskripsi |
|---|---|---|
batchsize | 100000 | Jumlah baris per penyisipan batch |
socket_timeout | 300000 | Timeout socket dalam milidetik |
numPartitions | 8 | Jumlah partisi penulisan paralel. Tingkatkan untuk set data yang lebih besar; kurangi untuk mengurangi beban pada kluster. |
rewriteBatchedStatements | true | Menulis ulang penyisipan batch menjadi satu pernyataan multi-baris untuk throughput yang lebih baik |
Impor data dari CSV
Langkah 1: Siapkan struktur proyek
Buat struktur direktori berikut untuk proyek Spark Anda:
find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scalaLangkah 2: Tambahkan dependensi
Tambahkan konten berikut ke build.sbt:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"Langkah 3: Tulis program Spark
Buat file src/main/scala/com/spark/test/WriteToCk.scala dengan konten berikut. Ganti placeholder yang tercantum dalam tabel di bawah sebelum menjalankan.
package com.spark.test
import java.util
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
object WriteToCk {
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "<yourUserName>")
properties.put("password", "<yourPassword>")
properties.put("batchsize","100000")
properties.put("socket_timeout","300000")
properties.put("numPartitions","8")
properties.put("rewriteBatchedStatements","true")
val url = "jdbc:clickhouse://<yourUrl>:8123/default"
val table = "<yourTableName>"
def main(args: Array[String]): Unit = {
val sc = new SparkConf()
sc.set("spark.driver.memory", "1G")
sc.set("spark.driver.cores", "4")
sc.set("spark.executor.memory", "1G")
sc.set("spark.executor.cores", "2")
val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()
val df = session.read.format("csv")
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.load("<yourFilePath>")
.selectExpr(
"colName1",
"colName2",
"colName3",
...
)
.persist(StorageLevel.MEMORY_ONLY_SER_2)
println(s"read done")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
println(s"write done")
df.unpersist(true)
}
}Ganti placeholder berikut dengan nilai aktual Anda:
| Placeholder | Deskripsi | Wajib |
|---|---|---|
<yourUserName> | Username akun database di ApsaraDB for ClickHouse | Ya |
<yourPassword> | Password akun database | Ya |
<yourUrl> | Endpoint kluster ApsaraDB for ClickHouse | Ya |
<yourTableName> | Nama tabel tujuan di ApsaraDB for ClickHouse | Ya |
<yourFilePath> | Path ke file CSV yang akan diimpor, termasuk nama file-nya | Ya |
colName1,colName2,colName3 | Nama kolom di tabel ApsaraDB for ClickHouse yang dipilih dari DataFrame | Ya |
Langkah 4: Bangun paket
Jalankan perintah berikut untuk mengompilasi dan membuat paket program:
sbt packageFile JAR output dihasilkan di target/scala-2.12/simple-project_2.12-1.0.jar.
Langkah 5: Kirim pekerjaan Spark
Jalankan perintah berikut untuk mengirim pekerjaan. Perintah ini menambahkan driver JDBC ClickHouse ke classpath untuk proses driver maupun executor.
${SPARK_HOME}/bin/spark-submit --class "com.spark.test.WriteToCk" --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jarKeterbatasan
Tidak mendukung tipe data kompleks: Driver JDBC ClickHouse tidak mendukung tipe kompleks Spark seperti MAP, ARRAY, atau STRUCT. Untuk menggunakan tipe-tipe ini, beralihlah ke Spark-ClickHouse native connector.
Tabel harus sudah ada sebelum impor: JDBC tidak dapat membuat tabel tujuan secara otomatis. Buat tabel di ApsaraDB for ClickHouse sebelum menjalankan pekerjaan Spark. Lihat Buat tabel.