This topic describes the DML statements that Spark SQL supports for Hudi tables after Hudi is integrated with Spark SQL.

Prerequisites

An EMR Hadoop cluster has been created. For more information, see Create a cluster.

Limits

You can use Spark SQL to read data from and write data to Hudi only in EMR V3.36.0 or a later minor version, or in EMR V5.2.0 or a later minor version.

Open the Spark SQL CLI

  1. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following command to open the Spark SQL CLI:
    spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    The Spark SQL CLI is open when the following prompt appears:
    spark-sql>
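The DML examples in this topic operate on the tables h0, h_p0, and h_p1, but this topic does not show how those tables are created. The following is a minimal sketch of DDL whose column order matches what those examples imply. It assumes the Hudi 0.9-style "using hudi options (...)" syntax and copy-on-write tables keyed on id. The supported DDL options may differ with the Hudi version that is installed on your cluster, so treat this as a starting point and not as the required table definition.

```sql
-- Hypothetical DDL for the tables used in this topic.
-- Assumes Hudi 0.9-style options. Check the Hudi version on your cluster.

-- Non-partitioned table. Columns match: insert into h0 select 1, 'a1', 20
create table if not exists h0 (
  id int,
  name string,
  price double
) using hudi
options (
  -- copy-on-write table type
  type = 'cow',
  -- record key that MERGE INTO, UPDATE, and DELETE use to locate rows
  primaryKey = 'id'
);

-- Table partitioned by dt. The partition column is the last column.
create table if not exists h_p0 (
  id int,
  name string,
  dt string
) using hudi
options (type = 'cow', primaryKey = 'id')
partitioned by (dt);

-- Table partitioned by dt and hh, in that order.
create table if not exists h_p1 (
  id int,
  name string,
  dt string,
  hh string
) using hudi
options (type = 'cow', primaryKey = 'id')
partitioned by (dt, hh);
```

Declaring the partition columns last keeps the table schema in the same order that INSERT INTO uses when it maps SELECT columns to table columns by position.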

MERGE INTO

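Inserts, updates, or deletes rows in the target table, depending on whether each source row matches a target row on the merge condition. If you specify two WHEN MATCHED clauses, only the last one can omit AND <condition>.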

  • Syntax:
    MERGE INTO tableIdentifier [AS] target_alias
    USING (sub_query | tableIdentifier) [AS] source_alias
    ON <merge_condition>
    WHEN MATCHED [ AND <condition> ] THEN <matched_action>
    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
    [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

    <merge_condition> = an equality condition between the target and the source, for example target.id = source.id
    <matched_action> =
      DELETE |
      UPDATE SET * |
      UPDATE SET column1 = value1 [, column2 = value2 ...]
    <not_matched_action> =
      INSERT * |
      INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • Examples:
    -- without delete
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched then update set *
    when not matched then insert *;
    -- with delete
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched and source.name = 'delete' then delete
    when matched then update set id = source.id, name = source.name, price = source.price
    when not matched then insert (id, name, price) values (source.id, source.name, source.price);
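The USING clause also accepts a table instead of a subquery. The following sketch assumes that h0 exists with the columns id, name, and price. It stages changes in a hypothetical Parquet table named h0_updates and applies them in a single MERGE INTO statement. Treating rows whose name is 'delete' as deletion markers is a convention invented for this example.

```sql
-- Hypothetical staging table. Any table that Spark can read can serve as the source.
create table if not exists h0_updates (
  id int,
  name string,
  price double
) using parquet;

-- Row 1 updates an existing row, row 2 is a deletion marker, row 3 is new.
insert overwrite table h0_updates values
  (1, 'a1', 15.0),
  (2, 'delete', 0.0),
  (3, 'a3', 30.0);

merge into h0 as target
using h0_updates as source
on target.id = source.id
-- The conditional MATCHED clause comes first. The unconditional one comes last.
when matched and source.name = 'delete' then delete
when matched then update set *
-- Skip deletion markers that have no matching target row.
when not matched and source.name != 'delete' then insert *;
```

Without the condition on the NOT MATCHED clause, a deletion marker for an id that is not yet in h0 would be inserted as a regular row.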

INSERT INTO

Inserts data into a partitioned table or a non-partitioned table.

Examples:
  • Insert data into the non-partitioned table h0.
    insert into h0 select 1, 'a1', 20;
  • Insert data into the partitioned table h_p0 by specifying a static partition value.
    insert into h_p0 partition(dt='2021-01-02') select 1, 'a1';
  • Insert data into the partitioned table h_p0 by using dynamic partitions. The value of dt is taken from the last column that the query on the source table s returns.
    insert into h_p0 partition(dt) select 1, 'a1', dt from s;
  • Insert data into the partitioned table h_p1 by using dynamic partitions without a PARTITION clause. The partition columns dt and hh must be the last columns in the SELECT list, in the order in which they are defined in the table.
    insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
  • Overwrite all data in the h0 table.
    insert overwrite table h0 select 1, 'a1', 20;
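To check which partitions the rows were written to, you can filter on the partition columns. The following sketch also overwrites a single static partition of h_p0, which leaves the other partitions of the table untouched. It assumes that h_p0 is partitioned by dt and h_p1 by dt and hh, as the examples above imply, and that the Hudi version on your cluster supports INSERT OVERWRITE with a PARTITION clause.

```sql
-- Read back the rows in the partitions written by the examples above.
select id, name, dt from h_p0 where dt = '2021-01-02';
select id, name, dt, hh from h_p1 where dt = '2021-01-03' and hh = '19';

-- Replace only the contents of partition dt='2021-01-02' in h_p0.
insert overwrite table h_p0 partition(dt = '2021-01-02') select 1, 'a1';
```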

UPDATE

Updates one or more columns in the rows of a partitioned table or a non-partitioned table. If you specify a WHERE clause, only the rows that meet the condition are updated.

  • Syntax:
    UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [WHERE BOOL_EXPRESSION];
  • Example: Set price to 20 for the row whose id is 1 in the h0 table.
    update h0 set price=20 where id=1;
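The SET list can assign several columns in one statement, and each expression can refer to the current value of a column. The following sketch runs against the same h0 table and also updates a row in one partition of h_p0, which is assumed to be partitioned by dt. The new name values are arbitrary placeholders.

```sql
-- Update two columns at once. price * 2 uses the current value of price.
update h0 set price = price * 2, name = 'a1_v2' where id = 1;

-- Update a row in one partition of a partitioned table.
update h_p0 set name = 'a1_new' where dt = '2021-01-02' and id = 1;

-- Check the results.
select id, name, price from h0 where id = 1;
select id, name, dt from h_p0 where dt = '2021-01-02' and id = 1;
```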

DELETE

Deletes the rows that meet the specified condition from a partitioned table or a non-partitioned table.

  • Syntax:
    DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION];
  • Example: Delete the rows whose id is greater than 100 from the h0 table.
    delete from h0 where id>100;
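The WHERE clause accepts any Boolean expression, so you can combine conditions or limit a delete to one partition. The following sketch assumes that h_p1 is partitioned by dt and hh, as in the INSERT INTO examples.

```sql
-- Combine conditions with OR (or AND).
delete from h0 where id > 100 or name = 'delete';

-- Delete every row in one partition of h_p1.
delete from h_p1 where dt = '2021-01-03' and hh = '19';
```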