本文示範了使用Realtime ComputeFlink版和EMR Serverless Spark構建Paimon資料湖分析流程。該流程包括將資料寫入OSS、進行互動式查詢以及執行離線資料Compact操作。EMR Serverless Spark完全相容Paimon,通過內建的DLF中繼資料與其他雲產品(例如,Realtime ComputeFlink版)實現中繼資料互連,形成完整的流批一體化解決方案。它支援靈活的任務運行方式和參數配置,滿足即時分析和生產調度的多種需求。
背景資訊
Realtime ComputeFlink版
阿里雲Realtime ComputeFlink版是一種全託管Serverless的Flink雲端服務,是一站式開發營運管理平台,開箱即用,計費靈活。具備作業開發、資料調試、運行與監控、自動調優、智能診斷等全生命週期能力。更多資訊,請參見什麼是阿里雲Realtime ComputeFlink版。
Apache Paimon
Apache Paimon是一種統一的資料湖儲存格式,結合Flink和Spark構建了流批處理的即時湖倉一體架構。Paimon創新地將湖格式與LSM(Log-structured merge-tree)技術結合,使資料湖具備了即時資料流更新和完整的流處理能力。更多資訊,請參見Apache Paimon。
操作流程
步驟一:通過Realtime ComputeFlink建立Paimon Catalog
Paimon Catalog可以方便地管理同一個warehouse目錄下的所有Paimon表,並與其它阿里雲產品連通。建立並使用Paimon Catalog,詳情請參見管理Paimon Catalog。
單擊目標工作空間操作列下的控制台。
建立Paimon Catalog。
在左側導覽列,選擇。
建立查詢指令碼,填寫SQL代碼。
Catalog完整配置如下所示。
CREATE CATALOG `paimon` WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'warehouse' = '<warehouse>', 'dlf.catalog.id' = '<dlf.catalog.id>', 'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>', 'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>', 'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>', 'dlf.catalog.region' = '<dlf.catalog.region>', );配置項
說明
是否必填
備忘
paimon
Paimon Catalog名稱。
是
請填寫為自訂的英文名。
type
Catalog類型。
是
固定值為paimon。
metastore
中繼資料存放區類型。
是
本文樣本中繼資料存放區類型選擇dlf,通過DLF實現統一的中繼資料管理,實現多引擎無縫銜接。
warehouse
配置資料倉儲的實際位置。
是
請根據實際情況修改。
dlf.catalog.id
DLF資料目錄ID。
是
請在資料湖構建控制台上查看資料目錄對應的ID。
dlf.catalog.accessKeyId
訪問DLF服務所需的Access Key ID。
是
擷取方法請參見建立AccessKey。
dlf.catalog.accessKeySecret
訪問DLF服務所需的Access Key Secret。
是
擷取方法請參見建立AccessKey。
dlf.catalog.endpoint
DLF服務的Endpoint。
是
詳情請參見已開通的地區和訪問網域名稱。
說明如果Flink與DLF位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。
dlf.catalog.region
DLF所在地區。
是
詳情請參見已開通的地區和訪問網域名稱。
說明請和dlf.catalog.endpoint選擇的地區保持一致。
選擇或建立Session叢集。
單擊頁面右下角的執行環境,選擇對應版本的Session叢集(VVR 8.0.4及以上引擎版本)。如果沒有Session叢集,請參見步驟一:建立Session叢集。
選中目標程式碼片段後,單擊程式碼左側的運行。
建立Paimon表。
在查詢指令碼文本編輯地區輸入如下命令後,選中代碼後單擊運行。
CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl` ( id STRING, data STRING, category INT, ts STRING, dt STRING, hh STRING ) PARTITIONED BY (dt, hh) WITH ( 'write-only' = 'true' );建立流作業。
新增作業。
在左側導覽列,選擇。
建立流作業,在新增作業草稿對話方塊中,填寫作業配置資訊。
作業參數
說明
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
單擊建立。
編寫代碼。
在建立的作業草稿中,輸入以下代碼,通過datagen源源不斷產生資料寫入Paimon表中。
CREATE TEMPORARY TABLE datagen ( id string, data string, category int ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.category.kind' = 'random', 'fields.category.min' = '1', 'fields.category.max' = '10' ); INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl` SELECT id, data, category, cast(LOCALTIMESTAMP as string) as ts, cast(CURRENT_DATE as string) as dt, cast(hour(LOCALTIMESTAMP) as string) as hh FROM datagen;單擊部署,即可將資料發布至生產環境。
您可以在作業營運頁面啟動作業進入運行階段,詳情請參見作業啟動。
步驟二:通過EMR Serverless Spark建立SQL會話
建立的SQL會話用於SQL開發和查詢。有關會話的詳細介紹,請參見會話管理。
進入會話管理頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的會話管理。
建立SQL會話。
在SQL會話頁簽,單擊建立SQL會話。
在建立SQL會話頁面,配置以下資訊,其餘參數無需配置,然後單擊建立。
參數
說明
名稱
自訂SQL會話的名稱。例如,paimon_compute。
Spark配置
請填寫以下Spark配置資訊,以串連Paimon。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>請根據您的實際情況替換以下資訊:
<warehouse>:配置資料倉儲的實際位置,請根據實際情況修改。<dlf.catalog.id>:DLF資料目錄ID,請根據實際情況修改。
單擊操作列的啟動。
步驟三:通過EMR Serverless Spark進行互動式查詢或任務調度
EMR Serverless Spark提供了互動式查詢和任務調度兩種操作模式,以滿足不同的使用需求。互動式查詢適用於快速查詢和調試,而任務調度則支援任務的開發、發布和營運,實現完整的生命週期管理。
在資料寫入過程中,我們可以隨時使用EMR Serverless Spark對Paimon表進行互動式查詢,以便即時擷取資料狀態和執行快速分析。此外,通過發布開發好的任務並建立工作流程,可以編排各項任務並完成工作流程的發布。您可以配置調度策略,實現任務的定期調度,從而保證資料處理和分析的自動化與高效性。
互動式查詢
建立SQL開發。
在EMR Serverless Spark頁面,單擊左側導覽列中的資料開發。
在開發目錄頁簽下,單擊建立。
在彈出的對話方塊中,輸入名稱(例如,paimon_compact),類型選擇為SparkSQL,然後單擊確定。
在右上方選擇資料目錄、資料庫和前一步驟中啟動的SQL會話。
在建立的任務編輯器中輸入SQL語句。
樣本1:查詢
test_append_tbl表中前10行的資料。SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;返回結果樣本如下。

樣本2:統計
test_append_tbl表中滿足特定條件的行數。SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';返回結果樣本如下。

運行並發布任務。
單擊運行。
返回結果資訊可以在下方的運行結果中查看。如果有異常,則可以在運行問題中查看。
確認運行無誤後,單擊右上方的發布。
在發布對話方塊中,可以輸入發布資訊,然後單擊確定。
任務調度
查詢Compact前檔案資訊。
在資料開發頁面,建立SQL開發,查詢Paimon的files系統資料表,快速地得到Compact前檔案的資料。建立SQL開發的具體操作,請參見SparkSQL開發。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';
在資料開發頁面,編寫Paimon Compact SQL(例如,paimon_compact),然後完成發布。
建立SQL開發的具體操作,請參見SparkSQL開發。
CALL paimon.sys.compact ( table => 'test_paimon_db.test_append_tbl', partitions => 'dt=\"2024-06-24\",hh=\"19\"', order_strategy => 'zorder', order_by => 'category' );建立工作流程。
在EMR Serverless Spark頁面,單擊左側導覽列中的任務編排。
在任務編排頁面,單擊建立工作流程。
在建立工作流程面板中,輸入工作流程名稱(例如,paimon_workflow_task),然後單擊下一步。
其他設定地區的參數,請根據您的實際情況配置,更多參數資訊請參見管理工作流程。
在建立的節點畫布中,單擊添加節點。
在來源檔案路徑下拉式清單中選擇發行的SQL開發(paimon_compact),填寫Spark配置參數,然後單擊儲存。
參數
說明
名稱
自訂SQL會話的名稱。例如,paimon_compute。
Spark配置
請填寫以下Spark配置資訊,以串連Paimon。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>請根據您的實際情況替換以下資訊:
<warehouse>:配置資料倉儲的實際位置,請根據實際情況修改。<dlf.catalog.id>:DLF資料目錄ID,請根據實際情況修改。
在建立的節點畫布中,單擊發布工作流程,然後單擊確定。
運行工作流程。
在任務編排頁面,單擊建立工作流程(例如,paimon_workflow_task)的工作流程名稱。
在工作流程執行個體列表頁面,單擊手動運行。
在觸發運行對話方塊中,單擊確定。
驗證Compact效果。
工作流程調度執行成功後,再次執行與開始相同的SQL查詢,對比Compact前後檔案的數量、記錄數和大小,以驗證Compact操作的效果。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';