本文为您介绍如何在Hudi中写数据以及查询数据。

写数据

EMR-3.32.0以及后续版本中,已经将Hudi相关依赖集成到各个开源组件中,包括Spark、Hive和Presto,因此运行时不需要引入额外的Hudi依赖,只需要在pom文件中添加Hudi依赖即可。
<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <version>0.6.0</version>
  <scope>provided</scope>
</dependency>
写数据示例如下:
  • 插入或更新数据
     val spark = SparkSession
          .builder()
          .master("local[*]")
          .appName("hudi test")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
    
    import spark.implicits._
        val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
          .toDF("id", "name", "price", "version", "dt")
    
        df.write.format("hudi")
          .option(TABLE_NAME, "hudi_test_0")
          // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) for update
          .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
          .option(RECORDKEY_FIELD_OPT_KEY, "id")
          .option(PRECOMBINE_FIELD_OPT_KEY, "version")
          .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
          .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
          .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
          .option(HIVE_PARTITION_FIELDS_OPT_KEY, "ds")
          .option(META_SYNC_ENABLED_OPT_KEY, "true")
          .option(HIVE_USE_JDBC_OPT_KEY, "false")
          .option(HIVE_DATABASE_OPT_KEY, "default")
          .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
          .option(INSERT_PARALLELISM, "8")
          .option(UPSERT_PARALLELISM, "8")
          .mode(Overwrite)
          .save("/tmp/hudi/h0")
  • 删除数据
    df.write.format("hudi")
          .option(TABLE_NAME, "hudi_test_0")
          .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
          .option(RECORDKEY_FIELD_OPT_KEY, "id")
          .option(PRECOMBINE_FIELD_OPT_KEY, "version")
          .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
          .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
          .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
          .option(DELETE_PARALLELISM, "8")
          .mode(Append)
          .save("/tmp/hudi/h0")

查询数据

EMR支持Spark SQL、Hive和Presto查询Hudi表。
场景 描述
Spark SQL查询Hudi 启动spark-sql命令时,需要加上kryo序列化参数,即可通过spark-sql查询Hudi表数据。
spark-sql --conf'spark.serializer=org.apache.spark.serializer.KryoSerializer'

Spark SQL查询Hudi数据的详情,请参见Hudi与Spark SQL集成

Hive查询Hudi 因为EMR Hive已经集成Hudi的hudi-hadoop-mr-bundle包,所以您无需额外引入相关依赖,即可查询Hudi表数据。
注意 查询MOR表时,需要设置以下参数,COW表不需要设置任何参数。
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
Presto查询Hudi 因为EMR Presto已与Hudi集成,所以您无需进行额外的配置,即可查询Hudi表数据。