すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:EMR Serverless Spark が Iceberg REST を使用して DLF カタログにアクセス

最終更新日:Mar 01, 2026

このトピックでは、Iceberg REST を使用して EMR Serverless Spark から Data Lake Formation (DLF) カタログにアクセスする方法について説明します。

前提条件

DLF インスタンスと同じリージョンに Serverless Spark ワークスペースを作成済みであること。詳細については、「ワークスペースの作成」をご参照ください。

説明

Resource Access Management (RAM) ユーザーの場合、データ操作を実行する前に、必要なリソース権限を付与する必要があります。詳細については、「データ権限付与の管理」をご参照ください。

制限事項

サポートされているタスクタイプは次のとおりです:

ステップ 1:カタログ権限の付与

  1. [Catalogs] ページで、カタログの名前をクリックして詳細ページに移動します。

  2. カタログ全体に権限を付与するには、[Permissions] タブをクリックします。または、特定のデータベースまたはテーブルに移動し、その [Permissions] タブをクリックしてアクセス権を付与することもできます。

  3. 権限付与ページで、次の設定を行い、[OK] をクリックします。

    • [User/Role][RAM User/RAM Role] を選択します。

    • [認証オブジェクトの選択]: ドロップダウンリストから AliyunECSInstanceForEMRRole を選択します。

      説明

      ドロップダウンリストに [AliyunECSInstanceForEMRRole] が表示されない場合は、ユーザー管理ページに移動して [Sync] をクリックしてください。

    • プリセット権限タイプ: 読み取り権限を手動で選択するか、 Data Reader や Data Editor などの事前定義されたロールを使用します。

ステップ 2:データの読み書き

  1. カタログに接続します。

    SQL セッションを作成できます。詳細については、「SQL セッションの管理」をご参照ください。エンジンバージョン esr-4.7.0、esr-3.6.0、またはそれ以降を使用できます。

    データカタログの使用

    データカタログを使用する場合、セッションでパラメーターを設定する必要はありません。[データカタログ] ページに移動し、[データカタログの追加] をクリックします。その後、SparkSQL 開発でデータカタログを直接選択できます。

    カスタムカタログの使用

    [カスタム設定] の [Spark 設定] セクションで、次の設定を追加できます。

    重要

    以下の設定例では、iceberg_catalog はカスタムカタログ名です。これは、Iceberg REST カタログに基づいて、Spark に Iceberg テーブル管理サービスを登録します。このカタログは、REST API を使用して Alibaba Cloud DLF に接続します。必要に応じて、カタログ名と関連パラメーターを変更できます。

    • ${regionID}:これを実際のリージョン ID (例:cn-hangzhou) に置き換えることができます。詳細については、「エンドポイント」をご参照ください。

    • ${catalogName}:これを実際の DLF カタログ名に置き換えることができます。

    • ${access_key_id}${access_key_secret}:これらを Alibaba Cloud アカウントの AccessKey ID とシークレットに置き換えることができます。

    # Iceberg Spark 拡張機能を有効化
    spark.sql.extensions                                org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    # iceberg_catalog カタログを登録
    spark.sql.catalog.iceberg_catalog                   org.apache.iceberg.spark.SparkCatalog
    # 基盤となるカタログ実装は Iceberg REST カタログ
    spark.sql.catalog.iceberg_catalog.catalog-impl      org.apache.iceberg.rest.RESTCatalog
    # DLF Iceberg サービスの REST API エンドポイント
    spark.sql.catalog.iceberg_catalog.uri               http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
    # DLF カタログ名を指定
    spark.sql.catalog.iceberg_catalog.warehouse         ${catalogName}
    # カスタム DLF FileIO 実装を使用
    spark.sql.catalog.iceberg_catalog.io-impl           org.apache.iceberg.rest.DlfFileIO
    # SigV4 署名認証を有効化
    spark.sql.catalog.iceberg_catalog.rest.auth.type    sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type  none
    spark.sql.catalog.iceberg_catalog.rest.signing-region  ${regionID}
    spark.sql.catalog.iceberg_catalog.rest.signing-name  DlfNext
    # アクセス認証情報
    spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
  2. データの読み書きを行います。

    SQL ジョブ開発の完全な例については、「SparkSQL 開発のクイックスタート」をご参照ください。

    説明

    データベースが指定されていない場合、テーブルはカタログの default データベースに作成されます。別のデータベースを作成して指定することもできます。

    -- データベースの作成
    CREATE DATABASE IF NOT EXISTS db;
    
    -- 非パーティションテーブルの作成
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- 非パーティションテーブルへのデータ挿入
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- フルテーブルクエリ
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 条件付きクエリ
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- データの更新
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- テーブルを再度クエリして更新を確認
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- データの削除
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- テーブルを再度クエリして削除を確認
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- パーティションテーブルの作成
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (category);
    
    -- データの挿入
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
    (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
    (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
    (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
    (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
    
    
    -- フルテーブルクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- 条件付きクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;
    
    -- 条件付きクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';
    
    -- 条件付きクエリ
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 複合条件クエリ (バケット + 日付 + カテゴリ)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE bucket(16, id) = 0 
      AND days(ts) = '2025-01-01'
      AND category = 'A';
    
    -- 各カテゴリのデータエントリ数を集計してカウント
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- データベースの削除。内部のすべてのテーブルが空であることを確認してください。
    -- DROP DATABASE iceberg_catalog.db;