This topic describes how to use a Spark program to import data into ApsaraDB for ClickHouse.
Prerequisites
- The IP address of your on-premises machine is added to the whitelist of the ApsaraDB for ClickHouse cluster. For more information, see Configure a whitelist.
- A table is created in ApsaraDB for ClickHouse, and its column data types map to the data types of the data that you want to import. For more information, see Create a table.
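As a concrete illustration of the second prerequisite, the following sketch creates a table whose ClickHouse column types (UInt32, String, Float64) map to a simple three-column CSV file. The table name test_table, its columns, and the MergeTree settings are assumptions for this sketch, not values from this topic; you can just as well run the equivalent DDL from a ClickHouse client. The sketch uses the same clickhouse-jdbc driver that the Spark program below depends on.

import java.sql.DriverManager

// Hedged sketch: creates a hypothetical table (test_table) whose column
// types map to the columns of a CSV file such as id,name,price.
object CreateCkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<yourUrl>:8123/default",
      "<yourUserName>",
      "<yourPassword>")
    try {
      conn.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS default.test_table (
          |  id    UInt32,
          |  name  String,
          |  price Float64
          |) ENGINE = MergeTree() ORDER BY id""".stripMargin)
    } finally {
      conn.close()
    }
  }
}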
Procedure
- Prepare the directory structure of the Spark program.
find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scala

- Add dependencies to the build.sbt configuration file.
name := "Simple Project" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4" - Buat file bernama WriteToCk.scala dan tulis data ke file tersebut.
package com.spark.test

import java.util
import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  // JDBC connection properties for the ApsaraDB for ClickHouse cluster.
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "<yourUserName>")
  properties.put("password", "<yourPassword>")
  properties.put("batchsize", "100000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  val url = "jdbc:clickhouse://<yourUrl>:8123/default"
  val table = "<yourTableName>"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder()
      .master("local[*]")
      .config(sc)
      .appName("write-to-ck")
      .getOrCreate()

    // Read the CSV file, select the columns to import, and cache the result.
    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("<yourFilePath>")
      .selectExpr(
        "colName1",
        "colName2",
        "colName3",
        ...
      )
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(s"read done")

    // Append the rows to the ClickHouse table in batches of 100,000.
    df.write
      .mode(SaveMode.Append)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000)
      .jdbc(url, table, properties)
    println(s"write done")

    df.unpersist(true)
  }
}

The parameters in the preceding code are described as follows:
- yourUserName: the username of the database account created in ApsaraDB for ClickHouse.
- yourPassword: the password of the database account.
- yourUrl: the endpoint of the cluster.
- yourTableName: the name of the table created in ApsaraDB for ClickHouse.
- yourFilePath: the path of the data file that you want to import, including the file name.
- colName1,colName2,colName3: the names of the columns in the ApsaraDB for ClickHouse table.
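If inferSchema guesses types that do not line up with the ClickHouse columns, declaring the schema explicitly is a common alternative. The fragment below is a hedged sketch of that variant: it replaces the session.read block inside WriteToCk.main (where session and the StorageLevel import are already defined), and the field names and types (IntegerType, StringType, DoubleType) are assumptions for illustration, not part of this topic.

import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Hypothetical schema; the fields must mirror the columns of your
// ApsaraDB for ClickHouse table in name, order, and type.
val csvSchema = StructType(Seq(
  StructField("colName1", IntegerType, nullable = false),
  StructField("colName2", StringType, nullable = true),
  StructField("colName3", DoubleType, nullable = true)
))

// Drop-in replacement for the inferSchema-based read in WriteToCk.main.
val df = session.read.format("csv")
  .option("header", "true")
  .option("sep", ",")
  .schema(csvSchema) // instead of .option("inferSchema", "true")
  .load("<yourFilePath>")
  .persist(StorageLevel.MEMORY_ONLY_SER_2)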
- Compile and package the program.

sbt package

- Run the program.
${SPARK_HOME}/bin/spark-submit \
  --class "com.spark.test.WriteToCk" \
  --master local[4] \
  --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" \
  target/scala-2.12/simple-project_2.12-1.0.jar
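After the job finishes, a quick way to confirm the import is to read the table back over the same JDBC endpoint and count the rows. The following is a minimal sketch, not part of the original procedure; it reuses the placeholder username, password, endpoint, and table name from WriteToCk, and the object name CountCkRows is hypothetical.

package com.spark.test

import java.util.Properties
import org.apache.spark.sql.SparkSession

// Minimal read-back check: counts the rows that landed in ClickHouse.
object CountCkRows {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    properties.put("user", "<yourUserName>")
    properties.put("password", "<yourPassword>")

    val session = SparkSession.builder()
      .master("local[*]")
      .appName("count-ck-rows")
      .getOrCreate()
    val rows = session.read.jdbc("jdbc:clickhouse://<yourUrl>:8123/default", "<yourTableName>", properties)
    println(s"rows in ClickHouse table: ${rows.count()}")
    session.stop()
  }
}

Run it with the same --conf extraClassPath settings as the spark-submit command above so that the ClickHouse JDBC driver is on the classpath, and compare the printed count with the number of rows in the source file.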