このトピックでは、Spark Structured Streaming を使用して Iceberg テーブルにデータを書き込む方法について説明します。
前提条件
制限事項
DataLake クラスターまたはカスタムクラスターと Dataflow Kafka クラスターは、同じ仮想プライベートクラウド (VPC) の同じ vSwitch にデプロイする必要があります。
ストリーミングモードで Iceberg テーブルにデータを書き込む
Spark Structured Streaming の DataStreamWriter API を呼び出して、Iceberg テーブルにデータを書き込みます。
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()コード内の tableIdentifier パラメーターは、メタデータテーブルの名前またはメタデータテーブルのパスを指定します。次のいずれかの方法を使用して、ストリーミングモードで Iceberg テーブルにデータを書き込むことができます。
append: 各バッチのデータを Iceberg テーブルに追加します。このメソッドは INSERT INTO 操作と同等です。
complete: Iceberg テーブルのデータを最新バッチのデータで上書きします。このメソッドは INSERT OVERWRITE 操作と同等です。
例
このセクションでは、Dataflow クラスターからデータを読み取り、Iceberg テーブルにデータを書き込む例を示します。関連コードをパッケージ化し、パッケージ化されたコードを EMR クラスターにアップロードした後、spark-submit コマンドを実行して Spark ジョブを実行し、データの読み取りと書き込みを実装できます。
Kafka スクリプトを使用してテスト用のトピックを作成し、テストデータを準備します。
SSH モードで Dataflow クラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
次のコマンドを実行して、iceberg_test という名前のトピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create次のコマンドを実行して、テストデータを準備します。
kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
Spark SQL を使用して、テスト用の iceberg_db という名前のデータベースと iceberg_table という名前のテーブルを作成します。詳細については、「Iceberg の使用」をご参照ください。
Maven プロジェクトを作成し、Spark の依存関係を追加し、Scala でコードをコンパイルするために使用される Maven プラグインを追加します。pom.xml ファイルのサンプル設定:
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>Spark コードを記述します。
Scala のサンプルコード:
重要カタログのパラメーターとデフォルト名は、クラスターのバージョンによって異なります。この例では、DLF を使用してメタデータを管理します。この例では、EMR V5.3.0 クラスターと
dlf_catalogという名前のカタログを使用しています。詳細については、「DLF メタデータの設定」をご参照ください。def main(args: Array[String]): Unit = { // カタログのパラメーターを設定します。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("StructuredSinkIceberg") .getOrCreate() val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint" val bootstrapServers = "192.168.XX.XX:9092" val topic = "iceberg_test" // Dataflow クラスターからデータを読み取ります。 val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .load() val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Base64 エンコードされた文字列を通常の文字列にデコードします。 "CAST(value AS STRING) AS data") .select( col("strKey").cast(LongType).alias("id"), // STRING 型の文字列を LONG 型の文字列に変換します。 col("data") ) // ストリーミングモードで Iceberg テーブルにデータを書き込みます。 val query = resDF.writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("path", "dlf_catalog.iceberg_db.iceberg_table") .option("checkpointLocation", checkpointPath) .start() query.awaitTermination() }ビジネス要件に基づいて、次の表に示すパラメーターの値を変更できます。
パラメーター
説明
checkpointPath
Spark Structured Streaming を使用して書き込まれたデータのチェックポイントパス。
bootstrapServers
Kafka クラスター内の Kafka ブローカーのプライベート IP アドレス。
topic
トピックの名前。
コードをパッケージ化し、EMR クラスターにデプロイします。
オンプレミスマシンでコードをデバッグした後、次のコマンドを実行してコードをパッケージ化します。
mvn clean installSSH モードで EMR クラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
JAR パッケージを EMR クラスターにアップロードします。
この例では、JAR パッケージは EMR クラスターのルートディレクトリにアップロードされます。
Spark ジョブを送信して実行します。
spark-submit コマンドを実行して Spark ジョブを実行します。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \ --class com.aliyun.iceberg.StructuredSinkIceberg \ iceberg-demos.jar説明上記のコードの <version> を特定のバージョンに置き換えます。spark-sql-kafka は Spark および Kafka と互換性がある必要があります。
この例では、iceberg-demos.jar という名前の JAR パッケージを使用しています。--class パラメーターの値と JAR パッケージの名前は、ビジネス要件に基づいて変更できます。
Spark SQL を使用してデータの変更をクエリします。詳細については、「基本的な使い方」をご参照ください。