すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for ClickHouse:Spark からのインポート

最終更新日:Mar 29, 2026

Spark プログラムを使用して、JDBC 接続経由で CSV データを ApsaraDB for ClickHouse にバッチインポートします。このアプローチは、既存の Spark パイプラインがあり、ClickHouse JDBC ドライバーを使用して DataFrames を ClickHouse に直接書き込みたい場合に適しています。

複合型 (MAP、ARRAY、STRUCT) の完全なサポートが必要な場合、またはよりネイティブな統合を希望する場合は、代わりに Spark-ClickHouse ネイティブコネクタ を使用してください。ここで説明する JDBC アプローチは、これらの型をサポートしていません。

前提条件

開始する前に、以下を確認してください。

  • ご利用のオンプレミス環境のマシンの IP アドレスを ApsaraDB for ClickHouse クラスターのホワイトリストに追加済みであること。詳細については、「ホワイトリストの設定」をご参照ください。

  • インポートするデータとカラムのデータの型が一致する ApsaraDB for ClickHouse テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。

仕組み

Spark プログラムは、CSV ファイルを DataFrame に読み込み、ClickHouse JDBC ドライバーを使用して ApsaraDB for ClickHouse テーブルに書き込みます。行は、ポート 8123 の JDBC 接続経由でバッチで挿入されます。

主要な接続パラメーター:

パラメーター説明
batchsize100000バッチ挿入あたりの行数
socket_timeout300000ミリ秒単位のソケットタイムアウト
numPartitions8並列書き込みパーティション数。より大きなデータセットの場合は増加し、クラスターへの負荷を軽減するために減少させます。
rewriteBatchedStatementstrueスループット向上のため、バッチ挿入を単一の複数行ステートメントに書き換えます

CSV からのデータインポート

ステップ 1: プロジェクト構造の設定

Spark プロジェクトのディレクトリレイアウトを次に示します。

find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scala

ステップ 2: 依存関係の追加

build.sbt に次のコンテンツを追加します。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"

ステップ 3: Spark プログラムの作成

次のコンテンツで src/main/scala/com/spark/test/WriteToCk.scala を作成します。実行する前に、以下の表に記載されているプレースホルダーを置き換えてください。

package com.spark.test

import java.util
import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

object WriteToCk {
  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "<yourUserName>")
  properties.put("password", "<yourPassword>")
  properties.put("batchsize","100000")
  properties.put("socket_timeout","300000")
  properties.put("numPartitions","8")
  properties.put("rewriteBatchedStatements","true")

  val url = "jdbc:clickhouse://<yourUrl>:8123/default"
  val table = "<yourTableName>"

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf()
    sc.set("spark.driver.memory", "1G")
    sc.set("spark.driver.cores", "4")
    sc.set("spark.executor.memory", "1G")
    sc.set("spark.executor.cores", "2")

    val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()

    val df = session.read.format("csv")
      .option("header", "true")
      .option("sep", ",")
      .option("inferSchema", "true")
      .load("<yourFilePath>")
      .selectExpr(
        "colName1",
        "colName2",
        "colName3",
         ...
      )
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(s"read done")

    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    println(s"write done")

    df.unpersist(true)
  }
}

次のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明必須
<yourUserName>ApsaraDB for ClickHouse のデータベースアカウントのユーザー名はい
<yourPassword>データベースアカウントのパスワードはい
<yourUrl>ApsaraDB for ClickHouse クラスターのエンドポイントはい
<yourTableName>ApsaraDB for ClickHouse の送信先テーブル名はい
<yourFilePath>インポートする CSV ファイルへのパス (ファイル名を含む)はい
colName1,colName2,colName3DataFrame から選択する ApsaraDB for ClickHouse テーブルのカラム名はい

ステップ 4: パッケージのビルド

プログラムをコンパイルしてパッケージ化するには、次のコマンドを実行します。

sbt package

出力 JAR は target/scala-2.12/simple-project_2.12-1.0.jar に生成されます。

ステップ 5: Spark ジョブの送信

ジョブを送信するには、次のコマンドを実行します。このコマンドは、ドライバーとエグゼキュータの両方のプロセスに対して、ClickHouse JDBC ドライバーをクラスパスに追加します。

${SPARK_HOME}/bin/spark-submit  --class "com.spark.test.WriteToCk"  --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jar

制限事項

  • 複合データの型はサポートされていません: ClickHouse JDBC ドライバーは、MAP、ARRAY、STRUCT などの Spark 複合データの型をサポートしていません。これらの型を使用するには、Spark-ClickHouse ネイティブコネクタに切り替えてください。

  • インポート前にテーブルが存在している必要があります: JDBC は送信先テーブルを自動作成できません。Spark ジョブを実行する前に、ApsaraDB for ClickHouse でテーブルを作成してください。詳細については、「テーブルの作成」をご参照ください。