全部產品
Search
文件中心

Data Lake Formation:EMR Serverless Spark以Iceberg REST訪問DLF Catalog

更新時間:Mar 01, 2026

本文為您介紹在EMR Serverless Spark叢集中如何以Iceberg REST對接 DLF Catalog。

前提條件

已建立與DLF執行個體同地區的Serverless Spark工作空間,詳情請參見建立工作空間

說明

如果是RAM使用者,在進行資料操作之前,需要先授予相應的資源許可權。詳情請參見資料授權管理

使用限制

目前支援以下類型的任務:

步驟一:Catalog授權

  1. 登入資料湖構建控制台

  2. Catalogs列表頁面,單擊Catalog名稱,進入Catalog詳情頁。

  3. 如果要授予整個 catalog 許可權,則直接單擊許可權。否則點擊進入對應資料庫或者表,再點擊許可權目錄,進行許可權授予。

  4. 在授權頁面,配置以下資訊,單擊確定

    • 使用者/角色:選擇RAM使用者/RAM角色

    • 選擇授權對象:在下拉式清單中選擇AliyunECSInstanceForEMRRole

      說明

      如果使用者下拉式清單中未找到AliyunECSInstanceForEMRRole,可以在使用者管理頁面單擊同步。

    • 預置權限類別型:可自訂選擇讀取許可權,或者直接使用 Data Reader/Data Editor。

步驟二:讀寫資料

  1. 串連Catalog

    建立SQL會話,請參見管理SQL會話。引擎版本請使用esr-4.7.0、esr-3.6.0及以上版本。

    使用資料目錄Catalog

    採用資料目錄Catalog方式,則無需在會話中配置參數,只需在資料目錄頁面單擊添加資料目錄,然後在SparkSQL開發中直接選擇資料目錄即可。

    使用自訂Catalog

    在自訂配置下的Spark 配置中添加以下配置。

    重要

    以下配置樣本中,iceberg_catalog為自訂的 Catalog 名稱,用於在 Spark 中註冊一個基於 Iceberg REST Catalog 的 Iceberg 表管理服務。該Catalog 通過 REST API 與阿里雲 DLF服務對接,您可根據實際環境修改 Catalog 名稱及相關參數。

    • ${regionID}:替換為實際的地區ID,例如 cn-hangzhou,請參見服務存取點

    • ${catalogName}:替換為在DLF中建立的Catalog 名稱。

    • ${access_key_id}${access_key_secret}:阿里雲賬戶AK/SK。

    # 啟用Iceberg Spark擴充
    spark.sql.extensions                                org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    # 註冊名為 iceberg_catalog 的 Spark Catalog
    spark.sql.catalog.iceberg_catalog                   org.apache.iceberg.spark.SparkCatalog
    # 指定底層Catalog實現為Iceberg REST Catalog
    spark.sql.catalog.iceberg_catalog.catalog-impl      org.apache.iceberg.rest.RESTCatalog
    # DLF Iceberg 服務的 REST API 地址(需替換 ${regionID} 為實際地區 ID)
    spark.sql.catalog.iceberg_catalog.uri               http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
    # 指定關聯的 DLF Catalog 名稱
    spark.sql.catalog.iceberg_catalog.warehouse         ${catalogName}
    # 使用 DLF 定製的 FileIO 實現
    spark.sql.catalog.iceberg_catalog.io-impl           org.apache.iceberg.rest.DlfFileIO
    # 啟用 SigV4 簽名認證
    spark.sql.catalog.iceberg_catalog.rest.auth.type    sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type  none
    spark.sql.catalog.iceberg_catalog.rest.signing-region  ${regionID}
    spark.sql.catalog.iceberg_catalog.rest.signing-name  DlfNext
    # 訪問憑證
    spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
  2. 讀寫資料

    SQL任務完整的開發流程樣本,請參見SparkSQL開發快速入門

    說明

    不指定資料庫時,建立資料表會預設建在Catalog下的default資料庫中,也可建立並指定其他資料庫。

    -- 建立資料庫
    CREATE DATABASE IF NOT EXISTS db;
    
    -- 建立非分區表
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- 插入非分區表資料
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- 查詢非分區表所有資料
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 按條件查詢非分區表
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- 更新非分區表資料
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- 重新查詢確認更新
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- 刪除非分區表資料
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- 重新查詢確認刪除
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 建立分區表
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (category);
    
    -- 插入分區表資料
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
    (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
    (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
    (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
    (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
    
    
    -- 查詢分區表所有資料
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- 查詢 bucket(16, id) = 0 的資料
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;
    
    -- 查詢 day(ts) = '2025-01-01' 的資料
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';
    
    -- 查詢某個 category 的資料
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 多條件組合查詢(bucket + day + category)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE bucket(16, id) = 0 
      AND days(ts) = '2025-01-01'
      AND category = 'A';
    
    -- 彙總統計每個分類的資料數量
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- 刪除資料庫(謹慎操作),刪除前需確保該db下tbl為空白
    -- DROP DATABASE iceberg_catalog.db;