すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:DMLステートメント

最終更新日:Jan 11, 2025

このトピックでは、HudiとSpark SQLの統合後にサポートされるDMLステートメントについて説明します。

前提条件

SparkおよびHudiサービスを含むE-MapReduce(EMR)クラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。

制限事項

EMR V3.36.0以降のマイナーバージョンのクラスター、およびEMR V5.2.0以降のマイナーバージョンのクラスターのみで、Spark SQLを使用してHudiからデータを読み取ったり、Hudiにデータを書き込んだりできます。

Spark SQLを開始する方法

  • Spark 2またはSpark 3、および 0.11より前のバージョンのHudi
    spark-sql \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  • Spark 3、およびHudi 0.11以降
    spark-sql \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

MERGE INTO

特定の照合条件に基づいて、データの挿入、更新、または削除を行います。

  • 構文:

    MERGE INTO tableIdentifier AS target_alias
    USING (sub_query | tableIdentifier) AS source_alias
    ON <merge_condition>
     WHEN MATCHED [ AND <condition> ] THEN <matched_action> 
    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
    [ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]
    
    <merge_condition> = ブール条件と等しい
    <matched_action>  =
      DELETE  |
      UPDATE SET *  |
      UPDATE SET column1 = value1 [, column2 = value2 ...]
    <not_matched_action>  =
      INSERT *  |
      INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • 例:

    -- 削除なし
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched then update set *
    when not matched then insert *;
    -- 削除あり
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched then update set id = source.id, name = source.name, price = source.price
    when matched and name = 'delete' then delete
    when not matched then insert (id,name,price) values(id, name, price);

INSERT INTO

パーティションテーブルまたは非パーティションテーブルにデータを挿入します。

例:

  • 非パーティションテーブル h0 にデータを挿入します。

    insert into h0 select 1, 'a1', 20;
  • 静的パーティションを含むテーブル h_p0 にデータを挿入します。

    insert into h_p0 partition(dt='2021-01-02') select 1, 'a1';
  • 動的パーティションを含むテーブル h_p0 にデータを挿入します。

    insert into h_p0 partition(dt) select 1, 'a1', dt from s;
  • 動的パーティションを含むテーブル h_p1 にデータを挿入します。パーティションフィールドはselect式の最後に配置されます。

    insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
  • テーブル h0 のデータを上書きします。

    insert overwrite table h0 select 1, 'a1', 20;

UPDATE

パーティションテーブルまたは非パーティションテーブルの行に対応する1つ以上の列のデータを更新します。

  • 構文:

    UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION);
  • 例: テーブル h0 内で、id が 1 である price フィールドの値を 20 に変更します。

    update h0 set price=20 where id=1;

DELETE

パーティションテーブルまたは非パーティションテーブルから、指定された条件を満たす1つ以上のデータエントリを削除します。

  • 構文:

    DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION];
  • 例: テーブル h0 から、id が 100 より大きいデータエントリを削除します。

    delete from h0 where id>100;