全部產品
Search
文件中心

E-MapReduce:使用Iceberg

更新時間:Mar 10, 2026

Iceberg是一種開放的資料湖表格式。您可以藉助Iceberg快速地在HDFS或者阿里雲OSS上構建自己的資料湖儲存服務。本文為您介紹如何在EMR Serverless Spark中實現Iceberg表的讀取與寫入操作。

前提條件

已建立工作空間,詳情請參見建立工作空間

操作流程

說明

SparkSQL與Notebook均支援對Iceberg表的讀寫操作。本文將以SparkSQL任務為例進行介紹。

步驟一:建立會話資源

  1. 進入會話管理頁面。

    1. 登入E-MapReduce控制台

    2. 在左側導覽列,選擇EMR Serverless > Spark

    3. Spark頁面,單擊目標工作空間名稱。

    4. EMR Serverless Spark頁面,單擊左側導覽列中的會話管理

  2. SQL會話頁面,單擊建立SQL會話

  3. 建立SQL會話頁面的Spark配置地區,配置以下資訊,單擊创建。詳情請參見管理SQL會話

    Spark對Iceberg的讀寫基於Catalog,您可以根據具體情境進行選擇。更多Catalog資訊,請參見管理資料目錄

    使用資料目錄Catalog

    若採用資料目錄Catalog方式,則無需在會話中配置參數,只需在数据目录頁面單擊添加数据目录,然後在SparkSQL開發中直接選擇資料目錄即可。

    說明
    • 如果要訪問DLF(原DLF2.5)中的Iceberg,引擎版本請使用esr-4.7.0、esr-3.6.0及以上版本。

    • 如果要訪問DLF-Legacy(原DLF1.0)或Hive MetaStore中的Iceberg,引擎版本推薦使用esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。

    使用自訂Catalog

    DLF(原DLF 2.5)

    說明

    引擎版本要求esr-4.7.0、esr-3.6.0及以上版本。

    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
    spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com
    spark.sql.catalog.iceberg_catalog.warehouse  <catalog_name>
    spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
    spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
    spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID>
    spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
    spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id>
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>

    涉及參數如下所示。

    參數

    說明

    樣本值

    spark.sql.extensions

    啟用Iceberg Spark擴充。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.iceberg_catalog

    註冊名為 iceberg_catalog 的 Spark Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.iceberg_catalog.catalog-impl

    指定底層Catalog實現為Iceberg REST Catalog

    org.apache.iceberg.rest.RESTCatalog

    spark.sql.catalog.iceberg_catalog.uri

    DLF Iceberg 服務的 REST API 地址,格式為http://<regionID>-vpc.dlf.aliyuncs.com

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    spark.sql.catalog.iceberg_catalog.warehouse

    指定關聯的 DLF Catalog 名稱。

    說明

    不建議關聯資料共用建立的DLF Catalog。

    <catalog_name>

    spark.sql.catalog.iceberg_catalog.io-impl

    使用 DLF 定製的 FileIO 實現。

    固定值:org.apache.iceberg.rest.DlfFileIO

    spark.sql.catalog.iceberg_catalog.rest.auth.type

    啟用 AWS SigV4 簽名認證機制,用於對 REST 請求進行身分識別驗證。

    sigv4

    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type

    禁用委託認證,由用戶端直接提供 AK/SK 進行簽名。

    none

    spark.sql.catalog.iceberg_catalog.rest.signing-region

    指定簽名所用的地區(Region),必須與 DLF 服務所在地區一致。

    cn-hangzhou

    spark.sql.catalog.iceberg_catalog.rest.signing-name

    指定簽名所用的服務名稱

    固定值:DlfNext

    spark.sql.catalog.iceberg_catalog.rest.access-key-id

    阿里雲帳號或者RAM使用者的AccessKey ID。

    <access_key_id>

    spark.sql.catalog.iceberg_catalog.rest.secret-access-key

    阿里雲帳號或者RAM使用者的AccessKey Secret。

    <access_key_secret>

    DLF-Legacy(原DLF1.0)

    說明

    引擎版本要求esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。

    中繼資料儲存在DLF-Legacy(原DLF1.0)中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>

    涉及參數說明如下所示。

    參數

    說明

    樣本值

    spark.sql.extensions

    啟用Iceberg Spark擴充。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    註冊一個名為 <catalogName> 的 Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    採用阿里雲 DLF-Legacy 專用的 Hive 相容實現,直連 DLF-Legacy 中繼資料服務。

    固定值:org.apache.iceberg.aliyun.dlf.hive.DlfCatalog

    spark.sql.catalog.<catalogName>.dlf.catalog.id

    指定關聯的 DLF Catalog 名稱。

    <catalog_name>

    Hive MetaStore

    中繼資料儲存在指定的Hive MetaStore中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.hive.HiveCatalog
    spark.sql.catalog.<catalogName>.uri           thrift://<yourHMSUri>:<port>

    涉及參數如下表所示。

    參數

    說明

    樣本值

    spark.sql.extensions

    啟用Iceberg Spark擴充。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    註冊一個名為 <catalogName> 的 Catalog

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    指定該 Catalog 使用 Iceberg 官方的 HiveCatalog 實現,通過 Hive Metastore 儲存和讀取 Iceberg 表的中繼資料。

    固定值:org.apache.iceberg.hive.HiveCatalog

    spark.sql.catalog.<catalogName>.uri

    Hive MetaStore的URI。格式為thrift://<Hive metastore的IP地址>:9083

    <Hive metastore的IP地址>為HMS服務的內網IP地址。如果您需要指定外部Metastore服務,請參見串連外部Hive Metastore Service

    thrift://192.168.**.**:9083

    FileSystem

    中繼資料儲存在檔案系統中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.type          hadoop
    spark.sql.catalog.<catalogName>.warehouse     oss://<yourBucketName>/warehouse

    涉及參數如下表所示。

    參數

    說明

    樣本值

    spark.sql.extensions

    啟用Iceberg Spark擴充。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    註冊一個名為 <catalogName> 的 Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.type

    指定 Catalog 類型為 hadoop。表示使用HadoopCatalog,將中繼資料直接儲存在檔案系統中,無需 Hive Metastore。

    hadoop

    spark.sql.catalog.<catalogName>.warehouse

    指定中繼資料存放區路徑。代碼中的<yourBucketName>表示OSS上的Bucket名稱。

    oss://<yourBucketName>/warehouse

步驟二:讀寫Iceberg

  1. 進入SQL開發頁面。

    EMR Serverless Spark頁面,單擊左側導覽列中的数据开发

  2. 開發目錄頁簽下,單擊image表徵圖。

  3. 建立對話方塊中,輸入名稱(例如users_task),類型使用預設的SparkSQL,然後單擊確定

  4. 拷貝如下代碼到新增的SparkSQL頁簽(users_task)中。

    說明

    在未指定資料庫的情況下,建立資料表將預設位於Catalog下的default資料庫中,使用者亦可建立並指定其他資料庫。

    -- 建立資料庫
    CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
    
    -- 建立非分區表
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- 插入非分區表資料
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- 查詢非分區表所有資料
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 按條件查詢非分區表
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- 更新非分區表資料
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- 重新查詢確認更新
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- 刪除非分區表資料
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- 重新查詢確認刪除
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 建立分區表
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP,
        dt DATE
    )
    USING iceberg
    PARTITIONED BY (dt, category);
    
    -- 插入分區表資料
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
      (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'),
      (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'),
      (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'),
      (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'),
      (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'),
      (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'),
      (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'),
      (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'),
      (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'),
      (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05');
    
    
    -- 查詢分區表所有資料
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- 查詢 dt='2026-01-01' 的資料
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01';
    
    -- 查詢某個 category 的資料
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 多條件組合查詢(day + category)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE dt='2026-01-01'
      AND category = 'A';
    
    -- 彙總統計每個分類的資料數量
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- 刪除資料庫(謹慎操作),刪除前需確保該db下tbl為空白
    -- DROP DATABASE iceberg_catalog.db;
  5. 在會話下拉式清單中選擇剛剛建立的SQL會話執行個體,並單擊“運行”按鈕。成功執行後,您可以在下方查看運行結果。image

相關文檔