本文為您介紹如何使用PyPaimon建立DLF Paimon表以及讀寫DLF Paimon表資料的方法。
PyPaimon 與 DLF 整合
PyPaimon 是 Apache Paimon 的 Python SDK。它提供了高效的資料入湖能力,支援開發人員使用 Python 語言直接讀取、寫入及處理 Paimon 表資料。
DLF Catalog 整合
通過引入 pypaimon_dlf2 擴充包並配置 DLF Catalog,使用者可以將 Paimon 表的中繼資料自動同步至阿里雲資料湖構建(DLF)服務。
這一整合帶來以下核心優勢:
多引擎互連:中繼資料託管於 DLF 後,阿里雲上的其他計算引擎(如 MaxCompute、Hologres、EMR)即可無縫訪問這些 Paimon 資料資源。
統一治理:藉助 DLF 的資料湖管理能力,使用者可以對 Paimon 表執行生命週期管理,並自動化地最佳化儲存格式。
前提條件
已建立DLF資料目錄。
Python 版本必須為 3.8 或更高。可以通過
python3 --version確認目前的版本。
操作步驟
步驟一:環境準備
下載pypaimon-1.4.dev0.tar.gz安裝包,並上傳到目標目錄下。
進入目標目錄下,執行以下命令,安裝pypaimon SDK。
pip3 install pypaimon-1.4.dev0.tar.gz(可選)安裝完成後,可通過
pip show pypaimon命令查看結果。
步驟二:通過PyPaimon訪問DLF Paimon表
在目標目錄下,執行以下命令,建立檔案
testdlf.py。vim testdlf.py在
testdlf.py檔案中,寫入以下完整範例程式碼。此樣本展示了如何通過PyPaimon建立DLF Paimon表以及讀寫DLF Paimon表資料。代碼的具體參數配置及多種讀寫表資料的方式,請參見代碼詳解。import pyarrow as pa import pandas as pd from pypaimon import CatalogFactory from pypaimon import Schema # 建立 catalog catalog_options = { 'metastore': 'rest', 'uri': "http://${region_id}-vpc.dlf.aliyuncs.com", 'warehouse': "${catalog_name}", 'dlf.region': '${region_id}', "token.provider": "dlf", 'dlf.access-key-id': "xxx", 'dlf.access-key-secret': "xxxx", } catalog = CatalogFactory.create(catalog_options) # 建立 database catalog.create_database( name='testdb', ignore_if_exists=True # 是否忽略Database已存在的錯誤 ) # 建立 Schema pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema.from_pyarrow_schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' ) # 建立 table catalog.create_table( identifier='testdb.tb', schema=schema, ignore_if_exists=True # 是否忽略表已存在的錯誤 ) table = catalog.get_table('testdb.tb') # 建立 table write and commit write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # 寫表資料,支援pyarrow和Pandas # 寫入Pandas樣本資料 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe) # 提交資料 table_commit.commit(table_write.prepare_commit()) # 關閉資源 table_write.close() table_commit.close() # 讀表資料,支援多種資料格式 read_builder = table.new_read_builder() predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate) table_scan = read_builder.new_scan() splits = table_scan.plan().splits() table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table)
步驟三:運行Python檔案
進入目標目錄,執行以下命令,運行Python指令碼。
python3 testdlf.py運行結果如下。

代碼詳解
通過PyPaimon建立DLF Paimon表
建立Paimon DLF Catalog。
說明Catalog是管理表的入口,在訪問DLF中的Paimon表之前,首先需要建立Catalog。
# Catalog options是一個dict, key和value都是str catalog_options = { 'metastore': 'rest', 'uri': "http://${region_id}-vpc.dlf.aliyuncs.com", 'warehouse': "${catalog_name}", 'dlf.region': '${region_id}', "token.provider": "dlf", 'dlf.access-key-id': "xxx", 'dlf.access-key-secret': "xxxx", } catalog = CatalogFactory.create(catalog_options)參數說明如下。
參數
說明
metastore
dlf-paimon
dlf.region
DLF Region ID,詳情請參見服務存取點。
dlf.endpoint
DLF Endpoint,詳情請參見服務存取點。
dlf.catalog.id
DLF資料目錄ID。可在資料湖構建控制台上查看資料目錄對應的ID,具體操作請參見資料目錄。
dlf.catalog.accessKeyId
訪問DLF服務所需的AccessKey。詳情請參見建立AccessKey。
dlf.catalog.accessKeySecret
訪問DLF服務所需的SecretKey。詳情請參見建立AccessKey。
max-workers
可選,PyPaimon讀資料時的並發數。N:大於等於1的整數,預設為1,即預設情況下是串列讀取。
建立Database。
在Paimon Catalog中,所有表都歸屬於特定的Database內。您可以建立Database來管理表。
catalog.create_database( name='database_name', ignore_if_exists=True, # 是否忽略Database已存在的錯誤 properties={'key': 'value'} # Database參數(可選) )建立Schema。
Schema包含列定義、分區鍵、主鍵、表參數和注釋。其中,列定義使用
pyarrow.Schema描述,而其餘參數均為可選項。可通過以下兩種方式構建pyarrow.Schema。PyArrow
使用
pyarrow.schema方法。樣本如下。import pyarrow as pa from pypaimon import Schema pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )說明pyarrow和Paimon的資料類型映射,請參見PyPaimon資料類型映射。Pandas
如果您有pandas資料,也可以直接從
pandas.DataFrame中擷取。樣本如下。import pandas as pd import pyarrow as pa from pypaimon import Schema # 這裡是樣本DataFrame資料 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) # 從DataFrame中擷取pyarrow.Schema record_batch = pa.RecordBatch.from_pandas(dataframe) pa_schema = record_batch.schema schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )建立並擷取Table。
catalog.create_table( identifier='database_name.table_name', schema=schema, ignore_if_exists=True # 是否忽略表已存在的錯誤 ) table = catalog.get_table('database_name.table_name')
寫入表資料
當前PyPaimon不支援寫bucket=-1的主鍵表。
建立Table寫入與提交操作。
# 建立 table write and commit write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # 寫入Pandas樣本資料 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], }您可通過以下兩種方式寫入表資料:
在處理大規模資料集時,推薦使用PyArrow;而對於較小規模的資料(通常指幾GB以下),Pandas則能更高效地進行處理。
PyArrow
支援
pyarrow.Table和pyarrow.RecordBatch兩種方式。其中,pyarrow.RecordBatch更適合用於串流情境。方式一:寫入pyarrow.Table
# 建立欄位 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # 使用資料和欄位建立 Schema schema = pa.schema(fields) # 建立 Table pa_table = pa.Table.from_arrays(data, schema) # 寫入資料 table_write.write_arrow(pa_table)方式二:寫入pyarrow.RecordBatch
# 建立欄位 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # 使用資料和欄位建立 Schema schema = pa.schema(fields) # 建立 RecordBatch record_batch = pa.RecordBatch.from_arrays(data, schema) # 寫入資料 table_write.write_arrow_batch(record_batch)
Pandas
支援寫入pandas.DataFrame。
import pandas as pd dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe)提交資料並關閉資源。
# 提交資料 table_commit.commit(table_write.prepare_commit()) # 關閉資源 table_write.close() table_commit.close()
讀取表資料
建立ReadBuilder,構建讀資料工具。
read_builder = table.new_read_builder()使用PredicateBuilder來構建和下推篩選條件。
支援條件式篩選,例如您只想查詢date = 2024-12-01的資料。
predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate)支援篩選特定列,例如您只想查詢 key和value兩列。
read_builder = read_builder.with_projection(['key', 'value'])
說明更多支援的篩選條件,參見PyPaimon篩選條件。
擷取
splits。table_scan = read_builder.new_scan() splits = table_scan.plan().splits()將
splits轉換為多種資料格式。Apache Arrow
您可以把所有資料讀到
pyarrow.Table中。table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table) # 輸出樣本: # pyarrow.Table # key: int64 not null # value: string # ---- # key: [[2],[1]] # value: [["BBB"],["AAA"]]也可以將資料讀到
pyarrow.RecordBatchReader并迭代讀取。table_read = read_builder.new_read() for batch in table_read.to_arrow_batch_reader(splits): print(batch) # 輸出樣本: # pyarrow.RecordBatch # key: int64 # value: string # ---- # key: [1,2] # value: ["AAA","BBB"]
Pandas
您可以將資料讀到
pandas.DataFrame中。table_read = read_builder.new_read() df = table_read.to_pandas(splits) print(df) # 輸出樣本: # key value # 0 1 AAA # 1 2 BBBDuckDB
重要需要安裝DuckDB,可通過
pip install duckdb安裝。您可以將資料轉換為一個in-memory的DuckDB table並查詢。
table_read = read_builder.new_read() duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()) # 輸出樣本: # key value # 0 1 AAA # 1 2 BBB print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf()) # 輸出樣本: # key value # 0 1 AAARay
重要需要安裝Ray,可通過
pip install ray安裝。table_read = read_builder.new_read() ray_dataset = table_read.to_ray(splits) # 列印ray_dataset的資訊 print(ray_dataset) # 輸出樣本: # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string}) # 列印ray_dataset中的前兩個元素 print(ray_dataset.take(2)) # 輸出樣本: # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}] # 將整個ray_dataset轉換為Pandas DataFrame格式後列印 print(ray_dataset.to_pandas()) # 輸出樣本: # key value # 0 1 AAA # 1 2 BBB