This topic describes how to access a Data Lake Formation (DLF) catalog from EMR Serverless Spark using Iceberg REST.
Prerequisites
You have created a Serverless Spark workspace in the same region as your DLF instance. For more information, see Create a workspace.
If you are a Resource Access Management (RAM) user, you must grant the required resource permissions before performing data operations. For more information, see Data authorization management.
Limits
The following task types are supported:
SQL sessions: Manage SQL sessions.
Spark Thrift Server: Manage Spark Thrift Server sessions.
Batch jobs: Develop a batch job.
Step 1: Grant catalog permissions
Log on to the Data Lake Formation console.
On the Catalogs page, click the name of a catalog to go to its details page.
To grant permissions to the entire catalog, click the Permissions tab. Alternatively, you can navigate to a specific database or table and click its Permissions tab to grant access.
On the authorization page, configure the following settings and click OK.
User/Role: Select RAM User/RAM Role.
Select Authorization Object: Choose AliyunECSInstanceForEMRRole from the dropdown list.
Note: If AliyunECSInstanceForEMRRole does not appear in the dropdown list, go to the user management page and click Sync.
Preset Permission Type: Select read permissions manually or use a predefined role, such as Data Reader or Data Editor.
Step 2: Read and write data
Connect to the catalog.
Create an SQL session. For more information, see Manage SQL sessions. The engine version must be esr-4.7.0, esr-3.6.0, or later.
Use a data catalog
If you use a data catalog, you do not need to configure connection parameters in the session. Go to the Data Catalog page and click Add data catalog. You can then select the data catalog directly during SparkSQL development.
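Once the data catalog is added, you can reference its objects with three-part names in SparkSQL. A minimal sketch, assuming a catalog named dlf_catalog with a database db (both names are hypothetical placeholders for your own):

-- Browse the attached catalog (hypothetical names)
SHOW DATABASES IN dlf_catalog;
USE dlf_catalog.db;
SHOW TABLES;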
Use a custom catalog
In the Spark Configuration section of Custom Configuration, you can add the following configurations.
Important: In the following configuration example:
iceberg_catalog is a custom catalog name. It registers an Iceberg table management service in Spark, based on the Iceberg REST catalog. This catalog connects to Alibaba Cloud DLF using REST APIs. You can change the catalog name and related parameters as needed.
${regionID}: Replace this with the actual region ID, such as cn-hangzhou. For more information, see Endpoint.
${catalogName}: Replace this with your DLF catalog name.
${access_key_id} and ${access_key_secret}: Replace these with the AccessKey ID and AccessKey secret of your Alibaba Cloud account.
# Enable the Iceberg Spark extension
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Register the iceberg_catalog catalog
spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
# The underlying catalog implementation is the Iceberg REST catalog
spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
# The REST API endpoint of the DLF Iceberg service
spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
# Specify your DLF catalog name
spark.sql.catalog.iceberg_catalog.warehouse ${catalogName}
# Use the custom DLF FileIO implementation
spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
# Enable SigV4 signature authentication
spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID}
spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
# Access credentials
spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
Read and write data.
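Before creating tables, you can run a quick sanity check in the SQL session to confirm that the registered catalog can reach DLF. A minimal sketch, assuming the catalog name iceberg_catalog from the configuration above:

-- List the namespaces visible through the REST catalog; an error here
-- usually points to a wrong endpoint, region, or credentials
SHOW DATABASES IN iceberg_catalog;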
For a complete example of SQL job development, see Get started with SparkSQL development.
Note: If no database is specified, tables are created in the catalog's default database. You can also create and specify another database.

-- Create a database
CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;

-- Create a non-partitioned table
CREATE TABLE iceberg_catalog.db.tbl (
    id BIGINT NOT NULL COMMENT 'unique id',
    data STRING
) USING iceberg;

-- Insert data into the non-partitioned table
INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');

-- Full table query
SELECT * FROM iceberg_catalog.db.tbl;

-- Conditional query
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;

-- Update data
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;

-- Query the table again to confirm the update
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;

-- Delete data
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;

-- Query the table again to confirm the deletion
SELECT * FROM iceberg_catalog.db.tbl;

-- Create a partitioned table
CREATE TABLE iceberg_catalog.db.part_tbl (
    id BIGINT,
    data STRING,
    category STRING,
    ts TIMESTAMP
) USING iceberg
PARTITIONED BY (category);

-- Insert data
INSERT INTO iceberg_catalog.db.part_tbl VALUES
    (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
    (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
    (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
    (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));

-- Full table query
SELECT * FROM iceberg_catalog.db.part_tbl;

-- Conditional query
SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;

-- Conditional query
SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';

-- Conditional query
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';

-- Combined conditional query (bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl
WHERE bucket(16, id) = 0 AND days(ts) = '2025-01-01' AND category = 'A';

-- Aggregate and count the number of data entries for each category
SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category;

-- Drop the database. Ensure the database contains no tables first.
-- DROP DATABASE iceberg_catalog.db;
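Because these are Iceberg tables, every write above produces a snapshot that you can inspect and query back. A minimal sketch using standard Iceberg metadata tables and time travel in SparkSQL (the snapshot ID below is a placeholder; substitute a real value returned by the snapshots query):

-- Inspect the table's snapshot history
SELECT snapshot_id, committed_at, operation
FROM iceberg_catalog.db.tbl.snapshots;

-- Query the table as of an earlier snapshot (placeholder snapshot_id)
SELECT * FROM iceberg_catalog.db.tbl VERSION AS OF 1234567890123456789;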