ApsaraDB for ClickHouse: Use a Spark program to import data

Last updated: Jun 29, 2025

This topic describes how to use a Spark program to import data to ApsaraDB for ClickHouse.

Prerequisites

  • The IP address of your on-premises machine has been added to the whitelist of the ApsaraDB for ClickHouse cluster. For more information, see Configure a whitelist.
  • A table has been created in ApsaraDB for ClickHouse, and its column types map to the types of the data that you want to import (a minimal example follows this list). For more information, see Create a table.
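For example, if the source file contains id, name, and price columns, the target table's column types must map to those values. The following is a minimal sketch, assuming a hypothetical test_table schema and the same connection placeholders used later in this topic; it creates the table through the ClickHouse JDBC driver:

    import java.sql.DriverManager

    object CreateCkTable {
      def main(args: Array[String]): Unit = {
        // Hypothetical schema for illustration; replace the columns with your own.
        val ddl =
          """CREATE TABLE IF NOT EXISTS test_table (
            |    id    UInt32,
            |    name  String,
            |    price Float64
            |) ENGINE = MergeTree()
            |ORDER BY id""".stripMargin
        val conn = DriverManager.getConnection(
          "jdbc:clickhouse://<yourUrl>:8123/default", "<yourUserName>", "<yourPassword>")
        try conn.createStatement().execute(ddl)
        finally conn.close()
      }
    }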

Procedure

  1. Prepare the directory structure of the Spark program.
     find .
    .
    ./build.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/com
    ./src/main/scala/com/spark
    ./src/main/scala/com/spark/test
    ./src/main/scala/com/spark/test/WriteToCk.scala
  2. Add dependencies to the build.sbt configuration file.
    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
    
    libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"
  3. Create a file named WriteToCk.scala and write the following code to the file.
    package com.spark.test
    
    import java.util.Properties
    
    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    object WriteToCk {
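      // Connection and write options: credentials for the cluster, plus batch size,
      // timeout, and parallelism settings for the JDBC write.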
      val properties = new Properties()
      properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      properties.put("user", "<yourUserName>")
      properties.put("password", "<yourPassword>")
      properties.put("batchsize","100000")
      properties.put("socket_timeout","300000")
      properties.put("numPartitions","8")
      properties.put("rewriteBatchedStatements","true")
    
      val url = "jdbc:clickhouse://<yourUrl>:8123/default"
      val table = "<yourTableName>"
    
      def main(args: Array[String]): Unit = {
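        // Local-mode resource settings for this example; tune them for your data volume.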
        val sc = new SparkConf()
        sc.set("spark.driver.memory", "1G")
        sc.set("spark.driver.cores", "4")
        sc.set("spark.executor.memory", "1G")
        sc.set("spark.executor.cores", "2")
    
        val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()
    
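        // Read the source CSV with a header row and infer column types; the inferred
        // types must map to the column types of the ClickHouse table.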
        val df = session.read.format("csv")
          .option("header", "true")
          .option("sep", ",")
          .option("inferSchema", "true")
          .load("<yourFilePath>")
          .selectExpr(
            "colName1",
            "colName2",
            "colName3",
             ...
          )
          .persist(StorageLevel.MEMORY_ONLY_SER_2)
        println(s"read done")
    
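        // Append the DataFrame to the ClickHouse table over JDBC, inserting in batches of 100,000 rows.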
        df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
        println(s"write done")
    
        df.unpersist(true)
      }
    }
    The following table describes the parameters.

    Parameter                     Description
    yourUserName                  The username of the database account created in ApsaraDB for ClickHouse.
    yourPassword                  The password of the database account.
    yourUrl                       The endpoint of the cluster.
    yourTableName                 The name of the table created in ApsaraDB for ClickHouse.
    yourFilePath                  The path of the data file that you want to import, including the file name.
    colName1,colName2,colName3    The names of the columns in the ApsaraDB for ClickHouse table.
  4. Compile and package the program.
    sbt package
  5. Run the program.
    ${SPARK_HOME}/bin/spark-submit --class "com.spark.test.WriteToCk" --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jar
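
To verify the import, you can count the rows in the target table after the job finishes. The following is a minimal sketch, assuming the same placeholders as in WriteToCk.scala:

    import java.sql.DriverManager

    object CountCkRows {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection(
          "jdbc:clickhouse://<yourUrl>:8123/default", "<yourUserName>", "<yourPassword>")
        try {
          // count() is ClickHouse's aggregate for the total number of rows.
          val rs = conn.createStatement().executeQuery("SELECT count() FROM <yourTableName>")
          if (rs.next()) println(s"rows imported: ${rs.getLong(1)}")
        } finally conn.close()
      }
    }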