
E-MapReduce: Use Paimon

Last Updated: Dec 22, 2025

Apache Paimon is a unified lake storage format for streaming and batch processing. It supports high-throughput writes and low-latency queries. This topic describes how to read data from and write data to Paimon tables in EMR Serverless Spark.

Prerequisites

A workspace has been created. For more information, see Create a workspace.

Procedure

Step 1: Create an SQL session

  1. Go to the Sessions page.

    1. Log on to the EMR console.

    2. In the left-side navigation pane, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of the workspace that you want to manage.

    4. In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Sessions.

  2. On the SQL Sessions tab, click Create SQL Session.

  3. On the Create SQL Session page, in the Spark Configuration section, configure the following parameters and click Create. For more information, see Manage SQL sessions.

    Spark uses catalogs to read data from and write data to Paimon. You must select a catalog. For more information about catalogs, see Manage data catalogs.

    Use a data catalog

    If you use a data catalog, you do not need to configure parameters in the session. On the Catalogs page, click Add Catalog, and then select the data catalog for SparkSQL development.

    Note

    This feature requires EMR engine versions esr-4.3.0 or later, esr-3.3.0 or later, or esr-2.7.0 or later.

    Use a custom catalog

    DLF (formerly DLF 2.5)

    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       rest
    spark.sql.catalog.<catalogName>.uri                             http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<catalogName>.warehouse                       <catalog_name>
    spark.sql.catalog.<catalogName>.token.provider                  dlf
    spark.sql.catalog.<catalogName>.dlf.access-key-id               <access_key_id>
    spark.sql.catalog.<catalogName>.dlf.access-key-secret           <access_key_secret>

    The following list describes the parameters.

    Parameter: spark.sql.catalog.<catalogName>
    Description: The catalog implementation.
    Example: org.apache.paimon.spark.SparkCatalog (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.metastore
    Description: Specifies the metastore type. Set the value to rest to use the DLF REST API.
    Example: rest (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.uri
    Description: Specifies the URI of DLF. The format is http://<region_id>-vpc.dlf.aliyuncs.com, where <region_id> is the ID of the region in which DLF resides.
    Example: http://cn-hangzhou-vpc.dlf.aliyuncs.com

    Parameter: spark.sql.catalog.<catalogName>.warehouse
    Description: Specifies the data storage path (warehouse path). For DLF, specify the catalog name.
    Example: <catalog_name>

    Parameter: spark.sql.catalog.<catalogName>.token.provider
    Description: Specifies the authentication provider. For DLF, set the value to dlf.
    Example: dlf (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.dlf.access-key-id
    Description: The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user.
    Example: <access_key_id>

    Parameter: spark.sql.catalog.<catalogName>.dlf.access-key-secret
    Description: The AccessKey secret of your Alibaba Cloud account or RAM user.
    Example: <access_key_secret>
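    After the SQL session starts with this configuration, the catalog is addressed in SQL statements by the name that you registered. The following is a minimal connectivity check, a sketch that assumes the catalog is registered under the name paimon; replace it with your <catalogName>.

    -- Switch to the configured catalog and list its databases to confirm that it is reachable.
    USE paimon;
    SHOW DATABASES;

    -- Create and drop a temporary database as a write check. The database name is a hypothetical placeholder.
    CREATE DATABASE IF NOT EXISTS paimon.connectivity_check_db;
    DROP DATABASE paimon.connectivity_check_db;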

    DLF-Legacy (formerly DLF 1.0)

    The metadata is stored in DLF-Legacy (formerly DLF 1.0).

    spark.sql.catalog.<catalogName>                          org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                dlf
    spark.sql.catalog.<catalogName>.dlf.catalog.id           <catalog_name>
    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint     dlf-vpc.cn-hangzhou.aliyuncs.com

    The following list describes the parameters.

    Parameter: spark.sql.catalog.<catalogName>
    Description: The catalog implementation.
    Example: org.apache.paimon.spark.SparkCatalog (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.metastore
    Description: Specifies the metastore type. Set the value to dlf to use DLF-Legacy as the metastore.
    Example: dlf (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.dlf.catalog.id
    Description: Specifies the catalog name in DLF-Legacy.
    Example: <catalog_name>

    Parameter: spark.sql.catalog.<catalogName>.dlf.catalog.endpoint
    Description: Specifies the endpoint of DLF-Legacy. Select the correct endpoint for your region.
    Example: dlf-vpc.cn-hangzhou.aliyuncs.com

    Hive metastore

    The metadata is stored in the specified Hive metastore.

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       hive
    spark.sql.catalog.<catalogName>.uri             thrift://<yourHMSUri>:<port>

    The following list describes the parameters.

    Parameter: spark.sql.catalog.<catalogName>
    Description: The catalog implementation.
    Example: org.apache.paimon.spark.SparkCatalog (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.metastore
    Description: Specifies the metastore type. Set the value to hive to use a Hive metastore.
    Example: hive (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.uri
    Description: The URI of the Hive metastore in the format thrift://<IP address of Hive metastore>:9083, where <IP address of Hive metastore> is the private IP address of the Hive Metastore (HMS) service. To specify an external metastore service, see Connect to an external Hive Metastore service.
    Example: thrift://192.168.**.**:9083

    File system

    The metadata is stored in a file system.

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       filesystem
    spark.sql.catalog.<catalogName>.warehouse       oss://<yourBucketName>/warehouse

    The following list describes the parameters.

    Parameter: spark.sql.catalog.<catalogName>
    Description: The catalog implementation.
    Example: org.apache.paimon.spark.SparkCatalog (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.metastore
    Description: Specifies the metastore type. Set the value to filesystem to use a file system as the metastore.
    Example: filesystem (fixed value)

    Parameter: spark.sql.catalog.<catalogName>.warehouse
    Description: Specifies the metadata storage path (warehouse path). In the code, <yourBucketName> is the name of the bucket in OSS.
    Example: oss://my-bucket/warehouse

    You can also configure multiple catalogs, such as DLF, DLF-Legacy, and Hive catalogs, at the same time. Specify a unique name for each catalog. The following code provides an example.

    # Configure a DLF catalog
    spark.sql.catalog.<dlfCatalogName>                              org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<dlfCatalogName>.metastore                    rest
    spark.sql.catalog.<dlfCatalogName>.uri                          http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<dlfCatalogName>.warehouse                    <catalog_name>
    spark.sql.catalog.<dlfCatalogName>.token.provider               dlf
    spark.sql.catalog.<dlfCatalogName>.dlf.access-key-id            <access_key_id>
    spark.sql.catalog.<dlfCatalogName>.dlf.access-key-secret        <access_key_secret>
    
    # Configure a DLF-Legacy catalog
    spark.sql.catalog.<dlfLegacyCatalogName>                        org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<dlfLegacyCatalogName>.metastore              dlf
    spark.sql.catalog.<dlfLegacyCatalogName>.dlf.catalog.id         <catalog_name>
    spark.sql.catalog.<dlfLegacyCatalogName>.dlf.catalog.endpoint   dlf-vpc.cn-hangzhou.aliyuncs.com
    
    # Configure a hive1 catalog
    spark.sql.catalog.<hive1CatalogName>                            org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<hive1CatalogName>.metastore                  hive
    spark.sql.catalog.<hive1CatalogName>.uri                        thrift://<yourHMSUri-1>:<port>
    
    # Configure a hive2 catalog
    spark.sql.catalog.<hive2CatalogName>                            org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<hive2CatalogName>.metastore                  hive
    spark.sql.catalog.<hive2CatalogName>.uri                        thrift://<yourHMSUri-2>:<port>
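
    Because each catalog is addressed by its own name, a single SQL statement can reference tables in different catalogs. The following query is a sketch under hypothetical names: dlf_catalog and hive_catalog stand for two catalogs configured as shown above, and the databases and tables are assumed to already exist.

    -- Join a table in the DLF catalog with a table in a Hive catalog.
    -- dlf_catalog, hive_catalog, and the database and table names are hypothetical placeholders.
    SELECT o.id, o.name, p.amount
    FROM dlf_catalog.sales_db.orders o
    JOIN hive_catalog.finance_db.payments p
      ON o.id = p.order_id;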

    Use the built-in catalog

    With the following configuration, the Paimon Spark session extension and the SparkGenericCatalog implementation extend the built-in spark_catalog so that Paimon tables and tables in other formats, such as Parquet, can be managed in the same catalog. For an example, see the spark_catalog code in Step 2.

    spark.sql.extensions               org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    spark.sql.catalog.spark_catalog    org.apache.paimon.spark.SparkGenericCatalog

Step 2: Read from and write to tables by using a Paimon catalog or spark_catalog

  1. Go to the SQL development page.

    In the left-side navigation pane of the EMR Serverless Spark page, click Data Development.

  2. On the Development tab, click the icon for creating a new task.

  3. In the New dialog box, enter a name, such as users_task, leave the type as the default SparkSQL, and click OK.

  4. Copy the following code to the new Spark SQL tab (users_task).

    Use a Paimon catalog

    -- Create a database.
    CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;             
    
    -- Create a Paimon table.
    CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    
    -- Write data to the Paimon table.
    INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"), (3, "c");
    
    -- Query the write results from the Paimon table.
    SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- Drop the database.
    DROP DATABASE paimon.ss_paimon_db CASCADE;

    Use spark_catalog

    -- Create a database.
    CREATE DATABASE IF NOT EXISTS ss_paimon_db; 
    CREATE DATABASE IF NOT EXISTS ss_parquet_db;
    
    -- Create a Paimon table and a Parquet table.
    CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";
    
    -- Write data.
    INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- Query the write results.
    SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
    SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- Drop the database.
    DROP DATABASE ss_paimon_db CASCADE;
    DROP DATABASE ss_parquet_db CASCADE;
  5. From the drop-down lists, select a database and the SQL session that you created.

  6. Click Run to execute the task, and then view the query results in the output area.

FAQ

What do I do if an error occurs when I run a DELETE, UPDATE, or MERGE operation on a table?

  • Symptom: When you run a DELETE, UPDATE, or MERGE operation, an error message similar to the following appears.

    Caused by: org.apache.spark.sql.AnalysisException: Table does not support deletes/updates/merge: <tableName>.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportError(QueryCompilationErrors.scala:1391)
  • Cause: The storage format of the table does not support row-level update operations, or a required Spark configuration is missing.

  • Solution:

    1. Check the table type.

      Run the following command to check whether the table is a Paimon table.

      SHOW CREATE TABLE <tableName>;

      If the output contains USING PAIMON, the table is a Paimon table. If the output indicates another storage format, such as USING hive, check whether that format supports row-level update operations.

    2. Check the Spark configuration.

      If the table is a Paimon table, check the Spark Configuration section to ensure that the following configuration is added to enable Paimon support.

      spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

      If this configuration is not added, add it to the Spark Configuration section.
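
      After the configuration is added and the SQL session is restarted, row-level statements on the Paimon table should be accepted. The following is a minimal sketch that assumes the paimon.ss_paimon_db.paimon_tbl table from Step 2 exists; depending on how a Paimon table is defined (for example, whether it has a primary key), individual row-level operations may have additional requirements.

      -- Row-level operations on a Paimon table (a sketch; the table is assumed to exist).
      UPDATE paimon.ss_paimon_db.paimon_tbl SET name = "aa" WHERE id = 1;
      DELETE FROM paimon.ss_paimon_db.paimon_tbl WHERE id = 3;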
