Spark プログラムを使用して、JDBC 接続経由で CSV データを ApsaraDB for ClickHouse にバッチインポートします。このアプローチは、既存の Spark パイプラインがあり、ClickHouse JDBC ドライバーを使用して DataFrames を ClickHouse に直接書き込みたい場合に適しています。
複合型 (MAP、ARRAY、STRUCT) の完全なサポートが必要な場合、またはよりネイティブな統合を希望する場合は、代わりに Spark-ClickHouse ネイティブコネクタ を使用してください。ここで説明する JDBC アプローチは、これらの型をサポートしていません。
前提条件
開始する前に、以下を確認してください。
ご利用のオンプレミス環境のマシンの IP アドレスを ApsaraDB for ClickHouse クラスターのホワイトリストに追加済みであること。詳細については、「ホワイトリストの設定」をご参照ください。
インポートするデータとカラムのデータの型が一致する ApsaraDB for ClickHouse テーブルを作成済みであること。詳細については、「テーブルの作成」をご参照ください。
仕組み
Spark プログラムは、CSV ファイルを DataFrame に読み込み、ClickHouse JDBC ドライバーを使用して ApsaraDB for ClickHouse テーブルに書き込みます。行は、ポート 8123 の JDBC 接続経由でバッチで挿入されます。
主要な接続パラメーター:
| パラメーター | 値 | 説明 |
|---|---|---|
batchsize | 100000 | バッチ挿入あたりの行数 |
socket_timeout | 300000 | ミリ秒単位のソケットタイムアウト |
numPartitions | 8 | 並列書き込みパーティション数。より大きなデータセットの場合は増加し、クラスターへの負荷を軽減するために減少させます。 |
rewriteBatchedStatements | true | スループット向上のため、バッチ挿入を単一の複数行ステートメントに書き換えます |
CSV からのデータインポート
ステップ 1: プロジェクト構造の設定
Spark プロジェクトのディレクトリレイアウトを次に示します。
find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/com
./src/main/scala/com/spark
./src/main/scala/com/spark/test
./src/main/scala/com/spark/test/WriteToCk.scalaステップ 2: 依存関係の追加
build.sbt に次のコンテンツを追加します。
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"ステップ 3: Spark プログラムの作成
次のコンテンツで src/main/scala/com/spark/test/WriteToCk.scala を作成します。実行する前に、以下の表に記載されているプレースホルダーを置き換えてください。
package com.spark.test
import java.util
import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
object WriteToCk {
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "<yourUserName>")
properties.put("password", "<yourPassword>")
properties.put("batchsize","100000")
properties.put("socket_timeout","300000")
properties.put("numPartitions","8")
properties.put("rewriteBatchedStatements","true")
val url = "jdbc:clickhouse://<yourUrl>:8123/default"
val table = "<yourTableName>"
def main(args: Array[String]): Unit = {
val sc = new SparkConf()
sc.set("spark.driver.memory", "1G")
sc.set("spark.driver.cores", "4")
sc.set("spark.executor.memory", "1G")
sc.set("spark.executor.cores", "2")
val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()
val df = session.read.format("csv")
.option("header", "true")
.option("sep", ",")
.option("inferSchema", "true")
.load("<yourFilePath>")
.selectExpr(
"colName1",
"colName2",
"colName3",
...
)
.persist(StorageLevel.MEMORY_ONLY_SER_2)
println(s"read done")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
println(s"write done")
df.unpersist(true)
}
}次のプレースホルダーを実際の値に置き換えてください。
| プレースホルダー | 説明 | 必須 |
|---|---|---|
<yourUserName> | ApsaraDB for ClickHouse のデータベースアカウントのユーザー名 | はい |
<yourPassword> | データベースアカウントのパスワード | はい |
<yourUrl> | ApsaraDB for ClickHouse クラスターのエンドポイント | はい |
<yourTableName> | ApsaraDB for ClickHouse の送信先テーブル名 | はい |
<yourFilePath> | インポートする CSV ファイルへのパス (ファイル名を含む) | はい |
colName1,colName2,colName3 | DataFrame から選択する ApsaraDB for ClickHouse テーブルのカラム名 | はい |
ステップ 4: パッケージのビルド
プログラムをコンパイルしてパッケージ化するには、次のコマンドを実行します。
sbt package出力 JAR は target/scala-2.12/simple-project_2.12-1.0.jar に生成されます。
ステップ 5: Spark ジョブの送信
ジョブを送信するには、次のコマンドを実行します。このコマンドは、ドライバーとエグゼキュータの両方のプロセスに対して、ClickHouse JDBC ドライバーをクラスパスに追加します。
${SPARK_HOME}/bin/spark-submit --class "com.spark.test.WriteToCk" --master local[4] --conf "spark.driver.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" --conf "spark.executor.extraClassPath=${HOME}/.m2/repository/ru/yandex/clickhouse/clickhouse-jdbc/0.2.4/clickhouse-jdbc-0.2.4.jar" target/scala-2.12/simple-project_2.12-1.0.jar制限事項
複合データの型はサポートされていません: ClickHouse JDBC ドライバーは、MAP、ARRAY、STRUCT などの Spark 複合データの型をサポートしていません。これらの型を使用するには、Spark-ClickHouse ネイティブコネクタに切り替えてください。
インポート前にテーブルが存在している必要があります: JDBC は送信先テーブルを自動作成できません。Spark ジョブを実行する前に、ApsaraDB for ClickHouse でテーブルを作成してください。詳細については、「テーブルの作成」をご参照ください。