全部產品
Search
文件中心

Data Lake Formation:EMR on ECS Spark訪問DLF

更新時間:Jan 05, 2026

本文為您介紹在EMR on ECS Spark環境中如何以Iceberg REST訪問DLF Catalog。

前提條件

  • 版本要求:已建立EMR叢集且版本 ≥ 5.12.0,組件選擇Spark3,且Spark3使用JDK 11

  • 地區要求:EMR與DLF在同一地區,且已將EMR叢集所在的VPC加入DLF的白名單中。

  • 許可權要求:如果是RAM使用者,在進行資料操作之前,需要先授予相應的資源許可權。詳情請參見資料授權管理

下載依賴配置

授予角色DLF許可權

  1. 授予AliyunECSInstanceForEMRRole角色RAM許可權(EMR產品化整合後可以省略該步驟)。

    1. 使用阿里雲帳號或Resource Access Management員登入RAM控制台

    2. 單擊身份管理 > 角色,查詢AliyunECSInstanceForEMRRole角色。

    3. 單擊操作列的新增授權,進入新增授權頁面。

    4. 權限原則中,查詢並勾選AliyunDLFFullAccess,單擊確認新增授權

    image

  2. 授予AliyunECSInstanceForEMRRole角色DLF許可權。

    1. 登入資料湖構建控制台

    2. Catalogs列表頁面,單擊Catalog名稱,進入Catalog詳情頁。

    3. 單擊許可權頁簽,單擊授權

    4. 在授權頁面,配置以下資訊,單擊確定

      • 使用者/角色:選擇RAM使用者/RAM角色

      • 選擇授權對象:在下拉式清單中選擇AliyunECSInstanceForEMRRole

        說明

        如果使用者下拉式清單中未找到AliyunECSInstanceForEMRRole,可以在使用者管理頁面單擊同步。

      • 預置權限類別型:選擇Data Editor。

串連Catalog

依賴配置

直接將四個依賴包加入到$SPARK_HOME/jars中。

會話啟動

在Terminal中執行spark-sql命令,注意替換對應參數。

  • ${regionID}:為實際region,如cn-hangzhou。

  • ${catalogName}:替換成在DLF中建立好的catalog名稱。

spark-sql \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.iceberg_catalog.uri=http://${regionID}-vpc.dlf.aliyuncs.com/iceberg \
--conf spark.sql.catalog.iceberg_catalog.warehouse=${catalogName}  \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.rest.DlfFileIO \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.type=sigv4 \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type=none \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-region=${regionID} \
--conf spark.sql.catalog.iceberg_catalog.client.credentials-provider=org.apache.iceberg.rest.credentials.DlfEcsTokenCredentialsProvider \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-name=DlfNext \
--master local

讀寫資料

重要

Iceberg 表不具備自動儲存回收機制。為避免儲存成本激增,請務必參考Spark Procedures以定期清理到期快照和孤立檔案。

說明

不指定資料庫時,建立資料表會預設建在Catalog的default資料庫中,也可建立並指定其他資料庫。

-- 切換到 Iceberg Catalog
USE iceberg_catalog;

-- 建立資料庫
CREATE DATABASE db;

-- 建立非分區表
CREATE TABLE iceberg_catalog.db.tbl (
    id BIGINT NOT NULL COMMENT 'unique id',
    data STRING
)
USING iceberg;

-- 插入非分區表資料
INSERT INTO iceberg_catalog.db.tbl VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');

-- 查詢非分區表所有資料
SELECT * FROM iceberg_catalog.db.tbl;

-- 按條件查詢非分區表
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;

-- 更新非分區表資料
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;

-- 重新查詢確認更新
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;

-- 刪除非分區表資料
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;

-- 重新查詢確認刪除
SELECT * FROM iceberg_catalog.db.tbl;

-- 建立分區表
CREATE TABLE iceberg_catalog.db.part_tbl (
    id BIGINT,
    data STRING,
    category STRING,
    ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

-- 插入分區表資料
INSERT INTO iceberg_catalog.db.part_tbl VALUES
(100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
(200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
(300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
(400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));


-- 查詢分區表所有資料
SELECT * FROM iceberg_catalog.db.part_tbl;

-- 查詢 bucket(16, id) = 0 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;

-- 查詢 day(ts) = '2025-01-01' 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';

-- 查詢某個 category 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';

-- 多條件組合查詢(bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl 
WHERE bucket(16, id) = 0 
  AND days(ts) = '2025-01-01'
  AND category = 'A';

-- 彙總統計每個分類的資料數量
SELECT category, COUNT(*) AS count 
FROM iceberg_catalog.db.part_tbl 
GROUP BY category;

-- 清理測試資料(可選)
TRUNCATE TABLE iceberg_catalog.db.tbl;
TRUNCATE TABLE iceberg_catalog.db.part_tbl;

-- 刪除資料庫(謹慎操作)
DROP DATABASE iceberg_catalog.db;