This topic describes how to write data to and query data from Hudi.

Write data

In EMR V3.32.0 and later, Hudi dependencies are integrated into the open source components Spark, Hive, and Presto. When you run a job that writes data to Hudi, you do not need to package these dependencies with the job. You need only to declare the following Hudi dependency, with the provided scope, in the pom.xml file of your project:
<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark_2.11</artifactId>
  <version>0.6.0</version>
  <scope>provided</scope>
</dependency>
Examples:
  • Insert or update data (a read-back sketch follows these examples)
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.hudi.keygen.SimpleKeyGenerator
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._

    // Build a small test DataFrame: record key id, precombine field version, partition column dt.
    val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
      .toDF("id", "name", "price", "version", "dt")

    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) // for update
      .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(META_SYNC_ENABLED_OPT_KEY, "true")
      .option(HIVE_USE_JDBC_OPT_KEY, "false")
      .option(HIVE_DATABASE_OPT_KEY, "default")
      .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
      .option(INSERT_PARALLELISM, "8")
      .option(UPSERT_PARALLELISM, "8")
      .mode(Overwrite)
      .save("/tmp/hudi/h0")
  • Delete data
    // Deletes the records in df that match on the record key. The key, precombine,
    // and partition settings mirror the write above.
    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DELETE_PARALLELISM, "8")
      .mode(Append)
      .save("/tmp/hudi/h0")

Query data

EMR allows you to use Spark SQL, Hive, or Presto to query data in a Hudi table.
  • Use Spark SQL to query data in a Hudi table: You must specify KryoSerializer in the spark-sql command that starts the Spark SQL CLI:
    spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
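    After the CLI starts, you can query a Hudi table that has been synced to the Hive metastore like an ordinary table. This query is a sketch that assumes the hudi_test_0 table and dt partition values from the write example above:
    select id, name, price from default.hudi_test_0 where dt = 'p0';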
  • Use Hive to query data in a Hudi table: The hudi-hadoop-mr-bundle package is integrated into EMR Hive, so you can query data in a Hudi table without introducing additional dependencies.
    Notice: If you query data in a Merge on Read table, you must execute the following statement in the same session before your query. If you query data in a Copy on Write table, you can execute the query directly.
    set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
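    A minimal Hive session against the table from the write example might look as follows; the set statement is needed only for Merge on Read tables:
    set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
    select id, name, price from hudi_test_0 where dt = 'p0';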
  • Use Presto to query data in a Hudi table: EMR Presto is integrated with Hudi. No additional configuration is required when you use Presto to query data in a Hudi table.
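    For example, you can query the table from the Presto CLI. The server address and port below are placeholders; replace them with the coordinator address of your cluster:
    presto --server localhost:9090 --catalog hive --schema default --execute 'select count(*) from hudi_test_0'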