このトピックでは、PyPaimon を使用して Data Lake Formation (DLF) の Paimon テーブルを作成、読み取り、書き込みする方法について説明します。
PyPaimon と DLF の統合
PyPaimon は、Apache Paimon 向けの Python ソフトウェア開発キット (SDK) です。効率的なデータインジェスト機能を提供し、開発者は Python を使用して Paimon テーブルのデータを直接読み取り、書き込み、処理できます。
DLF カタログの統合
pypaimon_dlf2 拡張パッケージをインポートし、DLF カタログを設定することで、Paimon テーブルのメタデータを Alibaba Cloud の Data Lake Formation (DLF) サービスと自動的に同期できます。
この統合には、主に次の利点があります。
マルチエンジン間の相互運用性:メタデータが DLF で管理されると、MaxCompute、Hologres、EMR など、Alibaba Cloud 上の他のコンピューティングエンジンがこの Paimon データにシームレスにアクセスできるようになります。
統合管理:DLF のデータレイク管理機能を使用して、Paimon テーブルのライフサイクル管理を実行し、ストレージフォーマットを自動的に最適化できます。
前提条件
DLF データカタログが作成されていること。
Python のバージョンは 3.8 以降です。
python3 --versionを実行して、現在のバージョンを確認できます。
操作手順
ステップ 1: 環境の準備
pypaimon-1.4.dev0.tar.gz パッケージをダウンロードし、ターゲットフォルダにアップロードします。
ターゲットフォルダに移動し、次のコマンドを実行して pypaimon SDK をインストールします。
pip3 install pypaimon-1.4.dev0.tar.gz(任意) インストールが完了したら、
pip show pypaimonコマンドを実行してインストールを検証できます。
ステップ 2: PyPaimon を使用した DLF Paimon テーブルへのアクセス
ターゲットフォルダで、次のコマンドを実行して
testdlf.pyという名前のファイルを作成します。vim testdlf.pytestdlf.pyファイルに、次のサンプルコードを追加します。この例では、PyPaimon を使用して DLF Paimon テーブルを作成し、そのテーブルの読み取りと書き込みを行う方法を示します。パラメーター設定、およびテーブルの読み取りと書き込みのさまざまなメソッドの詳細については、「コードの詳細」をご参照ください。import pyarrow as pa import pandas as pd from pypaimon import CatalogFactory from pypaimon import Schema # カタログの作成 catalog_options = { 'metastore': 'rest', 'uri': "http://${region_id}-vpc.dlf.aliyuncs.com", 'warehouse': "${catalog_name}", 'dlf.region': '${region_id}', "token.provider": "dlf", 'dlf.access-key-id': "xxx", 'dlf.access-key-secret': "xxxx", } catalog = CatalogFactory.create(catalog_options) # データベースの作成 catalog.create_database( name='testdb', ignore_if_exists=True # データベースが既に存在する場合にエラーを無視するかどうかを指定 ) # スキーマの作成 pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema.from_pyarrow_schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' ) # テーブルの作成 catalog.create_table( identifier='testdb.tb', schema=schema, ignore_if_exists=True # テーブルが既に存在する場合にエラーを無視するかどうかを指定 ) table = catalog.get_table('testdb.tb') # テーブルの書き込みおよびコミット操作の作成 write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # テーブルへのデータ書き込み。PyArrow と Pandas をサポートしています。 # Pandas のサンプルデータの書き込み data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe) # データのコミット table_commit.commit(table_write.prepare_commit()) # リソースのクローズ table_write.close() table_commit.close() # テーブルからのデータ読み取り。複数のデータ形式をサポートしています。 read_builder = table.new_read_builder() predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate) table_scan = read_builder.new_scan() splits = table_scan.plan().splits() table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table)
ステップ 3: Python ファイルの実行
ターゲットフォルダで、次のコマンドを実行して Python スクリプトを実行します。
python3 testdlf.py次の結果が返されます。

コード詳細
PyPaimon を使用した DLF Paimon テーブルの作成
1. Paimon DLF データカタログの作成
説明カタログは、テーブルを管理するためのエントリーポイントです。DLF の Paimon テーブルにアクセスするには、まずカタログを作成する必要があります。
# catalog_options は、キーと値が両方とも文字列の辞書です。 catalog_options = { 'metastore': 'rest', 'uri': "http://${region_id}-vpc.dlf.aliyuncs.com", 'warehouse': "${catalog_name}", 'dlf.region': '${region_id}', "token.provider": "dlf", 'dlf.access-key-id': "xxx", 'dlf.access-key-secret': "xxxx", } catalog = CatalogFactory.create(catalog_options)パラメーターの説明は次のとおりです。
パラメーター
説明
metastore
dlf-paimon
dlf.region
DLF のリージョン ID。詳細については、「エンドポイント」をご参照ください。
dlf.endpoint
DLF のエンドポイント。詳細については、「エンドポイント」をご参照ください。
dlf.catalog.id
DLF データカタログの ID。データカタログの ID は Data Lake Formation コンソールで確認できます。詳細については、「データカタログ」をご参照ください。
dlf.catalog.accessKeyId
DLF サービスへのアクセスに必要な AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。
dlf.catalog.accessKeySecret
DLF サービスへのアクセスに必要な AccessKey Secret。詳細については、「AccessKey ペアの作成」をご参照ください。
max-workers
任意。PyPaimon でデータを読み取る際の並列度。N は 1 以上の整数です。デフォルト値は 1 で、データが直列に読み取られることを意味します。
データベースを作成します。
Paimon データカタログでは、すべてのテーブルが特定のデータベースに属している必要があります。データベースを作成してテーブルを管理できます。
catalog.create_database( name='database_name', ignore_if_exists=True, # データベースが既に存在する場合にエラーを無視するかどうかを指定 properties={'key': 'value'} # データベースパラメーター (任意) )スキーマを作成します。
スキーマには、列定義、パーティションキー、プライマリキー、テーブルパラメーター、およびコメントが含まれます。列定義は
pyarrow.Schemaを使用して定義されます。その他のパラメーターは任意です。次の 2 つのメソッドのいずれかを使用してpyarrow.Schemaを構築できます。PyArrow
pyarrow.schemaメソッドを使用します。次のコードに例を示します。import pyarrow as pa from pypaimon import Schema pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )説明pyarrowとPaimon間のデータの型のマッピングの詳細については、「PyPaimon のデータの型のマッピング」をご参照ください。Pandas
Pandas データがある場合は、
pandas.DataFrameから直接スキーマを取得することもできます。以下のコードは一例です。import pandas as pd import pyarrow as pa from pypaimon import Schema # これは DataFrame のサンプルデータです data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) # DataFrame から pyarrow.Schema を取得 record_batch = pa.RecordBatch.from_pandas(dataframe) pa_schema = record_batch.schema schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )4. テーブルオブジェクトの作成と取得
catalog.create_table( identifier='database_name.table_name', schema=schema, ignore_if_exists=True # テーブルが既に存在する場合にエラーを無視するかどうかを指定 ) table = catalog.get_table('database_name.table_name')
テーブルへのデータ書き込み
現在、PyPaimon は、バケットが -1 に設定されているプライマリキーテーブルへの書き込みをサポートしていません。
1. テーブル、書き込み、およびコミット操作の作成
# テーブルの書き込みおよびコミット操作の作成 write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # Pandas のサンプルデータの書き込み data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], }次の 2 つのメソッドのいずれかを使用して、テーブルにデータを書き込むことができます。
大規模なデータセットを処理する場合は、PyArrow の使用を推奨します。数ギガバイト (GB) 以下の比較的小さなデータセットの場合は、Pandas の方が効率的です。
PyArrow
pyarrow.Tableとpyarrow.RecordBatchの両方がサポートされています。pyarrow.RecordBatchはストリーム処理シナリオにより適しています。メソッド 1: pyarrow.Table の書き込み
# フィールドの作成 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # データとフィールドを使用してスキーマを作成 schema = pa.schema(fields) # テーブルの作成 pa_table = pa.Table.from_arrays(data, schema) # データの書き込み table_write.write_arrow(pa_table)メソッド 2: pyarrow.RecordBatch の書き込み
# フィールドの作成 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # データとフィールドを使用してスキーマを作成 schema = pa.schema(fields) # RecordBatch の作成 record_batch = pa.RecordBatch.from_arrays(data, schema) # データの書き込み table_write.write_arrow_batch(record_batch)
Pandas
pandas.DataFrame を書き込むことができます。
import pandas as pd dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe)3. データの送信とリソースの解放
# データのコミット table_commit.commit(table_write.prepare_commit()) # リソースのクローズ table_write.close() table_commit.close()
テーブルからのデータ読み取り
1. `ReadBuilder` を作成して読み取り操作を設定
read_builder = table.new_read_builder()2. `PredicateBuilder` を使用してフィルター条件を構築し、プッシュダウン
条件によるフィルター。たとえば、`date` が `2024-12-01` のデータのみをクエリします。
predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate)特定の列によるフィルター。たとえば、`key` と `value` 列のみをクエリします。
read_builder = read_builder.with_projection(['key', 'value'])
説明サポートされているフィルター条件の詳細については、「PyPaimon フィルター条件」をご参照ください。
splitsを取得します。table_scan = read_builder.new_scan() splits = table_scan.plan().splits()splitsをさまざまなデータ形式に変換します。Apache Arrow
すべてのデータを
pyarrow.Tableに読み込むことができます。table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table) # サンプル出力: # pyarrow.Table # key: int64 not null # value: string # ---- # key: [[2],[1]] # value: [["BBB"],["AAA"]]また、データを
pyarrow.RecordBatchReaderに読み込んで、反復的に読み取ることもできます。table_read = read_builder.new_read() for batch in table_read.to_arrow_batch_reader(splits): print(batch) # サンプル出力: # pyarrow.RecordBatch # key: int64 # value: string # ---- # key: [1,2] # value: ["AAA","BBB"]
Pandas
データを
pandas.DataFrameに読み込むことができます。table_read = read_builder.new_read() df = table_read.to_pandas(splits) print(df) # サンプル出力: # key value # 0 1 AAA # 1 2 BBBDuckDB
重要DuckDB をインストールする必要があります。
pip install duckdbを実行してインストールできます。データをインメモリの DuckDB テーブルに変換してクエリを実行できます。
table_read = read_builder.new_read() duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()) # サンプル出力: # key value # 0 1 AAA # 1 2 BBB print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf()) # サンプル出力: # key value # 0 1 AAARay
重要Ray は
pip install rayを実行してインストールする必要があります。table_read = read_builder.new_read() ray_dataset = table_read.to_ray(splits) # ray_dataset に関する情報の出力 print(ray_dataset) # サンプル出力: # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string}) # ray_dataset の最初の 2 つの要素の出力 print(ray_dataset.take(2)) # サンプル出力: # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}] # ray_dataset 全体を Pandas DataFrame に変換して出力 print(ray_dataset.to_pandas()) # サンプル出力: # key value # 0 1 AAA # 1 2 BBB