Apache Paimon is a unified lake storage format for streaming and batch processing. It supports high-throughput writes and low-latency queries. This topic describes how to read data from and write data to Paimon tables in EMR Serverless Spark.
Prerequisites
A workspace has been created. For more information, see Create a workspace.
Procedure
Step 1: Create an SQL session
Go to the Sessions page.
Log on to the EMR console.
In the left-side navigation pane, choose EMR Serverless > Spark.
On the Spark page, click the name of the workspace that you want to manage.
In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Sessions.
On the SQL Sessions tab, click Create SQL Session.
On the Create SQL Session page, in the Spark Configuration section, configure the following parameters and click Create. For more information, see Manage SQL sessions.
Spark uses catalogs to read data from and write data to Paimon. Configure a catalog by using one of the following methods. For more information about catalogs, see Manage data catalogs.
Use a data catalog
If you use a data catalog, you do not need to configure parameters in the session. On the Catalogs page, click Add Catalog, and then select the data catalog for SparkSQL development.
Note: This feature requires EMR engine version esr-4.3.0 or later, esr-3.3.0 or later, or esr-2.7.0 or later.
Use a custom catalog
If you use a custom catalog, add one of the following configurations to the Spark Configuration section of the SQL session, depending on where the Paimon metadata is stored.
DLF (formerly DLF 2.5)
spark.sql.catalog.<catalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<catalogName>.metastore  rest
spark.sql.catalog.<catalogName>.uri  http://cn-hangzhou-vpc.dlf.aliyuncs.com
spark.sql.catalog.<catalogName>.warehouse  <catalog_name>
spark.sql.catalog.<catalogName>.token.provider  dlf
spark.sql.catalog.<catalogName>.dlf.access-key-id  <access_key_id>
spark.sql.catalog.<catalogName>.dlf.access-key-secret  <access_key_secret>

The following table describes the parameters.

| Parameter | Description | Example |
| --- | --- | --- |
| spark.sql.catalog.<catalogName> | The catalog implementation. | Static field: org.apache.paimon.spark.SparkCatalog |
| spark.sql.catalog.<catalogName>.metastore | Specifies the metastore type. Set the value to rest to use the DLF REST API. | Static field: rest |
| spark.sql.catalog.<catalogName>.uri | Specifies the URI of DLF. The format is http://<endpoint>-vpc.dlf.aliyuncs.com. | http://cn-hangzhou-vpc.dlf.aliyuncs.com |
| spark.sql.catalog.<catalogName>.warehouse | Specifies the data storage path (warehouse path). For DLF, specify the catalog name. | <catalog_name> |
| spark.sql.catalog.<catalogName>.token.provider | Specifies the authentication provider. DLF uses dlf. | Static field: dlf |
| spark.sql.catalog.<catalogName>.dlf.access-key-id | The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user. | <access_key_id> |
| spark.sql.catalog.<catalogName>.dlf.access-key-secret | The AccessKey secret of your Alibaba Cloud account or RAM user. | <access_key_secret> |
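After you create the SQL session with this configuration, you can run a quick check in the SQL editor to confirm that the catalog resolves. The catalog name paimon_dlf below is only a placeholder for the value you used in place of <catalogName>; the same check works for the other custom catalog types described below.

-- Placeholder catalog name; replace paimon_dlf with your configured catalog name.
SHOW NAMESPACES IN paimon_dlf;
-- Optionally create and drop a test database to confirm write access.
CREATE DATABASE IF NOT EXISTS paimon_dlf.smoke_test_db;
DROP DATABASE paimon_dlf.smoke_test_db;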
DLF-Legacy (formerly DLF 1.0)
The metadata is stored in DLF-Legacy (formerly DLF 1.0).
spark.sql.catalog.<catalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<catalogName>.metastore  dlf
spark.sql.catalog.<catalogName>.dlf.catalog.id  <catalog_name>
spark.sql.catalog.<catalogName>.dlf.catalog.endpoint  dlf-vpc.cn-hangzhou.aliyuncs.com

The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| spark.sql.catalog.<catalogName> | The catalog implementation. | Static field: org.apache.paimon.spark.SparkCatalog |
| spark.sql.catalog.<catalogName>.metastore | Specifies the metastore type. Set the value to dlf to use Alibaba Cloud DLF as the metastore. | Static field: dlf |
| spark.sql.catalog.<catalogName>.dlf.catalog.id | Specifies the catalog name in DLF. | <catalog_name> |
| spark.sql.catalog.<catalogName>.dlf.catalog.endpoint | Specifies the endpoint of DLF. Select the correct DLF endpoint for your region. | dlf-vpc.cn-hangzhou.aliyuncs.com |

Hive metastore
The metadata is stored in the specified Hive metastore.
spark.sql.catalog.<catalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<catalogName>.metastore  hive
spark.sql.catalog.<catalogName>.uri  thrift://<yourHMSUri>:<port>

The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| spark.sql.catalog.<catalogName> | The catalog implementation. | Static field: org.apache.paimon.spark.SparkCatalog |
| spark.sql.catalog.<catalogName>.metastore | Specifies the metastore type. Set the value to hive to use a Hive metastore. | Static field: hive |
| spark.sql.catalog.<catalogName>.uri | The URI of the Hive metastore. The format is thrift://<IP address of Hive metastore>:9083, where <IP address of Hive metastore> is the private IP address of the HMS service. To specify an external metastore service, see Connect to an external Hive Metastore service. | thrift://192.168.**.**:9083 |

File system
The metadata is stored in a file system.
spark.sql.catalog.<catalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<catalogName>.metastore  filesystem
spark.sql.catalog.<catalogName>.warehouse  oss://<yourBucketName>/warehouse

The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| spark.sql.catalog.<catalogName> | The catalog implementation. | Static field: org.apache.paimon.spark.SparkCatalog |
| spark.sql.catalog.<catalogName>.metastore | Specifies the metastore type. Set the value to filesystem to use a file system as the metastore. | Static field: filesystem |
| spark.sql.catalog.<catalogName>.warehouse | Specifies the metadata storage path (warehouse path). <yourBucketName> is the name of the bucket in OSS. | oss://my-bucket/warehouse |

You can also configure multiple catalogs, such as DLF, DLF-Legacy, and Hive, at the same time. The following code provides an example.
# Configure a DLF catalog
spark.sql.catalog.<dlfCatalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<dlfCatalogName>.metastore  rest
spark.sql.catalog.<dlfCatalogName>.uri  http://cn-hangzhou-vpc.dlf.aliyuncs.com
spark.sql.catalog.<dlfCatalogName>.warehouse  <catalog_name>
spark.sql.catalog.<dlfCatalogName>.token.provider  dlf
spark.sql.catalog.<dlfCatalogName>.dlf.access-key-id  <access_key_id>
spark.sql.catalog.<dlfCatalogName>.dlf.access-key-secret  <access_key_secret>

# Configure a DLF-Legacy catalog
spark.sql.catalog.<dlfLegacyCatalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<dlfLegacyCatalogName>.metastore  dlf
spark.sql.catalog.<dlfLegacyCatalogName>.dlf.catalog.id  <catalog_name>
spark.sql.catalog.<dlfLegacyCatalogName>.dlf.catalog.endpoint  dlf-vpc.cn-hangzhou.aliyuncs.com

# Configure a hive1 catalog
spark.sql.catalog.<hive1CatalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<hive1CatalogName>.metastore  hive
spark.sql.catalog.<hive1CatalogName>.uri  thrift://<yourHMSUri-1>:<port>

# Configure a hive2 catalog
spark.sql.catalog.<hive2CatalogName>  org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.<hive2CatalogName>.metastore  hive
spark.sql.catalog.<hive2CatalogName>.uri  thrift://<yourHMSUri-2>:<port>

Replace <dlfCatalogName>, <dlfLegacyCatalogName>, <hive1CatalogName>, and <hive2CatalogName> with distinct catalog names.
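After multiple catalogs are registered in one session, you can reference them by name in a single SQL statement. The following sketch assumes two of the catalogs above were named paimon_dlf and paimon_hive1 and that an existing Paimon table paimon_dlf.ods.orders is available; all of these names are placeholders, so adjust them to your environment.

-- Hypothetical names: paimon_dlf and paimon_hive1 are catalogs configured above,
-- and paimon_dlf.ods.orders is an existing Paimon table.
CREATE DATABASE IF NOT EXISTS paimon_hive1.ods;
CREATE TABLE paimon_hive1.ods.orders_copy USING paimon AS
SELECT * FROM paimon_dlf.ods.orders;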
Use the built-in catalog
spark.sql.extensions  org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
spark.sql.catalog.spark_catalog  org.apache.paimon.spark.SparkGenericCatalog

With this configuration, the default spark_catalog handles Paimon tables and falls back to Spark's built-in catalog for other table formats, as shown in Step 2.
Step 2: Read from and write to tables based on a Paimon catalog and spark_catalog
Go to the SQL development page.
On the EMR Serverless Spark page, click Data Development in the navigation pane on the left.
On the Development tab, click the icon to create a new task.
In the New dialog box, enter a name, such as users_task, leave the type as the default SparkSQL, and click OK.
Copy the following code to the new Spark SQL tab (users_task).
Use a Paimon catalog
In this example, the catalog configured for the session is named paimon. Replace it with your own catalog name.
-- Create a database.
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;

-- Create a Paimon table.
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;

-- Write data to the Paimon table.
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"), (3, "c");

-- Query the write results from the Paimon table.
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;

-- Drop the database.
DROP DATABASE paimon.ss_paimon_db CASCADE;

Use spark_catalog
-- Create databases.
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;

-- Create a Paimon table and a Parquet table.
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";

-- Write data.
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;

-- Query the write results.
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
SELECT * FROM ss_parquet_db.parquet_tbl;

-- Drop the databases.
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;

From the drop-down lists, select a database and the SQL session that you created.
Click Run to execute the task and check the returned results.
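The tables in this example are append-only. Paimon also supports primary-key tables, which keep only the latest row for each primary key when new data is written. The following optional sketch can be run in the same SQL session; the catalog name paimon, the database and table names, and the property values are placeholders, and the full list of table properties is described in the official Paimon documentation.

-- Optional: a Paimon primary-key table (placeholder names; the catalog is assumed to be named paimon).
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;
CREATE TABLE paimon.ss_paimon_db.users_pk (id INT, name STRING) USING paimon
TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2');

-- Writing the same key twice keeps only the latest row.
INSERT INTO paimon.ss_paimon_db.users_pk VALUES (1, "a"), (2, "b");
INSERT INTO paimon.ss_paimon_db.users_pk VALUES (1, "a_new");
SELECT * FROM paimon.ss_paimon_db.users_pk ORDER BY id;

-- Clean up.
DROP DATABASE paimon.ss_paimon_db CASCADE;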
References
For a complete example of the SQL task development and orchestration process, see Quick Start for SparkSQL development.
For more information about Paimon usage and configurations, see the official Paimon documentation.
To specify an external metastore service, see Connect to an external Hive Metastore service.