Connect EMR Serverless Spark to a Data Lake Formation (DLF) catalog using the Iceberg REST protocol to read and write Iceberg tables through Spark SQL.
Prerequisites
Before you begin, ensure that you have:
- A Serverless Spark workspace created in the same region as your DLF instance. See Create a workspace.
Supported task types
All three task types support connecting to DLF through the Iceberg REST catalog:
| Task type | Reference |
|---|---|
| SQL session | Manage SQL sessions |
| Spark Thrift Server | Manage Spark Thrift Server sessions |
| Batch job | Develop a batch job |
Step 1: Grant catalog permissions
- Log on to the Data Lake Formation console.
- On the Catalogs page, click the catalog name to open its details page.
- Click the Permissions tab to grant access to the entire catalog. To grant access to a specific database or table instead, navigate to that resource and click its Permissions tab.
- Configure the following fields and click OK:
| Field | Value |
|---|---|
| User/Role | Select RAM User/RAM Role |
| Select Authorization Object | Select AliyunECSInstanceForEMRRole from the dropdown |
| Preset Permission Type | Select read permissions manually or choose a predefined role such as Data Reader or Data Editor |

Note: If AliyunECSInstanceForEMRRole does not appear in the dropdown, go to the user management page and click Sync.
If you are a Resource Access Management (RAM) user, grant the required resource permissions before performing data operations. See Data authorization management.
Step 2: Connect to the catalog and read/write data
Choose one of the following connection methods based on how you manage your catalog.
Option 1: Use a data catalog (recommended)
If you work with a DLF-managed data catalog, no Spark session configuration is needed. Go to the Data Catalog page, click Add data catalog, and then select the catalog directly in Spark SQL development.
Option 2: Use a custom catalog
Add the following configuration in the Spark Configuration section of Custom Configuration.
The configuration below registers a Spark catalog named iceberg_catalog. The catalog is backed by the Iceberg REST catalog implementation, which connects to DLF through REST APIs. Change the catalog name and related parameters as needed.
# Enable the Iceberg Spark extension
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Register the catalog
spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
# Use the Iceberg REST catalog implementation
spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
# DLF Iceberg REST endpoint
spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
# Your DLF catalog name
spark.sql.catalog.iceberg_catalog.warehouse ${catalogName}
# Use the DLF FileIO implementation
spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
# Enable SigV4 signature authentication
spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID}
spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
# Access credentials
spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| ${regionID} | Region ID where your DLF instance is deployed. See Endpoints. | cn-hangzhou |
| ${catalogName} | Your DLF catalog name | my-catalog |
| ${access_key_id} | AccessKey ID of your Alibaba Cloud account | - |
| ${access_key_secret} | AccessKey secret of your Alibaba Cloud account | - |
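Filling in the placeholders by hand is error-prone, so one option is to render the settings from a template. The sketch below is an illustration, not part of the product: it relies on Python's standard `string.Template`, whose `${name}` syntax happens to match the placeholders above. The helper name `render_spark_conf` and the shortened template are assumptions made for this example.

```python
from string import Template

# Shortened copy of the custom-catalog settings (assumption: extend it with
# the remaining keys as needed). Placeholder names match the table above.
CONF_TEMPLATE = Template("""\
spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
spark.sql.catalog.iceberg_catalog.warehouse ${catalogName}
spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID}
spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
""")


def render_spark_conf(values):
    """Substitute the placeholders and return the settings as a dict.

    Template.substitute raises KeyError when a placeholder has no value,
    so a half-filled configuration is never returned.
    """
    rendered = CONF_TEMPLATE.substitute(values)
    conf = {}
    for line in rendered.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, value = line.split(None, 1)  # key and value are whitespace-separated
        conf[key] = value
    return conf


if __name__ == "__main__":
    # Hypothetical values for illustration only.
    example = {
        "regionID": "cn-hangzhou",
        "catalogName": "my-catalog",
        "access_key_id": "<your-access-key-id>",
        "access_key_secret": "<your-access-key-secret>",
    }
    for key, value in render_spark_conf(example).items():
        print(key, value)
```

In practice, read the AccessKey pair from a secure source such as environment variables rather than hard-coding it in a script.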
For SQL sessions, use engine version esr-4.7.0, esr-3.6.0, or later.
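The version requirement can be checked before a session is created. The following sketch assumes one reading of the sentence above: 3.x engines need esr-3.6.0 or later, and 4.x engines need esr-4.7.0 or later. It also assumes that version strings start with `esr-<major>.<minor>.<patch>`. The function name `supports_iceberg_rest` is hypothetical.

```python
import re

# Minimum version per major line, as read from the requirement above (assumption).
MIN_VERSIONS = {3: (3, 6, 0), 4: (4, 7, 0)}

_VERSION_RE = re.compile(r"^esr-(\d+)\.(\d+)\.(\d+)")


def supports_iceberg_rest(engine_version):
    """Return True if the engine version meets the assumed minimum for its major line."""
    match = _VERSION_RE.match(engine_version.strip())
    if match is None:
        raise ValueError(f"unrecognized engine version: {engine_version!r}")
    version = tuple(int(part) for part in match.groups())
    minimum = MIN_VERSIONS.get(version[0])
    if minimum is None:
        # Major lines not mentioned in the requirement are treated as unsupported.
        return False
    return version >= minimum  # tuples compare numerically, so 4.10.0 > 4.7.0


if __name__ == "__main__":
    for candidate in ("esr-4.7.0", "esr-4.6.0", "esr-3.6.0", "esr-3.5.9"):
        print(candidate, supports_iceberg_rest(candidate))
```

Comparing integer tuples instead of raw strings avoids the common mistake of ranking `4.10.0` below `4.7.0`.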
Read and write data
The following examples show common Spark SQL operations against an Iceberg table in DLF. All statements reference tables in the format iceberg_catalog.<database>.<table>.
If no database is specified, tables are created in the catalog's default database.
For a complete Spark SQL development walkthrough, see Get started with Spark SQL development.
-- Create a database
CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
-- Create a non-partitioned table
CREATE TABLE iceberg_catalog.db.tbl (
id BIGINT NOT NULL COMMENT 'unique id',
data STRING
)
USING iceberg;
-- Insert rows
INSERT INTO iceberg_catalog.db.tbl VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');
-- Query all rows
SELECT * FROM iceberg_catalog.db.tbl;
-- Query by condition
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
-- Update a row
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
-- Confirm the update
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
-- Delete a row
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
-- Confirm the deletion
SELECT * FROM iceberg_catalog.db.tbl;
-- Create a partitioned table
CREATE TABLE iceberg_catalog.db.part_tbl (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (category);
-- Insert rows
INSERT INTO iceberg_catalog.db.part_tbl VALUES
(100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
(200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
(300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
(400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
-- Query all rows
SELECT * FROM iceberg_catalog.db.part_tbl;
-- Filter by bucket (Iceberg exposes its transform functions in the catalog's system namespace)
SELECT * FROM iceberg_catalog.db.part_tbl WHERE iceberg_catalog.system.bucket(16, id) = 0;
-- Filter by day
SELECT * FROM iceberg_catalog.db.part_tbl WHERE iceberg_catalog.system.days(ts) = '2025-01-01';
-- Filter by partition column
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
-- Combined filter (bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl
WHERE iceberg_catalog.system.bucket(16, id) = 0
AND iceberg_catalog.system.days(ts) = '2025-01-01'
AND category = 'A';
-- Aggregate by category
SELECT category, COUNT(*) AS count
FROM iceberg_catalog.db.part_tbl
GROUP BY category;
-- Drop the database (the database must be empty, so drop all of its tables first)
-- DROP DATABASE iceberg_catalog.db;
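To see what the tables should contain after the statements above, the row-level operations can be replayed locally. The sketch below uses Python's built-in `sqlite3` module purely as a stand-in. SQLite is not Spark or Iceberg: the catalog prefix and the `USING iceberg` and `PARTITIONED BY` clauses are dropped, the column types are adapted, and the day filter uses SQLite's `date()` function. It only illustrates the expected results, and the function name `replay_examples` is hypothetical.

```python
import sqlite3


def replay_examples():
    """Replay the insert, update, delete, and aggregate examples in SQLite."""
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()

    # Non-partitioned table: insert three rows, update one, delete one.
    cur.execute("CREATE TABLE tbl (id INTEGER NOT NULL, data TEXT)")
    cur.executemany(
        "INSERT INTO tbl VALUES (?, ?)",
        [(1, "Alice"), (2, "Bob"), (3, "Charlie")],
    )
    cur.execute("UPDATE tbl SET data = 'David' WHERE id = 3")
    cur.execute("DELETE FROM tbl WHERE id = 1")
    remaining = cur.execute("SELECT id, data FROM tbl ORDER BY id").fetchall()

    # Second table: same rows as the partitioned example, stored as plain text.
    cur.execute(
        "CREATE TABLE part_tbl (id INTEGER, data TEXT, category TEXT, ts TEXT)"
    )
    cur.executemany(
        "INSERT INTO part_tbl VALUES (?, ?, ?, ?)",
        [
            (100, "Data1", "A", "2025-01-01 12:00:00"),
            (200, "Data2", "B", "2025-01-02 14:00:00"),
            (300, "Data3", "A", "2025-01-01 15:00:00"),
            (400, "Data4", "C", "2025-01-03 10:00:00"),
        ],
    )
    by_category = cur.execute(
        "SELECT category, COUNT(*) FROM part_tbl GROUP BY category ORDER BY category"
    ).fetchall()
    same_day = cur.execute(
        "SELECT id FROM part_tbl WHERE date(ts) = '2025-01-01' ORDER BY id"
    ).fetchall()

    conn.close()
    return {
        "tbl": remaining,
        "by_category": by_category,
        "on_2025_01_01": [row[0] for row in same_day],
    }


if __name__ == "__main__":
    results = replay_examples()
    print(results["tbl"])            # [(2, 'Bob'), (3, 'David')]
    print(results["by_category"])    # [('A', 2), ('B', 1), ('C', 1)]
    print(results["on_2025_01_01"])  # [100, 300]
```

Comparing these results with the output of the Spark SQL statements is a quick way to confirm that reads and writes through the catalog behave as expected.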