全部產品
Search
文件中心

E-MapReduce:基於Flink、EMR Serverless Spark與Paimon構建流批一體資料湖分析

更新時間:Sep 20, 2025

本文示範了使用Realtime ComputeFlink版和EMR Serverless Spark構建Paimon資料湖分析流程。該流程包括將資料寫入OSS、進行互動式查詢以及執行離線資料Compact操作。EMR Serverless Spark完全相容Paimon,通過內建的DLF中繼資料與其他雲產品(例如,Realtime ComputeFlink版)實現中繼資料互連,形成完整的流批一體化解決方案。它支援靈活的任務運行方式和參數配置,滿足即時分析和生產調度的多種需求。

背景資訊

Realtime ComputeFlink版

阿里雲Realtime ComputeFlink版是一種全託管Serverless的Flink雲端服務,是一站式開發營運管理平台,開箱即用,計費靈活。具備作業開發、資料調試、運行與監控、自動調優、智能診斷等全生命週期能力。更多資訊,請參見什麼是阿里雲Realtime ComputeFlink版

Apache Paimon

Apache Paimon是一種統一的資料湖儲存格式,結合Flink和Spark構建了流批處理的即時湖倉一體架構。Paimon創新地將湖格式與LSM(Log-structured merge-tree)技術結合,使資料湖具備了即時資料流更新和完整的流處理能力。更多資訊,請參見Apache Paimon

操作流程

步驟一:通過Realtime ComputeFlink建立Paimon Catalog

Paimon Catalog可以方便地管理同一個warehouse目錄下的所有Paimon表,並與其它阿里雲產品連通。建立並使用Paimon Catalog,詳情請參見管理Paimon Catalog

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台

  3. 建立Paimon Catalog。

    1. 在左側導覽列,選擇資料開發 > 資料查詢

    2. 建立查詢指令碼,填寫SQL代碼。

      Catalog完整配置如下所示。

      CREATE CATALOG `paimon` WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'warehouse' = '<warehouse>',
        'dlf.catalog.id' = '<dlf.catalog.id>',
        'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
        'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
        'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
        'dlf.catalog.region' = '<dlf.catalog.region>',
      );

      配置項

      說明

      是否必填

      備忘

      paimon

      Paimon Catalog名稱。

      請填寫為自訂的英文名。

      type

      Catalog類型。

      固定值為paimon。

      metastore

      中繼資料存放區類型。

      本文樣本中繼資料存放區類型選擇dlf,通過DLF實現統一的中繼資料管理,實現多引擎無縫銜接。

      warehouse

      配置資料倉儲的實際位置。

      請根據實際情況修改。

      dlf.catalog.id

      DLF資料目錄ID。

      請在資料湖構建控制台上查看資料目錄對應的ID。

      dlf.catalog.accessKeyId

      訪問DLF服務所需的Access Key ID。

      擷取方法請參見建立AccessKey

      dlf.catalog.accessKeySecret

      訪問DLF服務所需的Access Key Secret。

      擷取方法請參見建立AccessKey

      dlf.catalog.endpoint

      DLF服務的Endpoint。

      詳情請參見已開通的地區和訪問網域名稱

      說明

      如果Flink與DLF位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。

      dlf.catalog.region

      DLF所在地區。

      詳情請參見已開通的地區和訪問網域名稱

      說明

      請和dlf.catalog.endpoint選擇的地區保持一致。

    3. 選擇或建立Session叢集。

      單擊頁面右下角的執行環境,選擇對應版本的Session叢集(VVR 8.0.4及以上引擎版本)。如果沒有Session叢集,請參見步驟一:建立Session叢集

    4. 選中目標程式碼片段後,單擊程式碼左側的運行

  4. 建立Paimon表。

    查詢指令碼文本編輯地區輸入如下命令後,選中代碼後單擊運行

    CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl`
    (
        id       STRING,
        data     STRING,
        category INT,
        ts       STRING,
        dt       STRING,
        hh       STRING
    ) PARTITIONED BY (dt, hh)
    WITH (
        'write-only' = 'true'
    );
  5. 建立流作業。

    1. 新增作業。

      1. 在左側導覽列,選擇資料開發 > ETL

      2. 建立流作業,在新增作業草稿對話方塊中,填寫作業配置資訊。

        作業參數

        說明

        檔案名稱

        作業的名稱。

        說明

        作業名稱在當前專案中必須保持唯一。

        引擎版本

        當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

      3. 單擊建立

    2. 編寫代碼。

      在建立的作業草稿中,輸入以下代碼,通過datagen源源不斷產生資料寫入Paimon表中。

      CREATE TEMPORARY TABLE datagen
      (
          id        string,
          data      string,
          category  int
      )
      WITH (
          'connector' = 'datagen',
          'rows-per-second' = '100',
          'fields.category.kind' = 'random',
          'fields.category.min' = '1',
          'fields.category.max' = '10'
      );
      INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl`
      SELECT
          id,
          data,
          category,
          cast(LOCALTIMESTAMP as string) as ts,
          cast(CURRENT_DATE as string) as dt,
          cast(hour(LOCALTIMESTAMP) as string) as hh
      FROM datagen;
    3. 單擊部署,即可將資料發布至生產環境。

    4. 您可以在作業營運頁面啟動作業進入運行階段,詳情請參見作業啟動

步驟二:通過EMR Serverless Spark建立SQL會話

建立的SQL會話用於SQL開發和查詢。有關會話的詳細介紹,請參見會話管理

  1. 進入會話管理頁面。

    1. 登入E-MapReduce控制台

    2. 在左側導覽列,選擇EMR Serverless > Spark

    3. Spark頁面,單擊目標工作空間名稱。

    4. EMR Serverless Spark頁面,單擊左側導覽列中的會話管理

  2. 建立SQL會話。

    1. SQL會話頁簽,單擊建立SQL會話

    2. 在建立SQL會話頁面,配置以下資訊,其餘參數無需配置,然後單擊建立

      參數

      說明

      名稱

      自訂SQL會話的名稱。例如,paimon_compute。

      Spark配置

      請填寫以下Spark配置資訊,以串連Paimon。

      spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore  dlf
      spark.sql.catalog.paimon.warehouse  <warehouse>
      spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

      請根據您的實際情況替換以下資訊:

      • <warehouse>:配置資料倉儲的實際位置,請根據實際情況修改。

      • <dlf.catalog.id>:DLF資料目錄ID,請根據實際情況修改。

    3. 單擊操作列的啟動

步驟三:通過EMR Serverless Spark進行互動式查詢或任務調度

EMR Serverless Spark提供了互動式查詢和任務調度兩種操作模式,以滿足不同的使用需求。互動式查詢適用於快速查詢和調試,而任務調度則支援任務的開發、發布和營運,實現完整的生命週期管理。

在資料寫入過程中,我們可以隨時使用EMR Serverless Spark對Paimon表進行互動式查詢,以便即時擷取資料狀態和執行快速分析。此外,通過發布開發好的任務並建立工作流程,可以編排各項任務並完成工作流程的發布。您可以配置調度策略,實現任務的定期調度,從而保證資料處理和分析的自動化與高效性。

互動式查詢

  1. 建立SQL開發。

    1. EMR Serverless Spark頁面,單擊左側導覽列中的資料開發

    2. 開發目錄頁簽下,單擊建立

    3. 在彈出的對話方塊中,輸入名稱(例如,paimon_compact),類型選擇為SparkSQL,然後單擊確定

    4. 在右上方選擇資料目錄、資料庫和前一步驟中啟動的SQL會話。

    5. 在建立的任務編輯器中輸入SQL語句。

      • 樣本1:查詢test_append_tbl表中前10行的資料。

        SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;

        返回結果樣本如下。

        image

      • 樣本2:統計test_append_tbl表中滿足特定條件的行數。

        SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';

        返回結果樣本如下。

        image

  2. 運行並發布任務。

    1. 單擊運行

      返回結果資訊可以在下方的運行結果中查看。如果有異常,則可以在運行問題中查看。

    2. 確認運行無誤後,單擊右上方的發布

    3. 發布對話方塊中,可以輸入發布資訊,然後單擊確定

任務調度

  1. 查詢Compact前檔案資訊。

    資料開發頁面,建立SQL開發,查詢Paimon的files系統資料表,快速地得到Compact前檔案的資料。建立SQL開發的具體操作,請參見SparkSQL開發

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';

    image

  2. 資料開發頁面,編寫Paimon Compact SQL(例如,paimon_compact),然後完成發布。

    建立SQL開發的具體操作,請參見SparkSQL開發

    CALL paimon.sys.compact (
      table => 'test_paimon_db.test_append_tbl',
      partitions => 'dt=\"2024-06-24\",hh=\"19\"',
      order_strategy => 'zorder',
      order_by => 'category'
    );
  3. 建立工作流程。

    1. EMR Serverless Spark頁面,單擊左側導覽列中的任務編排

    2. 任務編排頁面,單擊建立工作流程

    3. 建立工作流程面板中,輸入工作流程名稱(例如,paimon_workflow_task),然後單擊下一步

      其他設定地區的參數,請根據您的實際情況配置,更多參數資訊請參見管理工作流程

    4. 在建立的節點畫布中,單擊添加節點

    5. 來源檔案路徑下拉式清單中選擇發行的SQL開發(paimon_compact),填寫Spark配置參數,然後單擊儲存

      參數

      說明

      名稱

      自訂SQL會話的名稱。例如,paimon_compute。

      Spark配置

      請填寫以下Spark配置資訊,以串連Paimon。

      spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore  dlf
      spark.sql.catalog.paimon.warehouse  <warehouse>
      spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

      請根據您的實際情況替換以下資訊:

      • <warehouse>:配置資料倉儲的實際位置,請根據實際情況修改。

      • <dlf.catalog.id>:DLF資料目錄ID,請根據實際情況修改。

    6. 在建立的節點畫布中,單擊發布工作流程,然後單擊確定

  4. 運行工作流程。

    1. 任務編排頁面,單擊建立工作流程(例如,paimon_workflow_task)的工作流程名稱

    2. 工作流程執行個體列表頁面,單擊手動運行

    3. 觸發運行對話方塊中,單擊確定

  5. 驗證Compact效果。

    工作流程調度執行成功後,再次執行與開始相同的SQL查詢,對比Compact前後檔案的數量、記錄數和大小,以驗證Compact操作的效果。

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';

    image