全部产品
Search
文档中心

数据湖构建:EMR on ECS Spark访问DLF

更新时间:Jan 04, 2026

本文为您介绍在EMR on ECS Spark环境中如何以Iceberg REST访问DLF Catalog。

前提条件

  • 版本要求:已创建EMR集群且版本 ≥ 5.12.0,组件选择Spark3,且Spark3使用JDK 11

  • 地域要求:EMR与DLF在同一地域,且已将EMR集群所在的VPC加入DLF的白名单中。

  • 权限要求:如果是RAM用户,在进行数据操作之前,需要先授予相应的资源权限。详情请参见数据授权管理

下载依赖配置

授予角色DLF权限

  1. 授予AliyunECSInstanceForEMRRole角色RAM权限(EMR产品化集成后可以省略该步骤)。

    1. 使用阿里云账号或RAM管理员登录RAM控制台

    2. 单击身份管理 > 角色,查询AliyunECSInstanceForEMRRole角色。

    3. 单击操作列的新增授权,进入新增授权页面。

    4. 权限策略中,查询并勾选AliyunDLFFullAccess,单击确认新增授权

    image

  2. 授予AliyunECSInstanceForEMRRole角色DLF权限。

    1. 登录数据湖构建控制台

    2. Catalogs列表页面,单击Catalog名称,进入Catalog详情页。

    3. 单击权限页签,单击授权

    4. 在授权页面,配置以下信息,单击确定

      • 用户/角色:选择RAM用户/RAM角色

      • 选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole

        说明

        如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。

      • 预置权限类型:选择Data Editor。

连接Catalog

依赖配置

直接将四个依赖包加入到$SPARK_HOME/jars中。

会话启动

在Terminal中执行spark-sql命令,注意替换对应参数。

  • ${regionID}:为实际region,如cn-hangzhou。

  • ${catalogName}:替换成在DLF中创建好的catalog名称。

spark-sql \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.iceberg_catalog.uri=http://${regionID}-vpc.dlf.aliyuncs.com/iceberg \
--conf spark.sql.catalog.iceberg_catalog.warehouse=${catalogName}  \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.rest.DlfFileIO \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.type=sigv4 \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type=none \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-region=${regionID} \
--conf spark.sql.catalog.iceberg_catalog.client.credentials-provider=org.apache.iceberg.rest.credentials.DlfEcsTokenCredentialsProvider \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-name=DlfNext \
--master local

读写数据

重要

Iceberg 表不具备自动存储回收机制。为避免存储成本激增,请务必参考Spark Procedures以定期清理过期快照和孤立文件。

说明

不指定数据库时,创建数据表会默认建在Catalog的default数据库中,也可创建并指定其他数据库。

-- 切换到 Iceberg Catalog
USE iceberg_catalog;

-- 创建数据库
CREATE DATABASE db;

-- 创建非分区表
CREATE TABLE iceberg_catalog.db.tbl (
    id BIGINT NOT NULL COMMENT 'unique id',
    data STRING
)
USING iceberg;

-- 插入非分区表数据
INSERT INTO iceberg_catalog.db.tbl VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');

-- 查询非分区表所有数据
SELECT * FROM iceberg_catalog.db.tbl;

-- 按条件查询非分区表
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;

-- 更新非分区表数据
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;

-- 再次查询确认更新
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;

-- 删除非分区表数据
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;

-- 再次查询确认删除
SELECT * FROM iceberg_catalog.db.tbl;

-- 创建分区表
CREATE TABLE iceberg_catalog.db.part_tbl (
    id BIGINT,
    data STRING,
    category STRING,
    ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

-- 插入分区表数据
INSERT INTO iceberg_catalog.db.part_tbl VALUES
(100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
(200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
(300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
(400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));


-- 查询分区表所有数据
SELECT * FROM iceberg_catalog.db.part_tbl;

-- 查询 bucket(16, id) = 0 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;

-- 查询 day(ts) = '2025-01-01' 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';

-- 查询某个 category 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';

-- 多条件组合查询(bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl 
WHERE bucket(16, id) = 0 
  AND days(ts) = '2025-01-01'
  AND category = 'A';

-- 聚合统计每个分类的数据数量
SELECT category, COUNT(*) AS count 
FROM iceberg_catalog.db.part_tbl 
GROUP BY category;

-- 清理测试数据(可选)
TRUNCATE TABLE iceberg_catalog.db.tbl;
TRUNCATE TABLE iceberg_catalog.db.part_tbl;

-- 删除数据库(谨慎操作)
DROP DATABASE iceberg_catalog.db;