すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Iceberg の使用

最終更新日:Feb 05, 2026

Iceberg は、データレイク向けのオープンなテーブルフォーマットです。Hadoop 分散ファイルシステム (HDFS) または Alibaba Cloud Object Storage Service (OSS) 上で、独自のデータレイクストレージサービスを迅速に構築できます。本トピックでは、EMR Serverless Spark を使用して Iceberg テーブルからデータを読み取り、および Iceberg テーブルへデータを書き込む方法について説明します。

前提条件

ワークスペースが作成されました。詳細については、「ワークスペースの作成」をご参照ください。

操作手順

説明

Iceberg テーブルからのデータ読み取りおよび Iceberg テーブルへのデータ書き込みには、Spark SQL ジョブまたはノートブックを使用できます。本トピックでは、Spark SQL ジョブを例として説明します。

ステップ 1:セッションの作成

  1. [セッション] ページに移動します。

    1. EMR コンソール にログインします。

    2. 左側ナビゲーションウィンドウで、EMR Serverless > Spark を選択します。

    3. [Spark] ページ」で、管理するワークスペースの名前をクリックします。

    4. 左側のナビゲーションウィンドウで、EMR Serverless Spark ページの [Operation Center] > [Sessions] を選択します。

  2. SQL セッション」タブで、「SQL セッションの作成」をクリックします。

  3. [SQL セッションの作成] ページで、[Spark 設定] セクションで以下の設定を行い、Create をクリックします。 詳細については、「SQL セッションの管理」をご参照ください。

    Spark はカタログを使用して Iceberg テーブルから読み取りおよび Iceberg テーブルへ書き込みます。ご使用のユースケースに応じて、適切なカタログを選択してください。カタログの詳細については、「データカタログの管理」をご参照ください。

    データカタログの使用

    データカタログを使用する場合、セッション内でパラメーターを設定する必要はありません。代わりに、Catalogs ページに移動し、Add Catalog をクリックします。その後、Spark SQL 開発時に直接データカタログを選択できます。

    説明
    • DLF(旧 DLF 2.5)の Iceberg テーブルにアクセスするには、エンジンバージョン esr-4.7.0、esr-3.6.0 以降を使用してください。

    • DLF-Legacy(旧 DLF 1.0)または Hive Metastore の Iceberg テーブルにアクセスするには、エンジンバージョン esr-4.3.0、esr-3.3.0、esr-2.7.0 以降を使用してください。

    カスタムカタログの使用

    DLF(旧 DLF 2.5)

    説明

    エンジンバージョンは esr-4.7.0、esr-3.6.0 以降である必要があります。

    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
    spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com
    spark.sql.catalog.iceberg_catalog.warehouse  <catalog_name>
    spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
    spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
    spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID>
    spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
    spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id>
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>

    以下の表に、各パラメーターの説明を示します。

    パラメーター

    説明

    spark.sql.extensions

    Iceberg Spark 拡張機能を有効化します。

    固定値:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.iceberg_catalog

    iceberg_catalog という名前の Spark カタログを登録します。

    固定値:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.iceberg_catalog.catalog-impl

    Iceberg REST Catalog を基盤となるカタログ実装として指定します。

    org.apache.iceberg.rest.RESTCatalog

    spark.sql.catalog.iceberg_catalog.uri

    DLF Iceberg サービスの REST API エンドポイント。形式:http://<regionID>-vpc.dlf.aliyuncs.com

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    spark.sql.catalog.iceberg_catalog.warehouse

    関連付けられた DLF カタログの名前を指定します。

    説明

    データ共有によって作成された DLF カタログを関連付けることは推奨されません。

    <catalog_name>

    spark.sql.catalog.iceberg_catalog.io-impl

    DLF 固有の FileIO 実装を使用します。

    固定値:org.apache.iceberg.rest.DlfFileIO

    spark.sql.catalog.iceberg_catalog.rest.auth.type

    REST リクエストの検証に AWS SigV4 署名認証を有効化します。

    sigv4

    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type

    デリゲート認証を無効化します。クライアントが署名用に AccessKey ID および AccessKey Secret を直接提供します。

    none

    spark.sql.catalog.iceberg_catalog.rest.signing-region

    署名に使用するリージョンを指定します。これは DLF サービスが実行されるリージョンと一致している必要があります。

    cn-hangzhou

    spark.sql.catalog.iceberg_catalog.rest.signing-name

    署名に使用するサービス名を指定します。

    固定値:DlfNext

    spark.sql.catalog.iceberg_catalog.rest.access-key-id

    Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID。

    <access_key_id>

    spark.sql.catalog.iceberg_catalog.rest.secret-access-key

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    <access_key_secret>

    DLF-Legacy(旧 DLF 1.0)

    説明

    エンジンバージョンは esr-4.3.0、esr-3.3.0、esr-2.7.0 以降である必要があります。

    メタデータは DLF-Legacy(旧 DLF 1.0)に格納されます。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>

    以下の表に、各パラメーターの説明を示します。

    パラメーター

    説明

    spark.sql.extensions

    Iceberg Spark 拡張機能を有効化します。

    固定値:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    <catalogName> という名前のカタログを登録します。

    固定値:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    DLF-Legacy のグローバルメタサービスに直接接続するため、Alibaba Cloud DLF-Legacy 専用の Hive 互換実装を使用します。

    固定値:org.apache.iceberg.aliyun.dlf.hive.DlfCatalog

    spark.sql.catalog.<catalogName>.dlf.catalog.id

    関連付けられた DLF カタログの名前を指定します。

    <catalog_name>

    Hive Metastore

    メタデータは指定された Hive Metastore に格納されます。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.hive.HiveCatalog
    spark.sql.catalog.<catalogName>.uri           thrift://<yourHMSUri>:<port>

    以下の表に、各パラメーターの説明を示します。

    パラメーター

    説明

    spark.sql.extensions

    Iceberg Spark 拡張機能を有効化します。

    固定値:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    <catalogName> という名前のカタログを登録します。

    固定値:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    Hive Metastore を介して Iceberg テーブルのメタデータを格納および取得するために、Iceberg 公式の HiveCatalog 実装を指定します。

    固定値:org.apache.iceberg.hive.HiveCatalog

    spark.sql.catalog.<catalogName>.uri

    Hive Metastore の URI。形式:thrift://<Hive Metastore の IP アドレス>:9083

    <Hive Metastore の IP アドレス> は Hive Metastore サービスの内部 IP アドレスです。外部 Metastore サービスを指定する方法については、「外部 Hive Metastore サービスへの接続」をご参照ください。

    thrift://192.168.**.**:9083

    ファイルシステム

    メタデータはファイルシステムに格納されます。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.type          hadoop
    spark.sql.catalog.<catalogName>.warehouse     oss://<yourBucketName>/warehouse

    以下の表に、各パラメーターの説明を示します。

    パラメーター

    説明

    spark.sql.extensions

    Iceberg Spark 拡張機能を有効化します。

    固定値:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    <catalogName> という名前のカタログを登録します。

    固定値:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.type

    カタログタイプを hadoop に設定します。これは、Hive Metastore を経由せず、HadoopCatalog を使用してメタデータをファイルシステムに直接格納することを意味します。

    hadoop

    spark.sql.catalog.<catalogName>.warehouse

    メタデータの保存パスを指定します。このコードでは、<yourBucketName> はご利用の OSS バケット名です。

    oss://<yourBucketName>/warehouse

ステップ 2:Iceberg テーブルからの読み取りおよび Iceberg テーブルへの書き込み

  1. SQL 開発ページに移動できます。

    [EMR Serverless Spark] ページで、左側のナビゲーションウィンドウの Data Development をクリックします。

  2. [開発] タブで、image アイコンをクリックします。

  3. [新規] ダイアログボックスで、users_task などの名前を入力し、タイプはデフォルトの [SparkSQL] のままにして、[OK] をクリックします。

  4. 以下のコードを新しい SparkSQL タブ(users_task)にコピーできます。

    説明

    データベースを指定しない場合、テーブルはカタログのデフォルトデータベースに作成されます。別のデータベースを作成して指定することも可能です。

    -- データベースの作成
    CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
    
    -- パーティションなしテーブルの作成
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT '一意の ID',
        data STRING
    )
    USING iceberg;
    
    -- パーティションなしテーブルへのデータ挿入
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- パーティションなしテーブルからの全データのクエリ
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- パーティションなしテーブルからの条件付きデータのクエリ
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- パーティションなしテーブルでのデータ更新
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- 更新の確認
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- パーティションなしテーブルからのデータ削除
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- 削除の確認
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- パーティション付きテーブルの作成
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP,
        dt DATE
    )
    USING iceberg
    PARTITIONED BY (dt, category);
    
    -- パーティション付きテーブルへのデータ挿入
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
      (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'),
      (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'),
      (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'),
      (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'),
      (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'),
      (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'),
      (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'),
      (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'),
      (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'),
      (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05');
    
    
    -- パーティション付きテーブルからの全データのクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- dt='2026-01-01' のデータのクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01';
    
    -- category = 'A' のデータのクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 複数条件(dt + category)によるクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE dt='2026-01-01'
      AND category = 'A';
    
    -- category ごとの集計件数
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- データベースの削除(注意が必要)。削除前にデータベースが空であることを確認してください。
    -- DROP DATABASE iceberg_catalog.db;
  5. 作成した SQL セッションをセッションのドロップダウンリストから選択し、[実行] をクリックします。正常に実行された後、下部に結果が表示されます。image

参考資料