全部产品
Search
文档中心

E-MapReduce:Gunakan Spark untuk menulis data ke tabel Iceberg dan membaca data dari tabel dalam mode batch

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan API DataFrame Spark untuk menulis data ke tabel Iceberg dan membaca data dari tabel dalam mode batch. Dalam topik ini, Spark 3.x digunakan.

Prasyarat

Cluster EMR Hadoop telah dibuat. Untuk informasi lebih lanjut, lihat Buat Cluster.
Catatan Topik ini hanya berlaku untuk cluster Hadoop EMR V3.38.0, EMR V5.4.0, atau versi minor setelah EMR V3.38.0 atau EMR V5.4.0.

Prosedur

  1. Buat proyek Maven dan tambahkan dependensi ke file model objek proyek (POM) dari proyek.

    Tambahkan dependensi Spark dan dependensi Iceberg ke file POM. Dalam kode berikut, dependensi untuk Spark 3.1.1 dan dependensi untuk Iceberg 0.12.0 ditambahkan. Kode dikompilasi menggunakan ruang lingkup dependensi yang disediakan. Kami merekomendasikan Anda menggunakan paket perangkat lunak Iceberg yang berjalan di cluster E-MapReduce (EMR) Anda.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>0.12.0</version>
        <scope>provided</scope>
    </dependency>
    Catatan

    Paket perangkat lunak Iceberg di cluster EMR Anda berbeda dari paket dependensi Iceberg sumber terbuka. Sebagai contoh, katalog Data Lake Formation (DLF) secara otomatis terintegrasi dengan paket perangkat lunak Iceberg di cluster EMR Anda. Kami merekomendasikan Anda menambahkan dependensi Iceberg sumber terbuka menggunakan ruang lingkup dependensi yang disediakan untuk mengkompilasi kode di mesin lokal Anda, mengemas kode, dan kemudian menggunakan dependensi di lingkungan cluster Anda untuk menjalankan kode.

  2. Konfigurasikan katalog.

    Sebelum Anda memanggil API Spark untuk melakukan operasi pada tabel Iceberg, tambahkan item konfigurasi yang diperlukan ke objek SparkConf terkait untuk mengonfigurasi katalog.

    • EMR V3.40 atau versi minor lebih baru, dan EMR V5.6.0 atau lebih baru

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
    • EMR V3.39.X dan EMR V5.5.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")
    • EMR V3.38.X, EMR V5.3.X, dan EMR V5.4.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
      Catatan

      Anda harus mengonfigurasi variabel lingkungan sebelum dapat menjalankan kode contoh. Untuk informasi lebih lanjut tentang cara mengonfigurasi variabel lingkungan, lihat bagian Konfigurasikan Variabel Lingkungan dalam topik ini.

  3. Tulis data ke tabel Iceberg.

    Jika menggunakan Spark 3.x, Anda dapat menggunakan API DataFrameWriterV2 untuk menulis data ke tabel Iceberg. Kami merekomendasikan agar Anda tidak menggunakan API DataFrameWriterV1. Contoh kode berikut memberikan contoh tentang cara menggunakan API DataFrameWriterV2 untuk menulis data ke tabel Iceberg.

    Dalam contoh berikut, <yourCatalogName> menentukan nama katalog Anda. Anda dapat mengganti <yourCatalogName> dengan nama katalog aktual.

    Jalankan kode berikut untuk membuat tabel data:

    val df: DataFrame = ...
    df.writeTo("<yourCatalogName>.iceberg_db.sample").create()
    Catatan

    Anda dapat menjalankan perintah create, replace, atau createOrReplace untuk membuat tabel data. Anda juga dapat menggunakan metode tableProperty untuk mengonfigurasi properti untuk tabel data dan menggunakan metode partitionedBy untuk mengonfigurasi bidang partisi untuk tabel data.

    Anda dapat menjalankan perintah berikut untuk menambahkan dan menimpa data:

    • Tulis data ke tabel dalam mode tambahan

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").append()
    • Tulis data ke tabel dalam mode penimpaan

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
  4. Baca data dari tabel.

    Pilih metode pembacaan data berdasarkan versi Spark yang Anda gunakan:

    • Spark 3.x (direkomendasikan)

       val df = spark.table("<yourCatalogName>.iceberg_db.sample")
    • Spark 2.4

      val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")

Contoh

Dalam contoh ini, API DataFrame Spark dipanggil untuk menulis data ke tabel Iceberg dan membaca data dari tabel dalam mode batch.

Penting

Parameter dan nama default katalog bervariasi berdasarkan versi cluster Anda. Dalam contoh ini, DLF digunakan untuk mengelola metadata. Dalam contoh ini, cluster EMR V5.3.0 dan katalog bernama dlf_catalog digunakan. Untuk informasi lebih lanjut, lihat Konfigurasi Metadata DLF.

  1. Gunakan SQL Spark untuk membuat database bernama iceberg_db untuk pengujian Anda. Untuk informasi lebih lanjut, lihat Gunakan Iceberg.

  2. Tulis kode Spark.

    Contoh kode dalam Scala:

    def main(args: Array[String]): Unit = {
    
      // Konfigurasikan parameter untuk katalog. 
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergReadWriteTest")
      .getOrCreate()
    
      // Buat atau ganti tabel Iceberg dalam DataFrame.
      val firstDF = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
      )).toDF("id", "data")
    
      firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()
    
      // Tulis data dari DataFrame ke tabel Iceberg.
      val secondDF = spark.createDataFrame(Seq(
      (4, "d"), (5, "e"), (6, "f")
      )).toDF("id", "data")
    
      secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()
    
      // Baca data dari tabel Iceberg.
      val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")
    
      icebergTable.show()
    }
  3. Kemas kode, dan deploy kode ke cluster EMR.

    1. Periksa plug-in Maven yang digunakan untuk mengkompilasi kode dalam Scala. Anda dapat mengonfigurasi plug-in berikut dalam file pom.xml:

      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. Debug kode di mesin lokal Anda dan jalankan perintah berikut untuk mengemas kode:

      mvn clean install
    3. Masuk ke cluster EMR Anda dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Cluster.

    4. Unggah paket JAR ke cluster EMR.

      Dalam contoh ini, paket JAR diunggah ke direktori root cluster EMR.

  4. Jalankan perintah spark-submit untuk mengirimkan pekerjaan Spark:

    spark-submit \
     --master yarn \
     --deploy-mode cluster \
     --driver-memory 1g \
     --executor-cores 1 \
     --executor-memory 1g \
     --num-executors 1 \
     --class com.aliyun.iceberg.IcebergTest \
     iceberg-demos.jar
    Catatan

    Dalam contoh ini, paket JAR bernama iceberg-demos.jar digunakan. Anda dapat mengubah nilai parameter --class dan nama paket JAR berdasarkan kebutuhan bisnis Anda.

    Hasil berikut dikembalikan:

    +---+----+
    | id|data|
    +---+----+
    |  4|   d|
    |  1|   a|
    |  5|   e|
    |  6|   f|
    |  2|   b|
    |  3|   c|
    +---+----+