E-MapReduce:Integrate Paimon with Spark

Last Updated: Mar 13, 2024

E-MapReduce (EMR) allows you to use Spark to read data from and write data to Paimon. This topic describes how to use the Spark SQL client to read data from and write data to Paimon and how to use the Spark CLI to read data from Paimon.

Prerequisites

A DataLake or custom cluster is created, and the Spark and Paimon services are selected when you create the cluster. For more information, see Create a cluster.

Limits

  • Only clusters of EMR V3.46.0 or a later minor version, or EMR V5.12.0 or a later minor version, allow you to use the Spark SQL client to read data from and write data to Paimon and use the Spark CLI to read data from Paimon.

  • Only Spark SQL of Spark 3 allows you to use catalogs to read data from and write data to Paimon.

  • The Spark CLI allows you to read data from Paimon only by using the path of a file system or an object storage system.

Procedure

Step 1: Create a catalog

Paimon stores data and metadata in a file system such as Hadoop Distributed File System (HDFS) or an object storage system such as Object Storage Service (OSS). The root path for storage is specified by the warehouse parameter. If the specified root path does not exist, a root path is automatically created. If the specified root path exists, you can use the created catalog to access existing tables in the path.

You can synchronize metadata to Hive or Data Lake Formation (DLF). This way, other services can access data of Paimon by using Hive or DLF.

Create a file system catalog

A file system catalog stores metadata in a file system or an object storage system.

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=filesystem --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
Note
  • spark.sql.catalog.paimon: defines a catalog named paimon.

  • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to filesystem, metadata is stored together with the table data in the file system or object storage system specified by the warehouse parameter.

  • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.
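
After the spark-sql session starts with these settings, you can run a quick check to confirm that the paimon catalog is registered. The following statement is a minimal sketch that assumes the session launched by the preceding command:

-- List the databases in the paimon catalog. If the warehouse path already contains databases, they are returned here.
SHOW NAMESPACES IN paimon;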

Create a DLF catalog

A DLF catalog can synchronize metadata to DLF.

Important

When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=dlf --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
Note
  • spark.sql.catalog.paimon: defines a catalog named paimon.

  • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to dlf, metadata is synchronized to DLF.

  • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.

Create a Hive catalog

A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.

For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=hive --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
Note
  • spark.sql.catalog.paimon: defines a catalog named paimon.

  • spark.sql.catalog.paimon.metastore: specifies the metadata storage type used by the catalog. If you set this parameter to hive, metadata is synchronized to Hive Metastore.

  • spark.sql.catalog.paimon.uri: specifies the address and port number of Hive Metastore. If you set this parameter to thrift://master-1-1:9083, the Spark SQL client connects to the Hive Metastore that runs on the master-1-1 node and listens on port 9083 to obtain metadata.

  • spark.sql.catalog.paimon.warehouse: specifies the actual location of the data warehouse. Configure this parameter based on your business requirements.
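
If you do not want to pass these --conf options every time you start a session, you can also add the same catalog properties to the Spark configuration (spark-defaults.conf) so that they apply to every Spark SQL session. The following snippet is a sketch based on the preceding Hive catalog example; adjust the metastore type, uri, and warehouse path to match your environment:

spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
spark.sql.catalog.paimon.metastore  hive
spark.sql.catalog.paimon.uri        thrift://master-1-1:9083
spark.sql.catalog.paimon.warehouse  oss://<yourBucketName>/warehouse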

Step 2: Use the Spark SQL client to read data from and write data to Paimon

Execute the following Spark SQL statements to create a Paimon table in the created catalog and read data from and write data to the table:

-- Create a test database in the created paimon catalog. 
CREATE DATABASE paimon.test_db;
USE paimon.test_db;

-- Create a Paimon table. 
CREATE TABLE test_tbl (
    uuid int,
    name string,
    price double
) TBLPROPERTIES (
    'primary-key' = 'uuid'
);

-- Write data to the Paimon table. 
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);

-- Read data from the Paimon table. 
SELECT * FROM test_tbl;

The following query result is returned:

1       apple   3.5
2       banana  4.0
3       cherry  20.5
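
Because the table defines 'primary-key' = 'uuid', Paimon merges rows that have the same primary key. With the default merge engine (deduplicate), the most recently written row is kept, so an INSERT with an existing key works as an update. The following statements are a small sketch that continues the preceding example:

-- Write a row whose primary key already exists to update it.
INSERT INTO test_tbl VALUES (1, 'apple', 4.5);

-- The row with uuid 1 now shows the updated price.
SELECT * FROM test_tbl WHERE uuid = 1;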

Step 3: Use the Spark CLI to read data from Paimon

  1. Run the following command to start the Spark CLI:

    spark-shell
  2. Run the following Scala code on the Spark CLI to query data from the Paimon table that is stored in the specified directory:

    val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("SELECT * FROM test_tbl").show()
    Note
    • paimon: a fixed value. It specifies that Paimon is used as the data source format when you read or write data.

    • oss://<yourBucketName>/warehouse/test_db.db/test_tbl: the path in which the Paimon table is stored. Replace the path based on your business requirements.

    The following output is returned:

    +----+------+-----+                                                             
    |uuid|  name|price|
    +----+------+-----+
    |   1| apple|  3.5|
    |   2|banana|  4.0|
    |   3|cherry| 20.5|
    +----+------+-----+