本文為您介紹在EMR on ECS Spark環境中如何以Iceberg REST訪問DLF Catalog。
前提條件
版本要求:已建立EMR叢集且版本 ≥ 5.12.0,組件選擇Spark3,且Spark3使用JDK 11。
地區要求:EMR與DLF在同一地區,且已將EMR叢集所在的VPC加入DLF的白名單中。
許可權要求:如果是RAM使用者,在進行資料操作之前,需要先授予相應的資源許可權。詳情請參見資料授權管理。
下載依賴配置
授予角色DLF許可權
授予AliyunECSInstanceForEMRRole角色RAM許可權(EMR產品化整合後可以省略該步驟)。
使用阿里雲帳號或Resource Access Management員登入RAM控制台。
單擊,查詢AliyunECSInstanceForEMRRole角色。
單擊操作列的新增授權,進入新增授權頁面。
在權限原則中,查詢並勾選AliyunDLFFullAccess,單擊確認新增授權。

授予AliyunECSInstanceForEMRRole角色DLF許可權。
登入資料湖構建控制台。
在Catalogs列表頁面,單擊Catalog名稱,進入Catalog詳情頁。
單擊許可權頁簽,單擊授權。
在授權頁面,配置以下資訊,單擊確定。
使用者/角色:選擇RAM使用者/RAM角色。
選擇授權對象:在下拉式清單中選擇AliyunECSInstanceForEMRRole。
說明如果使用者下拉式清單中未找到AliyunECSInstanceForEMRRole,可以在使用者管理頁面單擊同步。
預置權限類別型:選擇Data Editor。
串連Catalog
依賴配置
直接將四個依賴包加入到$SPARK_HOME/jars中。
會話啟動
在Terminal中執行spark-sql命令,注意替換對應參數。
${regionID}:為實際region,如cn-hangzhou。
${catalogName}:替換成在DLF中建立好的catalog名稱。
spark-sql \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.iceberg_catalog.uri=http://${regionID}-vpc.dlf.aliyuncs.com/iceberg \
--conf spark.sql.catalog.iceberg_catalog.warehouse=${catalogName} \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.rest.DlfFileIO \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.type=sigv4 \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type=none \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-region=${regionID} \
--conf spark.sql.catalog.iceberg_catalog.client.credentials-provider=org.apache.iceberg.rest.credentials.DlfEcsTokenCredentialsProvider \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-name=DlfNext \
--master local讀寫資料
Iceberg 表不具備自動儲存回收機制。為避免儲存成本激增,請務必參考Spark Procedures以定期清理到期快照和孤立檔案。
不指定資料庫時,建立資料表會預設建在Catalog的default資料庫中,也可建立並指定其他資料庫。
-- 切換到 Iceberg Catalog
USE iceberg_catalog;
-- 建立資料庫
CREATE DATABASE db;
-- 建立非分區表
CREATE TABLE iceberg_catalog.db.tbl (
id BIGINT NOT NULL COMMENT 'unique id',
data STRING
)
USING iceberg;
-- 插入非分區表資料
INSERT INTO iceberg_catalog.db.tbl VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');
-- 查詢非分區表所有資料
SELECT * FROM iceberg_catalog.db.tbl;
-- 按條件查詢非分區表
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
-- 更新非分區表資料
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
-- 重新查詢確認更新
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
-- 刪除非分區表資料
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
-- 重新查詢確認刪除
SELECT * FROM iceberg_catalog.db.tbl;
-- 建立分區表
CREATE TABLE iceberg_catalog.db.part_tbl (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
-- 插入分區表資料
INSERT INTO iceberg_catalog.db.part_tbl VALUES
(100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
(200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
(300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
(400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
-- 查詢分區表所有資料
SELECT * FROM iceberg_catalog.db.part_tbl;
-- 查詢 bucket(16, id) = 0 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;
-- 查詢 day(ts) = '2025-01-01' 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';
-- 查詢某個 category 的資料
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
-- 多條件組合查詢(bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl
WHERE bucket(16, id) = 0
AND days(ts) = '2025-01-01'
AND category = 'A';
-- 彙總統計每個分類的資料數量
SELECT category, COUNT(*) AS count
FROM iceberg_catalog.db.part_tbl
GROUP BY category;
-- 清理測試資料(可選)
TRUNCATE TABLE iceberg_catalog.db.tbl;
TRUNCATE TABLE iceberg_catalog.db.part_tbl;
-- 刪除資料庫(謹慎操作)
DROP DATABASE iceberg_catalog.db;