全部产品
Search
文档中心

开源大数据平台E-MapReduce:使用Paimon

更新时间:Dec 22, 2025

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文为您介绍如何在EMR Serverless Spark中实现Paimon表的读取与写入操作。

前提条件

已创建工作空间,详情请参见创建工作空间

操作流程

步骤一:创建SQL会话

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 创建 SQL 会话页面的Spark 配置区域,配置以下信息,单击创建。详情请参见管理SQL会话

    Spark对Paimon的读写基于Catalog,您可以根据具体场景进行选择。更多Catalog信息,请参见管理数据目录

    使用数据目录Catalog

    若采用数据目录Catalog方式,则无需在会话中配置参数,只需在数据目录页面单击添加数据目录,然后在SparkSQL开发中直接选择数据目录即可。

    说明

    推荐使用esr-4.3.0及以上版本、esr-3.3.0及以上版本、esr-2.7.0及以上版本的引擎。

    使用自定义Catalog

    DLF(原DLF 2.5)

    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       rest
    spark.sql.catalog.<catalogName>.uri                             http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<catalogName>.warehouse                       <catalog_name>
    spark.sql.catalog.<catalogName>.token.provider                  dlf
    spark.sql.catalog.<catalogName>.dlf.access-key-id               <access_key_id>
    spark.sql.catalog.<catalogName>.dlf.access-key-secret           <access_key_secret>

    涉及参数如下所示。

    参数

    说明

    示例值

    spark.sql.catalog.<catalogName>

    catalog实现。

    固定值:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    指定元数据存储方式,设置为rest表示使用DLF REST API。

    固定值:rest

    spark.sql.catalog.<catalogName>.uri

    指定DLF的URI,格式为http://<endpoint>-vpc.dlf.aliyuncs.com

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    spark.sql.catalog.<catalogName>.warehouse

    指定数据存储路径(Warehouse路径)。对于 DLF,需指定为Catalog名称。

    <catalog_name>

    spark.sql.catalog.<catalogName>.token.provider

    指定认证提供方,DLF 使用 dlf

    固定值:dlf

    spark.sql.catalog.<catalogName>.dlf.access-key-id

    阿里云账号或者RAM用户的AccessKey ID。

    <access_key_id>

    spark.sql.catalog.<catalogName>.dlf.access-key-secret

    阿里云账号或者RAM用户的AccessKey Secret。

    <access_key_secret>

    DLF-Legacy(原DLF1.0)

    元数据保存在DLF-Legacy(原DLF1.0)中。

    spark.sql.catalog.<catalogName>                          org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                dlf
    spark.sql.catalog.<catalogName>.dlf.catalog.id           <catalog_name>
    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint     dlf-vpc.cn-hangzhou.aliyuncs.com

    涉及参数说明如下所示。

    参数

    说明

    示例值

    spark.sql.catalog.<catalogname>

    catalog实现。

    固定值:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogname>.metastore

    指定元数据存储方式,设置为dlf表示使用阿里云DLF作为元数据存储。

    固定值:dlf

    spark.sql.catalog.<catalogName>.dlf.catalog.id

    指定DLF中的Catalog名称。

    <catalog_name>

    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint

    指定DLF的访问端点地址,需根据您所在的区域选择正确的 DLF 端点。

    dlf-vpc.cn-hangzhou.aliyuncs.com

    Hive MetaStore

    元数据保存在指定的Hive MetaStore中。

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       hive
    spark.sql.catalog.<catalogName>.uri             thrift://<yourHMSUri>:<port>

    涉及参数如下表所示。

    参数

    说明

    示例值

    spark.sql.catalog.<catalogName>

    catalog实现。

    固定值:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    指定元数据存储方式,设置为 hive 表示使用 Hive MetaStore 作为元数据存储。

    固定值:hive

    spark.sql.catalog.<catalogName>.uri

    Hive MetaStore的URI。格式为thrift://<Hive metastore的IP地址>:9083

    <Hive metastore的IP地址>为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见连接外部Hive Metastore Service

    thrift://192.168.**.**:9083

    FileSystem

    元数据保存在文件系统中。

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       filesystem
    spark.sql.catalog.<catalogName>.warehouse       oss://<yourBucketName>/warehouse

    涉及参数如下表所示。

    参数

    说明

    示例值

    spark.sql.catalog.<catalogName>

    catalog实现。

    固定值:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    指定元数据存储方式,设置为filesystem表示使用文件系统作为元数据存储。

    固定值:filesystem

    spark.sql.catalog.<catalogName>.warehouse

    指定元数据存储路径(Warehouse 路径)。代码中的<yourBucketName>表示OSS上的Bucket名称。

    oss://my-bucket/warehouse

    您还可以同时配置多个Catalog,例如DLF、DLF-Legacy和Hive,具体信息请参见以下示例。

    # 配置dlf Catalog
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       rest
    spark.sql.catalog.<catalogName>.uri                             http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<catalogName>.warehouse                       <catalog_name>
    spark.sql.catalog.<catalogName>.token.provider                  dlf
    spark.sql.catalog.<catalogName>.dlf.access-key-id               <access_key_id>
    spark.sql.catalog.<catalogName>.dlf.access-key-secret           <access_key_secret>
    
    # 配置dlf-legacy Catalog
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       dlf
    spark.sql.catalog.<catalogName>.dlf.catalog.id                  <catalog_name>
    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint            dlf-vpc.cn-hangzhou.aliyuncs.com
    
    
    # 配置hive1 Catalog
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       hive
    spark.sql.catalog.<catalogName>.uri                             thrift://<yourHMSUri-1>:<port>
    
    # 配置hive2 Catalog
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       hive
    spark.sql.catalog.<catalogName>.uri                             thrift://<yourHMSUri-2>:<port>

    使用内置Catalog

    spark.sql.extensions               org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    spark.sql.catalog.spark_catalog    org.apache.paimon.spark.SparkGenericCatalog

步骤二:基于Paimon Catalog和spark_catalog的表读写操作

  1. 进入SQL开发页面。

    EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定

  4. 复制如下代码到新增的Spark SQL页签(users_task)中。

    使用Paimon Catalog

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;             
    
    -- 创建Paimon表
    CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    
    -- 写入Paimon表
    INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"), (3, "c");
    
    -- 查询 Paimon 表的写入结果
    SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- 删除数据库
    DROP DATABASE paimon.ss_paimon_db CASCADE;

    使用spark_catalog

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS ss_paimon_db; 
    CREATE DATABASE IF NOT EXISTS ss_parquet_db;
    
    -- 创建Paimon表和Parquet表
    CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";
    
    -- 写入数据
    INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- 查询写入结果
    SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
    SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- 删除数据库
    DROP DATABASE ss_paimon_db CASCADE;
    DROP DATABASE ss_parquet_db CASCADE;
  5. 在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。

  6. 单击运行,执行任务。返回信息如下所示。

    image

常见问题

对表执行DELETEUPDATEMERGE等操作时报错,应该如何处理?

  • 问题现象:执行 DELETEUPDATEMERGE 操作时,出现以下类似错误信息。

    Caused by: org.apache.spark.sql.AnalysisException: Table does not support deletes/updates/merge: <tableName>.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportError(QueryCompilationErrors.scala:1391)
  • 问题原因:该表的存储格式不支持行级别更新操作,或缺少必要的Spark配置。

  • 解决方法:

    1. 检查表类型。

      执行以下命令,确认该表是否为Paimon表。

      SHOW CREATE TABLE <tableName>;

      如果输出结果中包含 USING PAIMON,则表为 Paimon 表。如果输出中显示其他存储格式(如 USING hive),则需要确认该格式是否支持行级别更新操作。

    2. 确认Spark配置。

      如果所涉及的表为Paimon表,则应检查Spark 配置信息,以确保已添加以下配置,从而启用对Paimon的支持。

      spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

      如果没有添加此配置,请在Spark 配置中添加。

相关文档