すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:PyPaimon を使用した DLF へのアクセス

最終更新日:Mar 01, 2026

このトピックでは、PyPaimon を使用して Data Lake Formation (DLF) の Paimon テーブルを作成、読み取り、書き込みする方法について説明します。

PyPaimon と DLF の統合

PyPaimon は、Apache Paimon 向けの Python ソフトウェア開発キット (SDK) です。効率的なデータインジェスト機能を提供し、開発者は Python を使用して Paimon テーブルのデータを直接読み取り、書き込み、処理できます。

DLF カタログの統合

pypaimon_dlf2 拡張パッケージをインポートし、DLF カタログを設定することで、Paimon テーブルのメタデータを Alibaba Cloud の Data Lake Formation (DLF) サービスと自動的に同期できます。

この統合には、主に次の利点があります。

  • マルチエンジン間の相互運用性:メタデータが DLF で管理されると、MaxCompute、Hologres、EMR など、Alibaba Cloud 上の他のコンピューティングエンジンがこの Paimon データにシームレスにアクセスできるようになります。

  • 統合管理:DLF のデータレイク管理機能を使用して、Paimon テーブルのライフサイクル管理を実行し、ストレージフォーマットを自動的に最適化できます。

前提条件

  • DLF データカタログが作成されていること。

  • Python のバージョンは 3.8 以降です。python3 --version を実行して、現在のバージョンを確認できます。

操作手順

ステップ 1: 環境の準備

  1. pypaimon-1.4.dev0.tar.gz パッケージをダウンロードし、ターゲットフォルダにアップロードします。

  2. ターゲットフォルダに移動し、次のコマンドを実行して pypaimon SDK をインストールします。

    pip3 install pypaimon-1.4.dev0.tar.gz
  3. (任意) インストールが完了したら、pip show pypaimon コマンドを実行してインストールを検証できます。

ステップ 2: PyPaimon を使用した DLF Paimon テーブルへのアクセス

  1. ターゲットフォルダで、次のコマンドを実行して testdlf.py という名前のファイルを作成します。

    vim testdlf.py
  2. testdlf.py ファイルに、次のサンプルコードを追加します。この例では、PyPaimon を使用して DLF Paimon テーブルを作成し、そのテーブルの読み取りと書き込みを行う方法を示します。パラメーター設定、およびテーブルの読み取りと書き込みのさまざまなメソッドの詳細については、「コードの詳細」をご参照ください。

    import pyarrow as pa
    import pandas as pd
    from pypaimon import CatalogFactory
    from pypaimon import Schema
    
    # カタログの作成
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)
    
    # データベースの作成
    catalog.create_database(
       name='testdb',
       ignore_if_exists=True    # データベースが既に存在する場合にエラーを無視するかどうかを指定
    )
    
    # スキーマの作成
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    # テーブルの作成
    catalog.create_table(
        identifier='testdb.tb',
        schema=schema,
        ignore_if_exists=True  # テーブルが既に存在する場合にエラーを無視するかどうかを指定
    )
    table = catalog.get_table('testdb.tb')
    
    # テーブルの書き込みおよびコミット操作の作成
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # テーブルへのデータ書き込み。PyArrow と Pandas をサポートしています。
    # Pandas のサンプルデータの書き込み
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
    
    # データのコミット
    table_commit.commit(table_write.prepare_commit())
    # リソースのクローズ
    table_write.close()
    table_commit.close()
    
    # テーブルからのデータ読み取り。複数のデータ形式をサポートしています。
    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    predicate = predicate_builder.equal('date', '2024-12-01')
    read_builder = read_builder.with_filter(predicate)
    
    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
    
    table_read = read_builder.new_read()
    pa_table = table_read.to_arrow(splits)
    print(pa_table)

ステップ 3: Python ファイルの実行

ターゲットフォルダで、次のコマンドを実行して Python スクリプトを実行します。

python3 testdlf.py

次の結果が返されます。

image

コード詳細

PyPaimon を使用した DLF Paimon テーブルの作成

  1. 1. Paimon DLF データカタログの作成

    説明

    カタログは、テーブルを管理するためのエントリーポイントです。DLF の Paimon テーブルにアクセスするには、まずカタログを作成する必要があります。

    # catalog_options は、キーと値が両方とも文字列の辞書です。
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)

    パラメーターの説明は次のとおりです。

    パラメーター

    説明

    metastore

    dlf-paimon

    dlf.region

    DLF のリージョン ID。詳細については、「エンドポイント」をご参照ください。

    dlf.endpoint

    DLF のエンドポイント。詳細については、「エンドポイント」をご参照ください。

    dlf.catalog.id

    DLF データカタログの ID。データカタログの ID は Data Lake Formation コンソールで確認できます。詳細については、「データカタログ」をご参照ください。

    dlf.catalog.accessKeyId

    DLF サービスへのアクセスに必要な AccessKey ID。詳細については、「AccessKey ペアの作成」をご参照ください。

    dlf.catalog.accessKeySecret

    DLF サービスへのアクセスに必要な AccessKey Secret。詳細については、「AccessKey ペアの作成」をご参照ください。

    max-workers

    任意。PyPaimon でデータを読み取る際の並列度。N は 1 以上の整数です。デフォルト値は 1 で、データが直列に読み取られることを意味します。

  2. データベースを作成します。

    Paimon データカタログでは、すべてのテーブルが特定のデータベースに属している必要があります。データベースを作成してテーブルを管理できます。

    catalog.create_database(
       name='database_name',
       ignore_if_exists=True,    # データベースが既に存在する場合にエラーを無視するかどうかを指定
       properties={'key': 'value'}  # データベースパラメーター (任意)
    )
  3. スキーマを作成します。

    スキーマには、列定義、パーティションキー、プライマリキー、テーブルパラメーター、およびコメントが含まれます。列定義は pyarrow.Schema を使用して定義されます。その他のパラメーターは任意です。次の 2 つのメソッドのいずれかを使用して pyarrow.Schema を構築できます。

    PyArrow

    pyarrow.schema メソッドを使用します。次のコードに例を示します。

    import pyarrow as pa
    from pypaimon import Schema
    
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    説明

    pyarrowPaimon 間のデータの型のマッピングの詳細については、「PyPaimon のデータの型のマッピング」をご参照ください。

    Pandas

    Pandas データがある場合は、pandas.DataFrame から直接スキーマを取得することもできます。以下のコードは一例です。

    import pandas as pd
    import pyarrow as pa
    
    from pypaimon import Schema
    
    # これは DataFrame のサンプルデータです
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    dataframe = pd.DataFrame(data)
    
    # DataFrame から pyarrow.Schema を取得
    record_batch = pa.RecordBatch.from_pandas(dataframe)
    pa_schema = record_batch.schema
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
  4. 4. テーブルオブジェクトの作成と取得

    catalog.create_table(
        identifier='database_name.table_name',
        schema=schema,
        ignore_if_exists=True  # テーブルが既に存在する場合にエラーを無視するかどうかを指定
    )
    table = catalog.get_table('database_name.table_name')
    

テーブルへのデータ書き込み

説明

現在、PyPaimon は、バケットが -1 に設定されているプライマリキーテーブルへの書き込みをサポートしていません。

  1. 1. テーブル、書き込み、およびコミット操作の作成

    # テーブルの書き込みおよびコミット操作の作成
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # Pandas のサンプルデータの書き込み
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
  2. 次の 2 つのメソッドのいずれかを使用して、テーブルにデータを書き込むことができます。

    大規模なデータセットを処理する場合は、PyArrow の使用を推奨します。数ギガバイト (GB) 以下の比較的小さなデータセットの場合は、Pandas の方が効率的です。

    PyArrow

    pyarrow.Tablepyarrow.RecordBatch の両方がサポートされています。 pyarrow.RecordBatchはストリーム処理シナリオにより適しています。

    • メソッド 1: pyarrow.Table の書き込み

      # フィールドの作成
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # データとフィールドを使用してスキーマを作成
      schema = pa.schema(fields)
      
      # テーブルの作成
      pa_table = pa.Table.from_arrays(data, schema)
      
      # データの書き込み
      table_write.write_arrow(pa_table)
    • メソッド 2: pyarrow.RecordBatch の書き込み

      # フィールドの作成
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # データとフィールドを使用してスキーマを作成
      schema = pa.schema(fields)
      
      # RecordBatch の作成
      record_batch = pa.RecordBatch.from_arrays(data, schema)
      
      # データの書き込み
      table_write.write_arrow_batch(record_batch)

    Pandas

    pandas.DataFrame を書き込むことができます。

    import pandas as pd
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
  3. 3. データの送信とリソースの解放

    # データのコミット
    table_commit.commit(table_write.prepare_commit())
    # リソースのクローズ
    table_write.close()
    table_commit.close()

テーブルからのデータ読み取り

  1. 1. `ReadBuilder` を作成して読み取り操作を設定

    read_builder = table.new_read_builder()
  2. 2. `PredicateBuilder` を使用してフィルター条件を構築し、プッシュダウン

    • 条件によるフィルター。たとえば、`date` が `2024-12-01` のデータのみをクエリします。

      predicate_builder = read_builder.new_predicate_builder()
      predicate = predicate_builder.equal('date', '2024-12-01')
      read_builder = read_builder.with_filter(predicate)
    • 特定の列によるフィルター。たとえば、`key` と `value` 列のみをクエリします。

      read_builder = read_builder.with_projection(['key', 'value'])
    説明

    サポートされているフィルター条件の詳細については、「PyPaimon フィルター条件」をご参照ください。

  3. splits を取得します。

    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
  4. splits をさまざまなデータ形式に変換します。

    Apache Arrow

    • すべてのデータを pyarrow.Table に読み込むことができます。

      table_read = read_builder.new_read()
      pa_table = table_read.to_arrow(splits)
      print(pa_table)
      
      # サンプル出力:
      # pyarrow.Table
      # key: int64 not null
      # value: string
      # ----
      # key: [[2],[1]]
      # value: [["BBB"],["AAA"]]
    • また、データを pyarrow.RecordBatchReader に読み込んで、反復的に読み取ることもできます。

      table_read = read_builder.new_read()
      for batch in table_read.to_arrow_batch_reader(splits):
          print(batch)
      
      # サンプル出力:
      # pyarrow.RecordBatch
      # key: int64
      # value: string
      # ----
      # key: [1,2]
      # value: ["AAA","BBB"]

    Pandas

    データを pandas.DataFrame に読み込むことができます。

    table_read = read_builder.new_read()
    df = table_read.to_pandas(splits)
    print(df)
    
    # サンプル出力:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB

    DuckDB

    重要

    DuckDB をインストールする必要があります。pip install duckdb を実行してインストールできます。

    データをインメモリの DuckDB テーブルに変換してクエリを実行できます。

    table_read = read_builder.new_read()
    duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
    
    print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
    # サンプル出力:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB
    
    print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf())
    # サンプル出力:
    #    key  value
    # 0   1  AAA

    Ray

    重要

    Ray は pip install ray を実行してインストールする必要があります。

    table_read = read_builder.new_read()
    ray_dataset = table_read.to_ray(splits)
    
    # ray_dataset に関する情報の出力
    print(ray_dataset)
    # サンプル出力:
    # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string})
    
    # ray_dataset の最初の 2 つの要素の出力
    print(ray_dataset.take(2))
    # サンプル出力:
    # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}]
    
    # ray_dataset 全体を Pandas DataFrame に変換して出力
    print(ray_dataset.to_pandas())
    # サンプル出力:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB