全部產品
Search
文件中心

Data Lake Formation:PyPaimon訪問DLF

更新時間:Jan 21, 2026

本文為您介紹如何使用PyPaimon建立DLF Paimon表以及讀寫DLF Paimon表資料的方法。

PyPaimon 與 DLF 整合

PyPaimon 是 Apache Paimon 的 Python SDK。它提供了高效的資料入湖能力,支援開發人員使用 Python 語言直接讀取、寫入及處理 Paimon 表資料。

DLF Catalog 整合

通過引入 pypaimon_dlf2 擴充包並配置 DLF Catalog,使用者可以將 Paimon 表的中繼資料自動同步至阿里雲資料湖構建(DLF)服務。

這一整合帶來以下核心優勢:

  • 多引擎互連:中繼資料託管於 DLF 後,阿里雲上的其他計算引擎(如 MaxCompute、Hologres、EMR)即可無縫訪問這些 Paimon 資料資源。

  • 統一治理:藉助 DLF 的資料湖管理能力,使用者可以對 Paimon 表執行生命週期管理,並自動化地最佳化儲存格式。

前提條件

  • 已建立DLF資料目錄

  • Python 版本必須為 3.8 或更高。可以通過python3 --version確認目前的版本。

操作步驟

步驟一:環境準備

  1. 下載pypaimon-1.4.dev0.tar.gz安裝包,並上傳到目標目錄下。

  2. 進入目標目錄下,執行以下命令,安裝pypaimon SDK。

    pip3 install pypaimon-1.4.dev0.tar.gz
  3. (可選)安裝完成後,可通過pip show pypaimon命令查看結果。

步驟二:通過PyPaimon訪問DLF Paimon表

  1. 在目標目錄下,執行以下命令,建立檔案testdlf.py

    vim testdlf.py
  2. testdlf.py檔案中,寫入以下完整範例程式碼。此樣本展示了如何通過PyPaimon建立DLF Paimon表以及讀寫DLF Paimon表資料。代碼的具體參數配置及多種讀寫表資料的方式,請參見代碼詳解

    import pyarrow as pa
    import pandas as pd
    from pypaimon import CatalogFactory
    from pypaimon import Schema
    
    # 建立 catalog
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)
    
    # 建立 database
    catalog.create_database(
       name='testdb',
       ignore_if_exists=True    # 是否忽略Database已存在的錯誤
    )
    
    # 建立 Schema
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    # 建立 table
    catalog.create_table(
        identifier='testdb.tb',
        schema=schema,
        ignore_if_exists=True  # 是否忽略表已存在的錯誤
    )
    table = catalog.get_table('testdb.tb')
    
    # 建立 table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # 寫表資料,支援pyarrow和Pandas
    # 寫入Pandas樣本資料
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
    
    # 提交資料
    table_commit.commit(table_write.prepare_commit())
    # 關閉資源
    table_write.close()
    table_commit.close()
    
    # 讀表資料,支援多種資料格式
    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    predicate = predicate_builder.equal('date', '2024-12-01')
    read_builder = read_builder.with_filter(predicate)
    
    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
    
    table_read = read_builder.new_read()
    pa_table = table_read.to_arrow(splits)
    print(pa_table)

步驟三:運行Python檔案

進入目標目錄,執行以下命令,運行Python指令碼。

python3 testdlf.py

運行結果如下。

image

代碼詳解

通過PyPaimon建立DLF Paimon表

  1. 建立Paimon DLF Catalog。

    說明

    Catalog是管理表的入口,在訪問DLF中的Paimon表之前,首先需要建立Catalog。

    # Catalog options是一個dict, key和value都是str
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)

    參數說明如下。

    參數

    說明

    metastore

    dlf-paimon

    dlf.region

    DLF Region ID,詳情請參見服務存取點

    dlf.endpoint

    DLF Endpoint,詳情請參見服務存取點

    dlf.catalog.id

    DLF資料目錄ID。可在資料湖構建控制台上查看資料目錄對應的ID,具體操作請參見資料目錄

    dlf.catalog.accessKeyId

    訪問DLF服務所需的AccessKey。詳情請參見建立AccessKey

    dlf.catalog.accessKeySecret

    訪問DLF服務所需的SecretKey。詳情請參見建立AccessKey

    max-workers

    可選,PyPaimon讀資料時的並發數。N:大於等於1的整數,預設為1,即預設情況下是串列讀取。

  2. 建立Database。

    在Paimon Catalog中,所有表都歸屬於特定的Database內。您可以建立Database來管理表。

    catalog.create_database(
       name='database_name',
       ignore_if_exists=True,    # 是否忽略Database已存在的錯誤
       properties={'key': 'value'}  # Database參數(可選)
    )
  3. 建立Schema。

    Schema包含列定義、分區鍵、主鍵、表參數和注釋。其中,列定義使用 pyarrow.Schema 描述,而其餘參數均為可選項。可通過以下兩種方式構建pyarrow.Schema

    PyArrow

    使用 pyarrow.schema方法。樣本如下。

    import pyarrow as pa
    from pypaimon import Schema
    
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    說明

    pyarrowPaimon的資料類型映射,請參見PyPaimon資料類型映射

    Pandas

    如果您有pandas資料,也可以直接從 pandas.DataFrame中擷取。樣本如下。

    import pandas as pd
    import pyarrow as pa
    
    from pypaimon import Schema
    
    # 這裡是樣本DataFrame資料
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    dataframe = pd.DataFrame(data)
    
    # 從DataFrame中擷取pyarrow.Schema
    record_batch = pa.RecordBatch.from_pandas(dataframe)
    pa_schema = record_batch.schema
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
  4. 建立並擷取Table。

    catalog.create_table(
        identifier='database_name.table_name',
        schema=schema,
        ignore_if_exists=True  # 是否忽略表已存在的錯誤
    )
    table = catalog.get_table('database_name.table_name')
    

寫入表資料

說明

當前PyPaimon不支援寫bucket=-1的主鍵表。

  1. 建立Table寫入與提交操作

    # 建立 table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # 寫入Pandas樣本資料
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
  2. 您可通過以下兩種方式寫入表資料:

    在處理大規模資料集時,推薦使用PyArrow;而對於較小規模的資料(通常指幾GB以下),Pandas則能更高效地進行處理。

    PyArrow

    支援pyarrow.Tablepyarrow.RecordBatch兩種方式。其中,pyarrow.RecordBatch更適合用於串流情境。

    • 方式一:寫入pyarrow.Table

      # 建立欄位
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # 使用資料和欄位建立 Schema
      schema = pa.schema(fields)
      
      # 建立 Table
      pa_table = pa.Table.from_arrays(data, schema)
      
      # 寫入資料
      table_write.write_arrow(pa_table)
    • 方式二:寫入pyarrow.RecordBatch

      # 建立欄位
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # 使用資料和欄位建立 Schema
      schema = pa.schema(fields)
      
      # 建立 RecordBatch
      record_batch = pa.RecordBatch.from_arrays(data, schema)
      
      # 寫入資料
      table_write.write_arrow_batch(record_batch)

    Pandas

    支援寫入pandas.DataFrame。

    import pandas as pd
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
  3. 提交資料並關閉資源。

    # 提交資料
    table_commit.commit(table_write.prepare_commit())
    # 關閉資源
    table_write.close()
    table_commit.close()

讀取表資料

  1. 建立ReadBuilder,構建讀資料工具。

    read_builder = table.new_read_builder()
  2. 使用PredicateBuilder來構建和下推篩選條件。

    • 支援條件式篩選,例如您只想查詢date = 2024-12-01的資料。

      predicate_builder = read_builder.new_predicate_builder()
      predicate = predicate_builder.equal('date', '2024-12-01')
      read_builder = read_builder.with_filter(predicate)
    • 支援篩選特定列,例如您只想查詢 key和value兩列。

      read_builder = read_builder.with_projection(['key', 'value'])
    說明

    更多支援的篩選條件,參見PyPaimon篩選條件

  3. 擷取splits

    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
  4. splits轉換為多種資料格式。

    Apache Arrow

    • 您可以把所有資料讀到 pyarrow.Table中。

      table_read = read_builder.new_read()
      pa_table = table_read.to_arrow(splits)
      print(pa_table)
      
      # 輸出樣本:
      # pyarrow.Table
      # key: int64 not null
      # value: string
      # ----
      # key: [[2],[1]]
      # value: [["BBB"],["AAA"]]
    • 也可以將資料讀到 pyarrow.RecordBatchReader并迭代讀取。

      table_read = read_builder.new_read()
      for batch in table_read.to_arrow_batch_reader(splits):
          print(batch)
      
      # 輸出樣本:
      # pyarrow.RecordBatch
      # key: int64
      # value: string
      # ----
      # key: [1,2]
      # value: ["AAA","BBB"]

    Pandas

    您可以將資料讀到 pandas.DataFrame中。

    table_read = read_builder.new_read()
    df = table_read.to_pandas(splits)
    print(df)
    
    # 輸出樣本:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB

    DuckDB

    重要

    需要安裝DuckDB,可通過pip install duckdb安裝。

    您可以將資料轉換為一個in-memory的DuckDB table並查詢。

    table_read = read_builder.new_read()
    duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
    
    print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
    # 輸出樣本:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB
    
    print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf())
    # 輸出樣本:
    #    key  value
    # 0   1  AAA

    Ray

    重要

    需要安裝Ray,可通過pip install ray安裝。

    table_read = read_builder.new_read()
    ray_dataset = table_read.to_ray(splits)
    
    # 列印ray_dataset的資訊
    print(ray_dataset)
    # 輸出樣本:
    # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string})
    
    # 列印ray_dataset中的前兩個元素
    print(ray_dataset.take(2))
    # 輸出樣本:
    # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}]
    
    # 將整個ray_dataset轉換為Pandas DataFrame格式後列印
    print(ray_dataset.to_pandas())
    # 輸出樣本:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB