Topik ini menjelaskan cara menulis dan mengambil data dari E-MapReduce (EMR) Hudi.
Menulis data
Konfigurasi lingkungan
Pada EMR V3.32.0 dan versi berikutnya, dependensi terkait Hudi telah diintegrasikan ke dalam komponen open source seperti Spark, Hive, dan Presto. Saat menjalankan pekerjaan untuk menulis data ke Hudi, Anda tidak perlu memperkenalkan kembali dependensi tersebut. Cukup tambahkan dependensi Hudi berikut ke file pom.xml. Versi Hudi yang digunakan bervariasi berdasarkan versi EMR. Tabel berikut menunjukkan pemetaan antara versi Hudi dan versi EMR.
Versi Hudi | Versi EMR |
0.6.0 |
|
0.8.0 |
|
0.9.0 |
|
0.10.0 |
|
0.11.0 | EMR V3.42.0 atau EMR V5.8.0 |
0.12.0 |
|
0.12.2 |
|
0.13.1 |
|
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark_2.11</artifactId>
<!-- for spark3 <artifactId>hudi-spark_2.12</artifactId> -->
<version>${hudi_version}</version>
<scope>provided</scope>
</dependency>Menyisipkan atau memperbarui data
Contoh kode:
val spark = SparkSession
.builder()
.master("local[*]")
.appName("hudi test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
import spark.implicits._
val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
.toDF("id", "name", "price", "version", "dt")
df.write.format("hudi")
.option(TABLE_NAME, "hudi_test_0")
// .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) untuk pembaruan
.option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // untuk penyisipan
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "version")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
.option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
.option(HIVE_PARTITION_FIELDS_OPT_KEY, "ds")
.option(META_SYNC_ENABLED_OPT_KEY, "true")
.option(HIVE_USE_JDBC_OPT_KEY, "false")
.option(HIVE_DATABASE_OPT_KEY, "default")
.option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
.option(INSERT_PARALLELISM, "8")
.option(UPSERT_PARALLELISM, "8")
.mode(Overwrite)
.save("/tmp/hudi/h0")Menghapus data
Contoh kode:
df.write.format("hudi")
.option(TABLE_NAME, "hudi_test_0")
.option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // untuk penghapusan
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "version")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
.option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
.option(DELETE_PARALLELISM, "8")
.mode(Append)
.save("/tmp/hudi/h0")Mengambil data
Paket perangkat lunak terkait Hudi telah diintegrasikan ke dalam komponen open source seperti Spark, Presto, dan Hive di EMR. Anda dapat mengambil data dari tabel Hudi tanpa perlu memperkenalkan dependensi tambahan.
Jika menggunakan Hive atau Presto untuk mengambil tabel Hudi, aktifkan sinkronisasi metadata pada tahap penulisan dengan mengatur parameter META_SYNC_ENABLED_OPT_KEY menjadi true.
Jika menggunakan Hudi open source, atur parameter hive.input.format menjadi org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat untuk tabel Copy on Write dan Merge on Read. Jika menggunakan EMR Hudi, Anda tidak perlu menentukan format input untuk tabel Copy on Write karena tabel tersebut secara otomatis menyesuaikan diri dengan format input Hudi.