全部產品
Search
文件中心

E-MapReduce:Paimon與Spark整合

更新時間:Jan 23, 2025

您可以藉助Paimon快速地在HDFS或者OSS上構建自己的資料湖儲存服務,然後通過Spark計算引擎實現資料湖的分析。本文為您介紹在EMR中如何通過Spark SQL讀取和寫入Paimon中的資料。

前提條件

已建立選擇了Spark和Paimon的DataLake或Custom類型的叢集,建立叢集詳情請參見建立叢集

使用限制

  • EMR-3.46.0及後續版本、EMR-5.12.0及後續版本的叢集,支援Spark SQL對Paimon進行讀寫操作。

  • 僅Spark3的Spark SQL可以通過Catalog讀寫Paimon中的資料。

操作步驟

步驟一:配置Catalog

Spark可以通過Catalog讀寫Paimon表,其中Catalog包括Paimon Catalog和spark_catalog兩種類型。您可以根據具體情境進行選擇。

  • Paimon Catalog:用於管理Paimon格式的中繼資料,只能用於查詢和寫入Paimon表。

  • spark_catalog:Spark預設內建Catalog,通常用於管理Spark SQL內部表的中繼資料,可以用於查詢和寫入Paimon表或者非Paimon表。

使用Paimon Catalog

您可以將中繼資料儲存在檔案系統(如HDFS)或Object Storage Service(如OSS)中,也可以將中繼資料同步到DLF和Hive中,方便其他服務訪問Paimon。

儲存的根路徑由spark.sql.catalog.paimon.warehouse參數指定。如果根路徑不存在,將會自動建立該路徑;如果根路徑存在,您可以通過該Catalog訪問路徑中已有的表。

  1. 通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點

  2. 根據中繼資料類型選擇要配置的Catalog。執行對應的命令,啟動Spark SQL。

    配置Filesystem Catalog

    Filesystem Catalog會將中繼資料儲存在檔案系統或Object Storage Service中。

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=filesystem \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    說明
    • spark.sql.catalog.paimon:定義了名為paimon的Catalog。

    • spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為filesystem意味著中繼資料存放區在本地檔案系統。

    • spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間

    配置DLF Catalog

    DLF Catalog會將中繼資料同步到DLF中。

    重要

    建立叢集時,中繼資料必須是DLF統一中繼資料

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=dlf \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    說明
    • spark.sql.catalog.paimon:定義了名為paimon的Catalog。

    • spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為dlf意味著將資料同步到DLF(Data Lake Formation)中。

    • spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間

    配置Hive Catalog

    Hive Catalog會同步中繼資料到Hive MetaStore中。在Hive Catalog中建立的表可以直接在Hive中查詢。Hive查詢Paimon,詳情請參見Paimon與Hive整合

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    說明
    • spark.sql.catalog.paimon:定義了名為paimon的Catalog。

    • spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為hive意味著將中繼資料同步到Hive Metastore中。

    • spark.sql.catalog.paimon.uri:為Hive MetaStore Service的地址和連接埠。參數值為thrift://master-1-1:9083,這意味著Spark SQL將串連到這個運行在master-1-1主機上、監聽9083連接埠的Hive Metastore服務以擷取中繼資料資訊。

    • spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間

使用spark_catalog

  1. 通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點

  2. 執行下面的命令配置Catalog,啟動Spark SQL。

    spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    說明
    • spark.sql.catalog.spark_catalog:定義了名為spark_catalog的Catalog。

    • spark_catalog中用來儲存的根路徑由spark.sql.warehouse.dir參數指定,一般不需要修改。

步驟二:讀寫Paimon表

執行以下Spark SQL語句,在Catalog中建立一張表,並讀寫表中的資料。

使用Paimon Catalog

訪問Paimon表需要使用paimon.<db_name>.<tbl_name>,其中<db_name>為資料庫的名字,<tbl_name>為表的名字。

-- 建立資料庫
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;

-- 建立Paimon表
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;

-- 寫入Paimon表
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");

-- 查詢寫入結果
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;

-- 刪除資料庫
DROP DATABASE paimon.ss_paimon_db CASCADE;
說明

配置Hive Catalog後,建立資料庫時報metastore: Failed to connect to the MetaStore Server錯誤,說明Hive MetaStore服務沒有啟動,需要執行以下命令啟動。啟動成功後,再執行配置Hive Catalog的命令。

hive --service metastore &

如果您建立叢集時中繼資料選擇DLF統一中繼資料,建議將中繼資料同步到DLF,配置DLF Catalog。

使用spark_catalog

無論是訪問Paimon表還是非Paimon表,都可以使用spark_catalog.<db_name>.<tbl_name>。由於spark_catalog是Spark預設內建Catalog,因此可以忽略不寫,直接通過<db_name>.<tbl_name>訪問表。其中<db_name>為資料庫的名字,<tbl_name>為表的名字。

-- 建立資料庫
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;

-- 建立Paimon表和Parquet表
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "cherry";

-- 寫入Paimon表
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;

-- 查詢寫入結果
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;

-- 刪除資料庫
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;

查詢結果如下。

1       apple   
2       banana
3       cherry 

常見問題

叢集添加Paimon組件後,會自動增加參數配置spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions嗎?

會自動增加。叢集添加Paimon組件後,您可以通過以下操作查看配置資訊。

  1. 進入目的地組群的叢集服務頁簽。

  2. 查看Spark服務的配置資訊。

    1. 單擊Spark服務右側的配置

    2. 配置項名稱輸入框搜尋spark.sql.extensions,可以查看到配置資訊。

      image

通過Spark Shell可以讀寫Paimon中的資料嗎?

可以的。通過Spark Shell讀寫Paimon中的資料可參考下面的步驟。

  1. 執行以下命令,啟動Spark Shell。

    spark-shell
  2. 在Spark Shell中運行以下Scala代碼,寫入並查詢指定目錄下儲存的Paimon表。

    val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("INSERT INTO test_tbl VALUES (4, 'apple1', 3.5), (5, 'banana1', 4.0), (6, 'cherry1', 20.5)")
    spark.sql("SELECT * FROM test_tbl").show()
    說明
    • paimon:固定值。表明您正在使用Paimon作為資料存放區格式來讀取或寫入資料。

    • oss://<yourBucketName>/warehouse/test_db.db/test_tbl:Paimon表所在路徑,請根據實際情況進行替換。其中<yourBucketName>為OSS的儲存空間。

相關文檔

更多Paimon相關用法和配置,請參見Paimon官方文檔