全部產品
Search
文件中心

MaxCompute:基於Flink建立MaxCompute Paimon外部表格

更新時間:Jan 24, 2026

MaxCompute支援通過建立Paimon外部表格來與儲存在OSS上的Paimon表目錄建立映射關係,並訪問其中的資料。本文將介紹如何基於Flink建立Paimon外部表格並通過MaxCompute訪問Paimon外部表格。

背景資訊

Apache Paimon是一種流批一體的湖儲存格式,具備高吞吐的寫入和低延後查詢能力。阿里雲Realtime ComputeFlink版開源巨量資料平台E-MapReduce的常見計算引擎(如Spark、Hive或Trino)都與Paimon有完善的整合。藉助Apache Paimon,您可以快速構建自己的資料湖儲存服務在儲存服務OSS上,並接入MaxCompute實現資料湖的分析。關於Apache Paimon的詳細資料,請參見Apache Paimon

前提條件

  • 當前執行操作的帳號已具備建立MaxCompute表(CreateTable)的許可權。更多表許可權資訊,請參見MaxCompute許可權

  • 已建立MaxCompute專案。具體操作,請參見建立MaxCompute專案

  • 已建立儲存空間(Bucket)以及對應的檔案目錄。具體操作,請參見建立儲存空間

    說明

    由於MaxCompute只在部分地區部署,跨地區的資料連通性可能存在問題,因此建議Bucket與MaxCompute專案所在地區保持一致。

  • 已購買Flink全託管,具體操作請參見開通Realtime ComputeFlink版

注意事項

  • 當前MaxCompute僅支援對Paimon外部表格的讀取操作,暫時不支援寫入和自動跟隨Paimon表結構變更等操作。

  • Paimon當前暫時不支援開啟了Schema操作的MaxCompute專案。

  • Paimon外部表格不支援cluster屬性。

  • Paimon外部表格暫不支援查詢回溯歷史版本的資料等特性。

步驟一:在MaxCompute專案中上傳Paimon外掛程式

您可以選擇以下其中一種方式,在已建立的MaxCompute專案中上傳Paimon外掛程式

使用MaxCompute用戶端

通過用戶端(odpscmd)訪問已建立的MaxCompute專案,並執行以下代碼,將paimon_maxcompute_connector.jar上傳至MaxCompute專案中。

ADD JAR <path_to_paimon_maxcompute_connector.jar>;

使用DataWorks

  1. 登入DataWorks控制台,在左側導覽列選擇工作空間,單擊目標工作空間操作列中的快速進入 > 資料開發

  2. 資料開發頁面,單擊建立按鈕,選擇建立資源 > JAR

  3. 建立資源對話方塊,配置建立資源參數,上傳paimon_maxcompute_connector.jar,單擊建立。建立資源操作詳情,請參見步驟一:建立或上傳資源

    image.png

  4. 資源建立完成後,您需在資源編輯頁面,單擊工具列中的image.png表徵圖,提交資源至調度程式開發伺服器端。

步驟二:基於Flink建立MaxCompute Paimon外部表格

本文的最佳實務情境是以Flink為主要的操作環境,從Flink向OSS寫入Paimon檔案,自訂Flink Catalog,同時向MaxCompute建立基於OSS的支援Paimon檔案讀取能力的外表。最後從MaxCompute讀取OSS上Flink寫入的Paimon資料。

  1. 登入Realtime Compute控制台,建立查詢指令碼。建立查詢指令碼詳情,請參見查詢指令碼

  2. 在指令碼編輯地區,填寫Catalog代碼資訊及參數值後,選中代碼,單擊運行

    CREATE CATALOG `<catalog name>` WITH (
     'type' = 'paimon',
      'metastore' = 'maxcompute',
      'warehouse' = '<warehouse>',
      'maxcompute.endpoint' = '<maxcompute.endpoint>',
      'maxcompute.project' = '<maxcompute.project>',
      'maxcompute.accessid' = '<maxcompute.accessid>',
      'maxcompute.accesskey' = '<maxcompute.accesskey>',
      'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    參數說明

    參數

    是否必填

    說明

    catalog name

    Paimon Catalog名稱,自訂配置時,要求為英文字母。本文以catalogname為例。

    type

    Catalog類型,固定值為paimon

    metastore

    中繼資料存放區類型,取值為maxcompute。

    warehouse

    OSS服務中所指定的數倉目錄,格式為oss://<bucket>/<object>,其中:

    • bucket:表示您建立的OSS Bucket名稱。

    • object:表示您存放資料的路徑。

    您可以在OSS管理主控台上查看您的bucket和object名稱。

    maxcompute.endpoint

    MaxCompute服務的訪問網域名稱。

    您需要根據建立MaxCompute專案時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint資訊,請參見Endpoint

    maxcompute.project

    目標MaxCompute專案名稱。

    暫不支援開啟了Schema的MaxCompute專案。

    maxcompute.accessid

    擁有MaxCompute許可權的阿里雲帳號或RAM使用者的AccessKey ID。

    您可以進入AccessKey管理頁面擷取AccessKey ID。

    maxcompute.accesskey

    AccessKey ID對應的AccessKey Secret。

    maxcompute.oss.endpoint

    MaxCompute訪問OSS服務的訪問網域名稱,如果未填寫,將預設使用fs.oss.endpoint參數的值。

    重要

    由於OSS Bucket與MaxCompute專案處於同一地區,此處需填寫內網Endpoint。關於OSS各地區及網路對應的Endpoint值,請參見地區和Endpoint

    fs.oss.endpoint

    OSS服務的串連地址。

    當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其他帳號下的OSS Bucket時需要填寫。

    說明

    您需要根據建立OSS Bucket時選擇的地區以及網路連接方式配置Endpoint。各地區及網路對應的Endpoint資訊,請參見地區和Endpoint

    fs.oss.accessKeyId

    擁有讀寫OSS許可權的阿里雲帳號或RAM使用者的AccessKey ID。

    當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其他帳號下的OSS Bucket時需要填寫。

    您可以進入AccessKey管理頁面擷取AccessKey ID。

    fs.oss.accessKeySecret

    AccessKey ID對應的AccessKey Secret。

    當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其他帳號下的OSS Bucket時需要填寫。

  3. 建立Paimon外部表格。

    1. 建立樣本表test_tbl

      查詢指令碼頁簽的指令碼編輯地區,執行以下語句,等待結果頁簽中提示已完成。本文以建立表test_tbl為例。

      CREATE TABLE `catalogname`.`default`.test_tbl (
       dt STRING,
       id BIGINT,
       data STRING,
       PRIMARY KEY (dt, id) NOT ENFORCED
      ) PARTITIONED BY (dt);
    2. 向樣本表test_tbl中寫入資料。

      作業草稿頁面,建立包含如下語句的SQL作業,部署並運行作業。關於如何建立並運行SQL作業詳情,請參見作業開發地圖

      --此處將檢查點間隔縮短為 10s,是為了更快地提交資料。
      SET 'execution.checkpointing.interval' = '10s';
      
      INSERT INTO `catalogname`.`default`.test_tbl VALUES ('2023-04-21', 1, 'AAA'), ('2023-04-21', 2, 'BBB'), ('2023-04-22', 1, 'CCC'), ('2023-04-22', 2, 'DDD');
      說明
      • Paimon結果表在每次檢查點完成之後才會正式提交資料。

      • 在生產環境下,系統檢查點的間隔與兩次系統檢查點之間的最短時間間隔根據業務對延時要求的不同,一般設定為 1 分鐘到 10 分鐘。

      • 請確認SQL作業的引擎版本為vvr-8.0.5-flink-1.17及以上版本。

步驟三:通過MaxCompute讀取Paimon外部表格

  1. 使用本地用戶端(odpscmd)串連或其他可以運行MaxCompute SQL的工具,執行以下命令。

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;
  2. 執行以下命令,查詢MaxCompute Paimon外部表格test_tbl

    SELECT * FROM test_tbl WHERE dt = '2023-04-21';

    返回結果如下。

    +------------+------------+------------+
    | id | data | dt |
    +------------+------------+------------+
    | 1 | AAA | 2023-04-21 |
    | 2 | BBB | 2023-04-21 |
    +------------+------------+------------+