Use Spark SQL to read from and write to Apache Paimon tables stored on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS) in an EMR cluster.
Prerequisites
Before you begin, make sure that you have:
- A DataLake or custom cluster with both Spark and Paimon services enabled. For details, see Create a cluster.
Limitations
| Constraint | Detail |
|---|---|
| Supported EMR versions | EMR V3.46.0, EMR V5.12.0, and later minor versions |
| Spark version requirement | Catalog-based access requires Spark SQL in Spark 3 |
Choose a catalog type
Spark accesses Paimon data through a catalog. Two catalog types are supported:
| Catalog type | Class | Supports non-Paimon tables | When to use |
|---|---|---|---|
| Paimon catalog | org.apache.paimon.spark.SparkCatalog | No | Paimon-only workloads |
| spark_catalog | org.apache.paimon.spark.SparkGenericCatalog | Yes | Mixed Paimon and non-Paimon tables (such as Parquet) from the same catalog |
After you add Paimon to a cluster, spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions is automatically added to the Spark service configuration. To verify, go to the Services tab of your cluster, click Configure for Spark, and search by configuration item name for spark.sql.extensions.
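You can also print the current value from inside a running spark-sql session; a minimal sketch:
-- Returns the configuration item and its current value.
SET spark.sql.extensions;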
Step 1: Configure a catalog
Log on to the master node of your cluster over SSH. For details, see Log on to a cluster.
Use a Paimon catalog
A Paimon catalog manages Paimon metadata exclusively. The warehouse root path is set by spark.sql.catalog.paimon.warehouse. If the path does not exist, it is created automatically. If it already exists, the catalog provides access to any tables in that path.
Choose a metadata storage type based on how you want to manage metadata.
File system catalog
A file system catalog stores metadata directly in a file system or object storage.
spark-sql \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=filesystem \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Replace <yourBucketName> with your OSS bucket name. To create a bucket, see Create a bucket.
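The warehouse can also reside on HDFS instead of OSS. The following sketch assumes the NameNode runs on the master-1-1 node at port 9000; adjust the host, port, and path to match your cluster:
spark-sql \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=filesystem \
--conf spark.sql.catalog.paimon.warehouse=hdfs://master-1-1:9000/paimon/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions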
DLF catalog
A Data Lake Formation (DLF) catalog synchronizes Paimon metadata to DLF, making tables accessible from other DLF-integrated services.
To use a DLF catalog, select DLF Unified Metadata for Metadata when you create the EMR cluster.
spark-sql \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=dlf \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Replace <yourBucketName> with your OSS bucket name.
Hive catalog
A Hive catalog synchronizes Paimon metadata to Hive Metastore. This lets Hive query the same Paimon tables. For details, see Integrate Paimon with Hive.
spark-sql \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Key parameters:
| Parameter | Description |
|---|---|
| spark.sql.catalog.paimon.uri | Hive Metastore address. thrift://master-1-1:9083 connects to Hive Metastore on the master-1-1 node at port 9083. |
| spark.sql.catalog.paimon.warehouse | OSS path for the data warehouse. Replace <yourBucketName> with your bucket name. |
Use spark_catalog
spark_catalog is Spark's default built-in catalog. It can manage both Paimon and non-Paimon tables. The warehouse root path is set by spark.sql.warehouse.dir and does not need to change in most cases.
spark-sql \
--conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
Step 2: Read and write Paimon tables
With a Paimon catalog
Reference tables using the paimon.<db_name>.<tbl_name> format.
-- Create a database.
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;
-- Create a Paimon table.
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
-- Write data.
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");
-- Query data.
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
-- Clean up.
DROP DATABASE paimon.ss_paimon_db CASCADE;
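Before the cleanup step, you can optionally inspect the table's commit history through Paimon's snapshots system table. This uses standard Paimon syntax; the backquotes are required because the table name contains a $ character:
-- Each INSERT above produces one snapshot.
SELECT snapshot_id, commit_kind, commit_time FROM paimon.ss_paimon_db.`paimon_tbl$snapshots`;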
If the error metastore: Failed to connect to the MetaStore Server appears after you configure a Hive catalog, Hive Metastore is not running. Start it with the following command, then configure the Hive catalog again.
hive --service metastore &
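To confirm that the service is accepting connections, you can check whether port 9083 is listening (an optional check, not part of the original steps):
netstat -tln | grep 9083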
If you selected DLF Unified Metadata when creating your cluster, use a DLF catalog to synchronize Paimon metadata to DLF.
With spark_catalog
Reference tables using spark_catalog.<db_name>.<tbl_name>, or omit the catalog name since spark_catalog is the default. Both Paimon and non-Paimon tables are accessible from the same catalog.
-- Create databases.
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;
-- Create a Paimon table and a Parquet table.
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "cherry";
-- Write to the Paimon table from values and from another table.
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
-- Query data.
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
-- Clean up.
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;
Expected output:
1 apple
2 banana
3 cherry
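If you run it before the cleanup step, DESCRIBE TABLE EXTENDED confirms which provider backs each table; a minimal sketch:
-- The Provider row in the output shows the table format (paimon or parquet).
DESCRIBE TABLE EXTENDED ss_paimon_db.paimon_tbl;
DESCRIBE TABLE EXTENDED ss_parquet_db.parquet_tbl;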
FAQ
Can I use Spark Shell instead of Spark SQL?
Yes. Start Spark Shell with spark-shell, then use the paimon format to load a table directly by its OSS path:
// Load a Paimon table directly from its OSS path.
val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
// Register the DataFrame as a temporary view so it can be used in SQL statements.
dataset.createOrReplaceTempView("test_tbl")
// Write to and read from the table through the temporary view.
spark.sql("INSERT INTO test_tbl VALUES (4, 'apple1', 3.5), (5, 'banana1', 4.0), (6, 'cherry1', 20.5)")
spark.sql("SELECT * FROM test_tbl").show()
Replace <yourBucketName> with your bucket name and update the path to match your table location.
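Writes also work through the DataFrame API; a minimal sketch, assuming the table has the same three-column layout as the INSERT above (the column names id, name, and price are placeholders for your actual schema):
// spark-shell imports spark.implicits._ automatically, which enables toDF.
val rows = Seq((7, "plum1", 2.5)).toDF("id", "name", "price")
// Append the rows to the Paimon table at the given path.
rows.write.format("paimon").mode("append").save("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")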
What's next
- For Paimon concepts and advanced SQL DDL, see Apache Paimon documentation.
- To access Paimon tables from Hive, see Integrate Paimon with Hive.