全部产品
Search
文档中心

E-MapReduce:Gunakan Spark untuk menulis data ke tabel Iceberg dalam mode streaming

更新时间:Jul 06, 2025

Tema ini menjelaskan cara menulis data ke tabel Iceberg menggunakan Spark Structured Streaming.

Prasyarat

  • Sebuah kluster DataLake atau kluster kustom telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster.

  • Sebuah kluster Dataflow yang berisi layanan Kafka telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster.

Batasan

Kluster DataLake atau kluster kustom dan kluster Dataflow Kafka harus diterapkan di vSwitch yang sama dari virtual private cloud (VPC) yang sama.

Menulis data ke tabel Iceberg dalam mode streaming

Tulis data ke tabel Iceberg dengan memanggil API DataStreamWriter di Spark Structured Streaming.

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
Catatan

Parameter tableIdentifier dalam kode menentukan nama tabel metadata atau jalur tabel metadata. Anda dapat menggunakan salah satu metode berikut untuk menulis data ke tabel Iceberg dalam mode streaming:

  • append: menambahkan data di setiap batch ke tabel Iceberg. Metode ini setara dengan operasi INSERT INTO.

  • complete: menimpa data di tabel Iceberg dengan data di batch terbaru. Metode ini setara dengan operasi INSERT OVERWRITE.

Contoh

Bagian ini memberikan contoh tentang cara membaca data dari kluster Dataflow dan menulis data tersebut ke tabel Iceberg. Anda dapat menjalankan perintah spark-submit untuk menjalankan pekerjaan Spark untuk mengimplementasikan pembacaan dan penulisan data setelah Anda mengemas kode terkait dan mengunggah kode yang telah dikemas ke kluster EMR Anda.

  1. Gunakan skrip Kafka untuk membuat topik untuk pengujian dan menyiapkan data uji.

    1. Masuk ke kluster Dataflow dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.

    2. Jalankan perintah berikut untuk membuat topik bernama iceberg_test:

      kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. Jalankan perintah berikut untuk menyiapkan data uji:

      kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
  2. Gunakan Spark SQL untuk membuat database bernama iceberg_db dan tabel bernama iceberg_table untuk pengujian. Untuk informasi lebih lanjut, lihat Penggunaan Iceberg.

  3. Buat proyek Maven, tambahkan dependensi Spark, dan tambahkan plugin Maven yang digunakan untuk mengkompilasi kode dalam Scala. Konfigurasi sampel dalam file pom.xml:

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Plugin Maven Scala akan mengkompilasi file sumber Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  4. Tulis kode Spark.

    Kode sampel dalam Scala:

    Penting

    Parameter dan nama default katalog bervariasi berdasarkan versi kluster Anda. Dalam contoh ini, DLF digunakan untuk mengelola metadata. Dalam contoh ini, kluster EMR V5.3.0 dan katalog bernama dlf_catalog digunakan. Untuk informasi lebih lanjut, lihat Konfigurasi Metadata DLF.

    def main(args: Array[String]): Unit = {
    
      // Konfigurasikan parameter untuk katalog. 
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // Baca data dari kluster Dataflow.
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Dekode string yang disandikan Base64 menjadi string umum.
          "CAST(value AS STRING) AS data")
          .select(
            col("strKey").cast(LongType).alias("id"), // Ubah string tipe STRING menjadi string tipe LONG.
            col("data")
          )
    
      // Tulis data ke tabel Iceberg dalam mode streaming.
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }

    Anda dapat mengubah nilai parameter yang dijelaskan dalam tabel berikut berdasarkan kebutuhan bisnis Anda.

    Parameter

    Deskripsi

    checkpointPath

    Jalur checkpoint data yang ditulis menggunakan Spark Structured Streaming.

    bootstrapServers

    Alamat IP pribadi broker Kafka di kluster Kafka.

    topic

    Nama topik.

  5. Kemas kode, dan sebarkan kode ke kluster EMR.

    1. Setelah Anda men-debug kode di mesin lokal Anda, jalankan perintah berikut untuk mengemas kode:

      mvn clean install
    2. Masuk ke kluster EMR Anda dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.

    3. Unggah paket JAR ke kluster EMR.

      Dalam contoh ini, paket JAR diunggah ke direktori root kluster EMR.

  6. Kirim dan jalankan pekerjaan Spark.

    1. Jalankan perintah spark-submit untuk menjalankan pekerjaan Spark:

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      Catatan
      • Ganti <version> dalam kode di atas dengan versi tertentu. spark-sql-kafka harus kompatibel dengan Spark dan Kafka.

      • Dalam contoh ini, paket JAR bernama iceberg-demos.jar digunakan. Anda dapat mengubah nilai parameter --class dan nama paket JAR berdasarkan kebutuhan bisnis Anda.

    2. Gunakan Spark SQL untuk memeriksa perubahan data. Untuk informasi lebih lanjut, lihat Penggunaan Dasar.