Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文为您介绍如何在EMR Serverless Spark中实现Paimon表的读取与写入操作。
前提条件
已创建工作空间,详情请参见创建工作空间。
操作流程
步骤一:创建SQL会话
进入会话管理页面。
在左侧导航栏,选择。
在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建 SQL 会话页面的Spark 配置区域,配置以下信息,单击创建。详情请参见管理SQL会话。
Spark对Paimon的读写基于Catalog,您可以根据具体场景进行选择。更多Catalog信息,请参见管理数据目录。
使用数据目录Catalog
若采用数据目录Catalog方式,则无需在会话中配置参数,只需在数据目录页面单击添加数据目录,然后在SparkSQL开发中直接选择数据目录即可。
说明推荐使用esr-4.3.0及以上版本、esr-3.3.0及以上版本、esr-2.7.0及以上版本的引擎。
使用自定义Catalog
DLF(原DLF 2.5)
spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore rest spark.sql.catalog.<catalogName>.uri http://cn-hangzhou-vpc.dlf.aliyuncs.com spark.sql.catalog.<catalogName>.warehouse <catalog_name> spark.sql.catalog.<catalogName>.token.provider dlf spark.sql.catalog.<catalogName>.dlf.access-key-id <access_key_id> spark.sql.catalog.<catalogName>.dlf.access-key-secret <access_key_secret>涉及参数如下所示。
参数
说明
示例值
spark.sql.catalog.<catalogName>catalog实现。
固定值:
org.apache.paimon.spark.SparkCatalogspark.sql.catalog.<catalogName>.metastore指定元数据存储方式,设置为
rest表示使用DLF REST API。固定值:
restspark.sql.catalog.<catalogName>.uri指定DLF的URI,格式为
http://<endpoint>-vpc.dlf.aliyuncs.com。http://cn-hangzhou-vpc.dlf.aliyuncs.comspark.sql.catalog.<catalogName>.warehouse指定数据存储路径(Warehouse路径)。对于 DLF,需指定为Catalog名称。
<catalog_name>spark.sql.catalog.<catalogName>.token.provider指定认证提供方,DLF 使用
dlf。固定值:
dlfspark.sql.catalog.<catalogName>.dlf.access-key-id阿里云账号或者RAM用户的AccessKey ID。
<access_key_id>spark.sql.catalog.<catalogName>.dlf.access-key-secret阿里云账号或者RAM用户的AccessKey Secret。
<access_key_secret>DLF-Legacy(原DLF1.0)
元数据保存在DLF-Legacy(原DLF1.0)中。
spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore dlf spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name> spark.sql.catalog.<catalogName>.dlf.catalog.endpoint dlf-vpc.cn-hangzhou.aliyuncs.com涉及参数说明如下所示。
参数
说明
示例值
spark.sql.catalog.<catalogname>catalog实现。
固定值:
org.apache.paimon.spark.SparkCatalogspark.sql.catalog.<catalogname>.metastore指定元数据存储方式,设置为
dlf表示使用阿里云DLF作为元数据存储。固定值:
dlfspark.sql.catalog.<catalogName>.dlf.catalog.id指定DLF中的Catalog名称。
<catalog_name>spark.sql.catalog.<catalogName>.dlf.catalog.endpoint指定DLF的访问端点地址,需根据您所在的区域选择正确的 DLF 端点。
dlf-vpc.cn-hangzhou.aliyuncs.comHive MetaStore
元数据保存在指定的Hive MetaStore中。
spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore hive spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>涉及参数如下表所示。
参数
说明
示例值
spark.sql.catalog.<catalogName>catalog实现。
固定值:
org.apache.paimon.spark.SparkCatalogspark.sql.catalog.<catalogName>.metastore指定元数据存储方式,设置为
hive表示使用 Hive MetaStore 作为元数据存储。固定值:
hivespark.sql.catalog.<catalogName>.uriHive MetaStore的URI。格式为
thrift://<Hive metastore的IP地址>:9083。<Hive metastore的IP地址>为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见连接外部Hive Metastore Service。thrift://192.168.**.**:9083FileSystem
元数据保存在文件系统中。
spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore filesystem spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouse涉及参数如下表所示。
参数
说明
示例值
spark.sql.catalog.<catalogName>catalog实现。
固定值:
org.apache.paimon.spark.SparkCatalogspark.sql.catalog.<catalogName>.metastore指定元数据存储方式,设置为
filesystem表示使用文件系统作为元数据存储。固定值:
filesystemspark.sql.catalog.<catalogName>.warehouse指定元数据存储路径(Warehouse 路径)。代码中的
<yourBucketName>表示OSS上的Bucket名称。oss://my-bucket/warehouse您还可以同时配置多个Catalog,例如DLF、DLF-Legacy和Hive,具体信息请参见以下示例。
# 配置dlf Catalog spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore rest spark.sql.catalog.<catalogName>.uri http://cn-hangzhou-vpc.dlf.aliyuncs.com spark.sql.catalog.<catalogName>.warehouse <catalog_name> spark.sql.catalog.<catalogName>.token.provider dlf spark.sql.catalog.<catalogName>.dlf.access-key-id <access_key_id> spark.sql.catalog.<catalogName>.dlf.access-key-secret <access_key_secret> # 配置dlf-legacy Catalog spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore dlf spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name> spark.sql.catalog.<catalogName>.dlf.catalog.endpoint dlf-vpc.cn-hangzhou.aliyuncs.com # 配置hive1 Catalog spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore hive spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri-1>:<port> # 配置hive2 Catalog spark.sql.catalog.<catalogName> org.apache.paimon.spark.SparkCatalog spark.sql.catalog.<catalogName>.metastore hive spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri-2>:<port>使用内置Catalog
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.spark_catalog org.apache.paimon.spark.SparkGenericCatalog
步骤二:基于Paimon Catalog和spark_catalog的表读写操作
进入SQL开发页面。
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击
图标。在新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定。
复制如下代码到新增的Spark SQL页签(users_task)中。
使用Paimon Catalog
-- 创建数据库 CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db; -- 创建Paimon表 CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; -- 写入Paimon表 INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"), (3, "c"); -- 查询 Paimon 表的写入结果 SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id; -- 删除数据库 DROP DATABASE paimon.ss_paimon_db CASCADE;使用spark_catalog
-- 创建数据库 CREATE DATABASE IF NOT EXISTS ss_paimon_db; CREATE DATABASE IF NOT EXISTS ss_parquet_db; -- 创建Paimon表和Parquet表 CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c"; -- 写入数据 INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"); INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl; -- 查询写入结果 SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id; SELECT * FROM ss_parquet_db.parquet_tbl; -- 删除数据库 DROP DATABASE ss_paimon_db CASCADE; DROP DATABASE ss_parquet_db CASCADE;在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。
单击运行,执行任务。返回信息如下所示。

常见问题
相关文档
SQL任务和任务编排完整的开发流程示例,请参见SparkSQL开发快速入门。
更多Paimon相关用法和配置,请参见Paimon官方文档。
如果需要指定外部Metastore服务,请参见连接外部Hive Metastore Service。