您可以藉助Paimon快速地在HDFS或者OSS上構建自己的資料湖儲存服務,然後通過Spark計算引擎實現資料湖的分析。本文為您介紹在EMR中如何通過Spark SQL讀取和寫入Paimon中的資料。
前提條件
已建立選擇了Spark和Paimon的DataLake或Custom類型的叢集,建立叢集詳情請參見建立叢集。
使用限制
EMR-3.46.0及後續版本、EMR-5.12.0及後續版本的叢集,支援Spark SQL對Paimon進行讀寫操作。
僅Spark3的Spark SQL可以通過Catalog讀寫Paimon中的資料。
操作步驟
步驟一:配置Catalog
Spark可以通過Catalog讀寫Paimon表,其中Catalog包括Paimon Catalog和spark_catalog兩種類型。您可以根據具體情境進行選擇。
Paimon Catalog:用於管理Paimon格式的中繼資料,只能用於查詢和寫入Paimon表。
spark_catalog:Spark預設內建Catalog,通常用於管理Spark SQL內部表的中繼資料,可以用於查詢和寫入Paimon表或者非Paimon表。
使用Paimon Catalog
您可以將中繼資料儲存在檔案系統(如HDFS)或Object Storage Service(如OSS)中,也可以將中繼資料同步到DLF和Hive中,方便其他服務訪問Paimon。
儲存的根路徑由spark.sql.catalog.paimon.warehouse參數指定。如果根路徑不存在,將會自動建立該路徑;如果根路徑存在,您可以通過該Catalog訪問路徑中已有的表。
通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點。
根據中繼資料類型選擇要配置的Catalog。執行對應的命令,啟動Spark SQL。
配置Filesystem Catalog
Filesystem Catalog會將中繼資料儲存在檔案系統或Object Storage Service中。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=filesystem \ --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions說明spark.sql.catalog.paimon:定義了名為paimon的Catalog。spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為filesystem意味著中繼資料存放區在本地檔案系統。spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間。
配置DLF Catalog
DLF Catalog會將中繼資料同步到DLF中。
重要建立叢集時,中繼資料必須是DLF統一中繼資料。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=dlf \ --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions說明spark.sql.catalog.paimon:定義了名為paimon的Catalog。spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為dlf意味著將資料同步到DLF(Data Lake Formation)中。spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間。
配置Hive Catalog
Hive Catalog會同步中繼資料到Hive MetaStore中。在Hive Catalog中建立的表可以直接在Hive中查詢。Hive查詢Paimon,詳情請參見Paimon與Hive整合。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=hive \ --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \ --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions說明spark.sql.catalog.paimon:定義了名為paimon的Catalog。spark.sql.catalog.paimon.metastore:指定Catalog使用的中繼資料存放區類型。設定為hive意味著將中繼資料同步到Hive Metastore中。spark.sql.catalog.paimon.uri:為Hive MetaStore Service的地址和連接埠。參數值為thrift://master-1-1:9083,這意味著Spark SQL將串連到這個運行在master-1-1主機上、監聽9083連接埠的Hive Metastore服務以擷取中繼資料資訊。spark.sql.catalog.paimon.warehouse:配置資料倉儲的實際位置,請根據實際情況修改。其中<yourBucketName>為OSS的儲存空間,建立操作請參見建立儲存空間。
使用spark_catalog
通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點。
執行下面的命令配置Catalog,啟動Spark SQL。
spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions說明spark.sql.catalog.spark_catalog:定義了名為spark_catalog的Catalog。spark_catalog中用來儲存的根路徑由
spark.sql.warehouse.dir參數指定,一般不需要修改。
步驟二:讀寫Paimon表
執行以下Spark SQL語句,在Catalog中建立一張表,並讀寫表中的資料。
使用Paimon Catalog
訪問Paimon表需要使用paimon.<db_name>.<tbl_name>,其中<db_name>為資料庫的名字,<tbl_name>為表的名字。
-- 建立資料庫
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;
-- 建立Paimon表
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
-- 寫入Paimon表
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");
-- 查詢寫入結果
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
-- 刪除資料庫
DROP DATABASE paimon.ss_paimon_db CASCADE;配置Hive Catalog後,建立資料庫時報metastore: Failed to connect to the MetaStore Server錯誤,說明Hive MetaStore服務沒有啟動,需要執行以下命令啟動。啟動成功後,再執行配置Hive Catalog的命令。
hive --service metastore &如果您建立叢集時中繼資料選擇DLF統一中繼資料,建議將中繼資料同步到DLF,配置DLF Catalog。
使用spark_catalog
無論是訪問Paimon表還是非Paimon表,都可以使用spark_catalog.<db_name>.<tbl_name>。由於spark_catalog是Spark預設內建Catalog,因此可以忽略不寫,直接通過<db_name>.<tbl_name>訪問表。其中<db_name>為資料庫的名字,<tbl_name>為表的名字。
-- 建立資料庫
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;
-- 建立Paimon表和Parquet表
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "cherry";
-- 寫入Paimon表
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
-- 查詢寫入結果
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
-- 刪除資料庫
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;查詢結果如下。
1 apple
2 banana
3 cherry 常見問題
相關文檔
更多Paimon相關用法和配置,請參見Paimon官方文檔。
