Tema ini menjelaskan cara menulis data ke tabel Iceberg menggunakan Spark Structured Streaming.
Prasyarat
Sebuah kluster DataLake atau kluster kustom telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster.
Sebuah kluster Dataflow yang berisi layanan Kafka telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster.
Batasan
Kluster DataLake atau kluster kustom dan kluster Dataflow Kafka harus diterapkan di vSwitch yang sama dari virtual private cloud (VPC) yang sama.
Menulis data ke tabel Iceberg dalam mode streaming
Tulis data ke tabel Iceberg dengan memanggil API DataStreamWriter di Spark Structured Streaming.
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()Parameter tableIdentifier dalam kode menentukan nama tabel metadata atau jalur tabel metadata. Anda dapat menggunakan salah satu metode berikut untuk menulis data ke tabel Iceberg dalam mode streaming:
append: menambahkan data di setiap batch ke tabel Iceberg. Metode ini setara dengan operasi INSERT INTO.
complete: menimpa data di tabel Iceberg dengan data di batch terbaru. Metode ini setara dengan operasi INSERT OVERWRITE.
Contoh
Bagian ini memberikan contoh tentang cara membaca data dari kluster Dataflow dan menulis data tersebut ke tabel Iceberg. Anda dapat menjalankan perintah spark-submit untuk menjalankan pekerjaan Spark untuk mengimplementasikan pembacaan dan penulisan data setelah Anda mengemas kode terkait dan mengunggah kode yang telah dikemas ke kluster EMR Anda.
Gunakan skrip Kafka untuk membuat topik untuk pengujian dan menyiapkan data uji.
Masuk ke kluster Dataflow dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
Jalankan perintah berikut untuk membuat topik bernama iceberg_test:
kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --createJalankan perintah berikut untuk menyiapkan data uji:
kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
Gunakan Spark SQL untuk membuat database bernama iceberg_db dan tabel bernama iceberg_table untuk pengujian. Untuk informasi lebih lanjut, lihat Penggunaan Iceberg.
Buat proyek Maven, tambahkan dependensi Spark, dan tambahkan plugin Maven yang digunakan untuk mengkompilasi kode dalam Scala. Konfigurasi sampel dalam file pom.xml:
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <!-- Plugin Maven Scala akan mengkompilasi file sumber Scala --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>Tulis kode Spark.
Kode sampel dalam Scala:
PentingParameter dan nama default katalog bervariasi berdasarkan versi kluster Anda. Dalam contoh ini, DLF digunakan untuk mengelola metadata. Dalam contoh ini, kluster EMR V5.3.0 dan katalog bernama
dlf_catalogdigunakan. Untuk informasi lebih lanjut, lihat Konfigurasi Metadata DLF.def main(args: Array[String]): Unit = { // Konfigurasikan parameter untuk katalog. val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("StructuredSinkIceberg") .getOrCreate() val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint" val bootstrapServers = "192.168.XX.XX:9092" val topic = "iceberg_test" // Baca data dari kluster Dataflow. val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .load() val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Dekode string yang disandikan Base64 menjadi string umum. "CAST(value AS STRING) AS data") .select( col("strKey").cast(LongType).alias("id"), // Ubah string tipe STRING menjadi string tipe LONG. col("data") ) // Tulis data ke tabel Iceberg dalam mode streaming. val query = resDF.writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("path", "dlf_catalog.iceberg_db.iceberg_table") .option("checkpointLocation", checkpointPath) .start() query.awaitTermination() }Anda dapat mengubah nilai parameter yang dijelaskan dalam tabel berikut berdasarkan kebutuhan bisnis Anda.
Parameter
Deskripsi
checkpointPath
Jalur checkpoint data yang ditulis menggunakan Spark Structured Streaming.
bootstrapServers
Alamat IP pribadi broker Kafka di kluster Kafka.
topic
Nama topik.
Kemas kode, dan sebarkan kode ke kluster EMR.
Setelah Anda men-debug kode di mesin lokal Anda, jalankan perintah berikut untuk mengemas kode:
mvn clean installMasuk ke kluster EMR Anda dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
Unggah paket JAR ke kluster EMR.
Dalam contoh ini, paket JAR diunggah ke direktori root kluster EMR.
Kirim dan jalankan pekerjaan Spark.
Jalankan perintah spark-submit untuk menjalankan pekerjaan Spark:
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \ --class com.aliyun.iceberg.StructuredSinkIceberg \ iceberg-demos.jarCatatanGanti <version> dalam kode di atas dengan versi tertentu. spark-sql-kafka harus kompatibel dengan Spark dan Kafka.
Dalam contoh ini, paket JAR bernama iceberg-demos.jar digunakan. Anda dapat mengubah nilai parameter --class dan nama paket JAR berdasarkan kebutuhan bisnis Anda.
Gunakan Spark SQL untuk memeriksa perubahan data. Untuk informasi lebih lanjut, lihat Penggunaan Dasar.