This topic describes the DML statements that are supported after Hudi is integrated with Spark SQL.
Prerequisites
An E-MapReduce (EMR) cluster that contains the Spark and Hudi services is created. For more information, see Create a cluster.
Limits
You can use Spark SQL to read data from or write data to Hudi only in EMR V3.36.0 or a later minor version, or in EMR V5.2.0 or a later minor version.
Open the Spark SQL CLI
Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to open the Spark SQL CLI:
spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
If the output contains the following information, the Spark SQL CLI is started:
spark-sql>
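The examples in this topic reference tables such as h0 and h_p0 but do not show how they are created. The following is a minimal sketch, assuming h0 has the columns id, name, and price and h_p0 has the columns id, name, and the partition column dt. The column types and the options clause are assumptions that are not taken from this topic, and the exact property syntax can differ between Hudi versions.

```sql
-- Hypothetical definition of the non-partitioned table h0 (column types are assumed).
create table if not exists h0 (
  id int,
  name string,
  price double
) using hudi
options (
  primaryKey = 'id'
);

-- Hypothetical definition of the partitioned table h_p0, partitioned by dt.
create table if not exists h_p0 (
  id int,
  name string,
  dt string
) using hudi
options (
  primaryKey = 'id'
)
partitioned by (dt);
```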
MERGE INTO
Inserts, updates, or deletes data based on specific match conditions.
Syntax:
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
WHEN MATCHED [ AND <condition> ] THEN <matched_action>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> = an equality Boolean condition
<matched_action> =
  DELETE |
  UPDATE SET * |
  UPDATE SET column1 = value1 [, column2 = value2 ...]
<not_matched_action> =
  INSERT * |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
Examples:
-- without delete
merge into h0 as target
using (
  select 1 as id, 'a1' as name, 10.0 as price
) source
on target.id = source.id
when matched then update set *
when not matched then insert *;
-- with delete
-- The update clause carries its own condition so that the delete clause can be reached.
merge into h0 as target
using (
  select 1 as id, 'a1' as name, 10.0 as price
) source
on target.id = source.id
when matched and source.name != 'delete' then update set id = source.id, name = source.name, price = source.price
when matched and source.name = 'delete' then delete
when not matched then insert (id,name,price) values(id, name, price);
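The syntax also accepts a table identifier as the merge source, whereas the preceding examples use only subqueries. The following is a minimal sketch, assuming a hypothetical source table named s0 that has the same columns as h0 (id, name, and price). The table s0 is not defined in this topic.

```sql
-- s0 is a hypothetical source table with the columns id, name, and price.
merge into h0 as target
using s0 as source
on target.id = source.id
when matched then update set *
when not matched then insert *;
```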
INSERT INTO
Inserts data into a partitioned table or a non-partitioned table.
Examples:
Insert data into the non-partitioned table h0.
insert into h0 select 1, 'a1', 20;
Insert data into a static partition of the partitioned table h_p0.
insert into h_p0 partition(dt='2021-01-02') select 1, 'a1';
Insert data into the partitioned table h_p0 by using dynamic partitioning.
insert into h_p0 partition(dt) select 1, 'a1', dt from s;
Insert data into the partitioned table h_p1 by using dynamic partitioning without a partition clause. The partition fields are placed at the end of the select expression.
insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
Overwrite the data in the h0 table.
insert overwrite table h0 select 1, 'a1', 20;
UPDATE
Updates one or more columns of the rows in a partitioned table or a non-partitioned table.
Syntax:
UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [WHERE BOOL_EXPRESSION];
Example: In the h0 table, set the price field to 20 for the row whose id is 1.
update h0 set price=20 where id=1;
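The syntax allows several comma-separated assignments in one statement. The following sketch assumes the h0 table with the name and price columns used in the earlier examples. The new values are illustrative only.

```sql
-- Update two columns of the same row in one statement.
-- The price expression reads the current value of the column.
update h0 set name = 'a1_updated', price = price * 2 where id = 1;
```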
DELETE
Deletes one or more data entries that meet the specified conditions from a partitioned table or a non-partitioned table.
Syntax:
DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION];
Example: Delete the data entries whose id is greater than 100 from the h0 table.
delete from h0 where id>100;
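DELETE works the same way on a partitioned table. The following sketch assumes the h_p0 table and its dt partition column from the INSERT INTO examples. The filter values are illustrative only.

```sql
-- Delete the row whose id is 1 from the dt='2021-01-02' partition of h_p0.
delete from h_p0 where dt = '2021-01-02' and id = 1;
```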