すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:基本的な使用方法

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)Hudiへのデータの書き込み方法とデータのクエリ方法について説明します。

データの書き込み

環境設定

EMR V3.32.0以降では、Hudiに関連する依存関係は、Spark、Hive、Prestoなどのさまざまなオープンソースコンポーネントに統合されています。Hudiにデータを書き込むジョブを実行する場合、依存関係を再度導入する必要はありません。次のHudiの依存関係を pom.xml ファイルに追加するだけで済みます。 使用されるHudiのバージョンは、EMRのバージョンによって異なります。次の表に、HudiのバージョンとEMRのバージョンのマッピングを示します。

Hudiバージョン

EMRバージョン

0.6.0

  • EMR V3.32.0~EMR V3.35.0

  • EMR V4.5.0~EMR V4.9.0

  • EMR V5.1.0

0.8.0

  • EMR V3.36.1~EMR V3.37.1

  • EMR V5.2.1~EMR V5.3.1

0.9.0

  • EMR V3.38.0~EMR V3.38.3

  • EMR V5.4.0~EMR V5.4.3

0.10.0

  • EMR V3.39.1~EMR V3.40.0

  • EMR V4.10.0

  • EMR V5.5.0~EMR V5.6.0

0.11.0

EMR V3.42.0またはEMR V5.8.0

0.12.0

  • EMR V5.9.0~EMR V5.10.1

  • EMR V3.43.0~EMR V3.44.1

0.12.2

  • EMR V5.11.0~EMR V5.12.1

  • EMR V3.45.0~EMR V3.46.1

0.13.1

  • EMR V5.13.0またはEMR V5.14.0

  • EMR V3.47.0またはEMR V3.48.0

<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <!-- Spark3向け <artifactId>hudi-spark_2.12</artifactId> -->
   <version>${hudi_version}</version>
  <scope>provided</scope>
</dependency>

データの挿入または更新

サンプルコード:

 val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

import spark.implicits._
    val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
      .toDF("id", "name", "price", "version", "dt")

    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) 更新の場合
      .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // 挿入の場合
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "ds")
      .option(META_SYNC_ENABLED_OPT_KEY, "true")
      .option(HIVE_USE_JDBC_OPT_KEY, "false")
      .option(HIVE_DATABASE_OPT_KEY, "default")
      .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
      .option(INSERT_PARALLELISM, "8")
      .option(UPSERT_PARALLELISM, "8")
      .mode(Overwrite)
      .save("/tmp/hudi/h0")

データの削除

サンプルコード:

df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // 削除の場合
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DELETE_PARALLELISM, "8")
      .mode(Append)
      .save("/tmp/hudi/h0")

データのクエリ

Hudi に関連するソフトウェアパッケージは、EMR の Spark、Presto、Hive などのさまざまなオープンソースコンポーネントに統合されています。追加の依存関係を導入することなく、Hudi テーブルのデータをクエリできます。

Hive または Presto を使用して Hudi テーブルをクエリする場合は、書き込みステージで META_SYNC_ENABLED_OPT_KEY パラメーターを true に設定して、メタデータの同期を有効にする必要があります。

オープンソースの Hudi を使用する場合は、Copy on Write テーブルと Merge on Read テーブルの両方で、hive.input.format パラメーターを org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat に設定する必要があります。 EMR Hudi を使用する場合は、Copy on Write テーブルの入力形式を指定する必要はありません。Copy on Write テーブルは、Hudi の入力形式に自動的に適応します。