
E-MapReduce: Use Iceberg

Last Updated: Feb 04, 2026

Iceberg is an open table format for data lakes. You can use Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). This topic describes how to read data from and write data to an Iceberg table in EMR Serverless Spark.

Prerequisites

A workspace has been created. For more information, see Create a workspace.

Procedure

Note

You can use Spark SQL jobs or notebooks to read data from and write data to Iceberg tables. This topic uses a Spark SQL job as an example.

Step 1: Create a session

  1. Go to the Sessions page.

    1. Log on to the EMR console.

    2. In the left-side navigation pane, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of the workspace that you want to manage.

    4. In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Sessions.

  2. On the SQL Sessions tab, click Create SQL Session.

  3. On the Create SQL Session page, configure the following settings in the Spark Configuration section and click Create. For more information, see Manage SQL sessions.

    Spark reads from and writes to Iceberg tables using catalogs. Choose a catalog based on your use case. For more information about catalogs, see Manage data catalogs.

    Use a data catalog

    If you use a data catalog, you do not need to configure catalog parameters in the session. Instead, go to the Catalogs page and click Add Catalog. You can then reference the data catalog directly in Spark SQL development, as shown in the sketch after the following note.

    Note
    • To access Iceberg tables in DLF (formerly DLF 2.5), use engine version esr-4.7.0, esr-3.6.0, or later.

    • To access Iceberg tables in DLF-Legacy (formerly DLF 1.0) or Hive Metastore, use engine version esr-4.3.0, esr-3.3.0, esr-2.7.0, or later.
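
    The following is a minimal sketch of a query that references a table through an added data catalog. The names <dlf_catalog_name>, db, and tbl are placeholders for your own catalog, database, and table:

    -- Reference the table by its three-part name: <catalog>.<database>.<table>.
    SELECT * FROM <dlf_catalog_name>.db.tbl;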

    Use a custom catalog

    DLF (formerly DLF 2.5)

    Note

    The engine version must be esr-4.7.0, esr-3.6.0, or later.

    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
    spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com
    spark.sql.catalog.iceberg_catalog.warehouse <catalog_name>
    spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
    spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
    spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID>
    spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
    spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id>
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>

    The following table describes the parameters.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | spark.sql.extensions | Enables the Iceberg Spark extensions. | Static value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
    | spark.sql.catalog.iceberg_catalog | Registers a Spark catalog named iceberg_catalog. | Static value: org.apache.iceberg.spark.SparkCatalog |
    | spark.sql.catalog.iceberg_catalog.catalog-impl | Specifies the Iceberg REST catalog as the underlying catalog implementation. | org.apache.iceberg.rest.RESTCatalog |
    | spark.sql.catalog.iceberg_catalog.uri | The REST API endpoint of the DLF Iceberg service. Format: http://<regionID>-vpc.dlf.aliyuncs.com. | http://cn-hangzhou-vpc.dlf.aliyuncs.com |
    | spark.sql.catalog.iceberg_catalog.warehouse | The name of the associated DLF catalog. Note: We recommend that you do not associate a DLF catalog that was created through data sharing. | <catalog_name> |
    | spark.sql.catalog.iceberg_catalog.io-impl | Uses the DLF-specific FileIO implementation. | Static value: org.apache.iceberg.rest.DlfFileIO |
    | spark.sql.catalog.iceberg_catalog.rest.auth.type | Enables SigV4 signature authentication for REST requests. | sigv4 |
    | spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type | Disables delegated authentication. The client signs requests directly with the AccessKey ID and AccessKey secret. | none |
    | spark.sql.catalog.iceberg_catalog.rest.signing-region | The region used for signing. This must match the region where the DLF service runs. | cn-hangzhou |
    | spark.sql.catalog.iceberg_catalog.rest.signing-name | The service name used for signing. | Static value: DlfNext |
    | spark.sql.catalog.iceberg_catalog.rest.access-key-id | The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user. | <access_key_id> |
    | spark.sql.catalog.iceberg_catalog.rest.secret-access-key | The AccessKey secret of your Alibaba Cloud account or RAM user. | <access_key_secret> |
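
    After the session is created, you can run a quick check in Spark SQL development to confirm that the catalog is reachable. This is a minimal sketch; iceberg_catalog is the catalog name registered by the configuration above:

    -- List the namespaces (databases) that are visible through the registered catalog.
    SHOW NAMESPACES IN iceberg_catalog;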

    DLF-Legacy (formerly DLF 1.0)

    Note

    The engine version must be esr-4.3.0, esr-3.3.0, esr-2.7.0, or later.

    Metadata is stored in DLF-Legacy (formerly DLF 1.0).

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>

    The following table describes the parameters.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | spark.sql.extensions | Enables the Iceberg Spark extensions. | Static value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
    | spark.sql.catalog.<catalogName> | Registers a Spark catalog named <catalogName>. | Static value: org.apache.iceberg.spark.SparkCatalog |
    | spark.sql.catalog.<catalogName>.catalog-impl | Uses the Alibaba Cloud DLF-Legacy Hive-compatible implementation to connect directly to the DLF-Legacy global metadata service. | Static value: org.apache.iceberg.aliyun.dlf.hive.DlfCatalog |
    | spark.sql.catalog.<catalogName>.dlf.catalog.id | The name of the associated DLF catalog. | <catalog_name> |

    Hive Metastore

    Metadata is stored in a specified Hive Metastore.

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.hive.HiveCatalog
    spark.sql.catalog.<catalogName>.uri           thrift://<yourHMSUri>:<port>

    The following table describes the parameters.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | spark.sql.extensions | Enables the Iceberg Spark extensions. | Static value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
    | spark.sql.catalog.<catalogName> | Registers a Spark catalog named <catalogName>. | Static value: org.apache.iceberg.spark.SparkCatalog |
    | spark.sql.catalog.<catalogName>.catalog-impl | Uses Iceberg's official HiveCatalog implementation to store and retrieve Iceberg table metadata through Hive Metastore. | Static value: org.apache.iceberg.hive.HiveCatalog |
    | spark.sql.catalog.<catalogName>.uri | The Uniform Resource Identifier (URI) of the Hive Metastore. Format: thrift://<IP address of the Hive Metastore>:9083, where the IP address is the internal IP address of the Hive Metastore service. For information about how to specify an external Metastore service, see Connect to an external Hive Metastore service. | thrift://192.168.**.**:9083 |

    File system

    Metadata is stored in a file system.

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.type          hadoop
    spark.sql.catalog.<catalogName>.warehouse     oss://<yourBucketName>/warehouse

    The following table describes the parameters.

    | Parameter | Description | Example value |
    | --- | --- | --- |
    | spark.sql.extensions | Enables the Iceberg Spark extensions. | Static value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
    | spark.sql.catalog.<catalogName> | Registers a Spark catalog named <catalogName>. | Static value: org.apache.iceberg.spark.SparkCatalog |
    | spark.sql.catalog.<catalogName>.type | Sets the catalog type to hadoop, which uses HadoopCatalog to store metadata directly in the file system without a Hive Metastore. | hadoop |
    | spark.sql.catalog.<catalogName>.warehouse | The metadata storage path. Replace <yourBucketName> with the name of your OSS bucket. | oss://<yourBucketName>/warehouse |
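
    With the hadoop catalog type, no external metadata service is involved: each table keeps its metadata files next to its data files under the warehouse path. The following sketch illustrates this behavior. It assumes a catalog registered as <catalogName> with the configuration above; the database name db and table name sample are placeholders:

    -- Creating a table writes both data and metadata under the warehouse path.
    CREATE DATABASE IF NOT EXISTS <catalogName>.db;
    CREATE TABLE <catalogName>.db.sample (id BIGINT, data STRING) USING iceberg;
    -- Illustrative resulting layout in OSS:
    --   oss://<yourBucketName>/warehouse/db/sample/metadata/   (metadata and manifest files)
    --   oss://<yourBucketName>/warehouse/db/sample/data/       (data files)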

Step 2: Read from and write to an Iceberg table

  1. Go to the SQL development page.

    In the left-side navigation pane of the EMR Serverless Spark page, click Data Development.

  2. On the Development tab, click the icon to create a new task.

  3. In the New dialog box, enter a name, such as users_task, leave the type as the default SparkSQL, and click OK.

  4. Copy the following code into the new SparkSQL tab (users_task).

    Note

    If you do not specify a database, tables are created in the default database of the catalog. You can also create and specify another database.

    -- Create a database
    CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
    
    -- Create a non-partitioned table
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- Insert data into the non-partitioned table
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- Query all data from the non-partitioned table
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- Query data from the non-partitioned table by condition
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- Update data in the non-partitioned table
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- Verify the update
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- Delete data from the non-partitioned table
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- Verify the deletion
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- Create a partitioned table
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP,
        dt DATE
    )
    USING iceberg
    PARTITIONED BY (dt, category);
    
    -- Insert data into the partitioned table
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
      (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'),
      (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'),
      (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'),
      (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'),
      (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'),
      (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'),
      (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'),
      (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'),
      (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'),
      (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05');
    
    
    -- Query all data from the partitioned table
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- Query data for dt='2026-01-01'
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01';
    
    -- Query data for category = 'A'
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- Query with multiple conditions (dt + category)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE dt='2026-01-01'
      AND category = 'A';
    
    -- Aggregate count by category
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- Drop the database (use caution). The database must be empty before you drop it.
    -- A cleanup sketch is provided after this procedure.
    -- DROP DATABASE iceberg_catalog.db;
  5. Select the SQL session that you created from the session drop-down list and click Run. After the job succeeds, you can view the results in the lower part of the page.
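
If you want to remove the example objects when you are done, drop the tables first so that the database is empty, and then drop the database. The following is a minimal cleanup sketch:

    -- Drop the example tables, then drop the now-empty database.
    DROP TABLE IF EXISTS iceberg_catalog.db.tbl;
    DROP TABLE IF EXISTS iceberg_catalog.db.part_tbl;
    DROP DATABASE iceberg_catalog.db;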
