Topik ini menjelaskan cara menggunakan API DataFrame Spark untuk menulis data ke tabel Iceberg dan membaca data dari tabel dalam mode batch. Dalam topik ini, Spark 3.x digunakan.
Prasyarat
Prosedur
Buat proyek Maven dan tambahkan dependensi ke file model objek proyek (POM) dari proyek.
Tambahkan dependensi Spark dan dependensi Iceberg ke file POM. Dalam kode berikut, dependensi untuk Spark 3.1.1 dan dependensi untuk Iceberg 0.12.0 ditambahkan. Kode dikompilasi menggunakan ruang lingkup dependensi yang disediakan. Kami merekomendasikan Anda menggunakan paket perangkat lunak Iceberg yang berjalan di cluster E-MapReduce (EMR) Anda.
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-core</artifactId> <version>0.12.0</version> <scope>provided</scope> </dependency>CatatanPaket perangkat lunak Iceberg di cluster EMR Anda berbeda dari paket dependensi Iceberg sumber terbuka. Sebagai contoh, katalog Data Lake Formation (DLF) secara otomatis terintegrasi dengan paket perangkat lunak Iceberg di cluster EMR Anda. Kami merekomendasikan Anda menambahkan dependensi Iceberg sumber terbuka menggunakan ruang lingkup dependensi yang disediakan untuk mengkompilasi kode di mesin lokal Anda, mengemas kode, dan kemudian menggunakan dependensi di lingkungan cluster Anda untuk menjalankan kode.
Konfigurasikan katalog.
Sebelum Anda memanggil API Spark untuk melakukan operasi pada tabel Iceberg, tambahkan item konfigurasi yang diperlukan ke objek SparkConf terkait untuk mengonfigurasi katalog.
EMR V3.40 atau versi minor lebih baru, dan EMR V5.6.0 atau lebih baru
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")EMR V3.39.X dan EMR V5.5.X
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")EMR V3.38.X, EMR V5.3.X, dan EMR V5.4.X
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")CatatanAnda harus mengonfigurasi variabel lingkungan sebelum dapat menjalankan kode contoh. Untuk informasi lebih lanjut tentang cara mengonfigurasi variabel lingkungan, lihat bagian Konfigurasikan Variabel Lingkungan dalam topik ini.
Tulis data ke tabel Iceberg.
Jika menggunakan Spark 3.x, Anda dapat menggunakan API DataFrameWriterV2 untuk menulis data ke tabel Iceberg. Kami merekomendasikan agar Anda tidak menggunakan API DataFrameWriterV1. Contoh kode berikut memberikan contoh tentang cara menggunakan API DataFrameWriterV2 untuk menulis data ke tabel Iceberg.
Dalam contoh berikut,
<yourCatalogName>menentukan nama katalog Anda. Anda dapat mengganti <yourCatalogName> dengan nama katalog aktual.Jalankan kode berikut untuk membuat tabel data:
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").create()CatatanAnda dapat menjalankan perintah create, replace, atau createOrReplace untuk membuat tabel data. Anda juga dapat menggunakan metode tableProperty untuk mengonfigurasi properti untuk tabel data dan menggunakan metode partitionedBy untuk mengonfigurasi bidang partisi untuk tabel data.
Anda dapat menjalankan perintah berikut untuk menambahkan dan menimpa data:
Tulis data ke tabel dalam mode tambahan
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").append()Tulis data ke tabel dalam mode penimpaan
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
Baca data dari tabel.
Pilih metode pembacaan data berdasarkan versi Spark yang Anda gunakan:
Spark 3.x (direkomendasikan)
val df = spark.table("<yourCatalogName>.iceberg_db.sample")Spark 2.4
val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")
Contoh
Dalam contoh ini, API DataFrame Spark dipanggil untuk menulis data ke tabel Iceberg dan membaca data dari tabel dalam mode batch.
Parameter dan nama default katalog bervariasi berdasarkan versi cluster Anda. Dalam contoh ini, DLF digunakan untuk mengelola metadata. Dalam contoh ini, cluster EMR V5.3.0 dan katalog bernama dlf_catalog digunakan. Untuk informasi lebih lanjut, lihat Konfigurasi Metadata DLF.
Gunakan SQL Spark untuk membuat database bernama iceberg_db untuk pengujian Anda. Untuk informasi lebih lanjut, lihat Gunakan Iceberg.
Tulis kode Spark.
Contoh kode dalam Scala:
def main(args: Array[String]): Unit = { // Konfigurasikan parameter untuk katalog. val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("IcebergReadWriteTest") .getOrCreate() // Buat atau ganti tabel Iceberg dalam DataFrame. val firstDF = spark.createDataFrame(Seq( (1, "a"), (2, "b"), (3, "c") )).toDF("id", "data") firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace() // Tulis data dari DataFrame ke tabel Iceberg. val secondDF = spark.createDataFrame(Seq( (4, "d"), (5, "e"), (6, "f") )).toDF("id", "data") secondDF.writeTo("dlf_catalog.iceberg_db.sample").append() // Baca data dari tabel Iceberg. val icebergTable = spark.table("dlf_catalog.iceberg_db.sample") icebergTable.show() }Kemas kode, dan deploy kode ke cluster EMR.
Periksa plug-in Maven yang digunakan untuk mengkompilasi kode dalam Scala. Anda dapat mengonfigurasi plug-in berikut dalam file pom.xml:
<build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>Debug kode di mesin lokal Anda dan jalankan perintah berikut untuk mengemas kode:
mvn clean installMasuk ke cluster EMR Anda dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Cluster.
Unggah paket JAR ke cluster EMR.
Dalam contoh ini, paket JAR diunggah ke direktori root cluster EMR.
Jalankan perintah spark-submit untuk mengirimkan pekerjaan Spark:
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 1 \ --executor-memory 1g \ --num-executors 1 \ --class com.aliyun.iceberg.IcebergTest \ iceberg-demos.jarCatatanDalam contoh ini, paket JAR bernama iceberg-demos.jar digunakan. Anda dapat mengubah nilai parameter --class dan nama paket JAR berdasarkan kebutuhan bisnis Anda.
Hasil berikut dikembalikan:
+---+----+ | id|data| +---+----+ | 4| d| | 1| a| | 5| e| | 6| f| | 2| b| | 3| c| +---+----+