すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:paiio を使用した MaxCompute テーブルの読み書き

最終更新日:Mar 11, 2026

Platform for AI (PAI) チームは、Deep Learning Containers (DLC) タスクで MaxCompute テーブルに対するデータの読み書きを簡素化するために、paiio モジュールを開発しました。 paiio モジュールは、TableRecordDataset、TableReader、TableWriter という 3 つのインターフェイスをサポートしています。 このトピックでは、これらのインターフェイスについて説明し、その使用例を示します。

制限事項

  • paiio モジュールは TensorFlow 1.12、1.15、および 2.0 をサポートしています。paiio モジュールは、DLC タスクでこれらのバージョンに対応するランタイムイメージを選択した場合にのみ使用できます。

  • paiio モジュールはカスタムイメージをサポートしていません。

事前準備: アカウント情報の構成

paiio モジュールを使用して MaxCompute テーブルからデータを読み取ったり、MaxCompute テーブルにデータを書き込んだりする前に、ご利用の MaxCompute アカウントの AccessKey を構成する必要があります。PAI は構成情報をファイルから読み取ります。構成ファイルをマウントされたファイルシステムに配置し、環境変数を使用してコード内で参照する必要があります。

  1. 以下の内容で構成ファイルを作成します。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    パラメーター

    説明

    access_id

    ご利用の Alibaba Cloud アカウントの AccessKey ID です。

    access_key

    ご利用の Alibaba Cloud アカウントの AccessKey Secret です。

    end_point

    MaxCompute のエンドポイントです。例えば、中国 (上海) リージョンのエンドポイントは http://service.cn-shanghai.maxcompute.aliyun.com/api です。詳細については、「エンドポイント」をご参照ください。

  2. コード内で構成ファイルのパスを次のように指定します。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    <your MaxCompute config file path> は、構成ファイルへのパスです。

TableRecordDataset の使用方法

インターフェイスの説明

TensorFlow コミュニティは、データストリームを構築するために TensorFlow 1.2 以降の Dataset インターフェイスを使用することを推奨しています。このインターフェイスは、元のスレッドおよびキューインターフェイスを置き換えるものです。詳細については、「Dataset」をご参照ください。複数の Dataset オブジェクトを組み合わせて変換し、計算用のデータを生成できます。これにより、データ入力コードが簡素化されます。

  • インターフェイス定義 (Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • パラメーター

    パラメーター

    必須

    デフォルト

    説明

    filenames

    はい

    STRING

    N/A

    読み取るテーブル名のリストです。すべてのテーブルのスキーマは同じである必要があります。テーブル名の形式は odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... です。

    record_defaults

    はい

    LIST または TUPLE

    N/A

    出力列のデータ型変換に使用され、空の列にデフォルト値を提供します。値の数が読み取られた列の数と一致しない場合、またはデータ型を自動的に変換できない場合、システムは実行中に例外をスローします。

    サポートされているデータの型には、FLOAT32、FLOAT64、INT32、INT64、BOOL、および STRING があります。INT64 型のデフォルト値については、np.array(0, np.int64) をご参照ください。

    selected_cols

    いいえ

    STRING

    None

    選択する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の None はすべての列を読み取ります。このパラメーターは excluded_cols と一緒に使用できません。

    excluded_cols

    いいえ

    STRING

    None

    除外する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の None はすべての列を読み取ります。このパラメーターは selected_cols と一緒に使用できません。

    slice_id

    いいえ

    INT

    0

    分散読み取りシナリオでは、これは現在のシャードの ID (0 からの採番) です。分散読み取りの場合、システムは slice_count に基づいてテーブルを複数のシャードに分割し、slice_id に対応するシャードを読み取ります。

    slice_id が 0 (デフォルト) で slice_count が 1 の場合、テーブル全体が読み取られます。slice_count が 1 より大きい場合、0 番目のシャードが読み取られます。

    slice_count

    いいえ

    INT

    1

    分散読み取りシナリオでは、これはシャードの総数であり、通常はワーカーの数です。デフォルト値は 1 で、シャーディングは実行されず、テーブル全体が読み取られることを意味します。

    num_threads

    いいえ

    INT

    0

    各テーブルの組み込みリーダーによってデータプリフェッチのために有効にされるスレッド数です。これらのスレッドは計算スレッドとは独立しています。値は 1 から 64 の整数である必要があります。num_threads0 の場合、システムはプリフェッチスレッドの数を計算スレッドプール内のスレッド数の 4 分の 1 に自動的に設定します。

    説明

    I/O は各モデルの全体の計算に異なる影響を与えるため、プリフェッチスレッドの数を増やしても、モデルの全体のトレーニング速度の増加は保証されません。

    capacity

    いいえ

    INT

    0

    テーブルを読み取るための合計プリフェッチサイズ (行数) です。num_threads1 より大きい場合、各スレッドのプリフェッチサイズは capacity/num_threads 行 (切り上げ) です。capacity0 の場合、組み込みリーダーはテーブルの最初の N 行 (デフォルトで N=256) の平均サイズに基づいて合計プリフェッチサイズを自動的に構成します。これにより、各スレッドのプリフェッチデータは約 64 MB になります。

    説明

    MaxCompute テーブルのフィールドが DOUBLE 型の場合、TensorFlow では対応する型として `np.float64` 形式を使用する必要があります。

  • 戻り値

    データパイプラインを構築するための入力として使用できる Dataset オブジェクトを返します。

`test` という名前のテーブルが `myproject` という名前のプロジェクトに保存されていると仮定します。次の表は、データ例を示しています。

itemid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

次のコードは、TableRecordDataset インターフェイスを使用して `test` テーブルから `itemid` 列と `price` 列を読み取る方法を示しています。

import os
import tensorflow as tf
import paiio

# 構成ファイルのパスを指定します。実際のファイルのパスに置き換えてください。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 読み取るテーブルを定義します。複数のテーブルを指定できます。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
table = ["odps://${your_projectname}/tables/${table_name}"]
# テーブルの itemid 列と price 列を読み取るために TableRecordDataset を定義します。
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# エポックを 2、バッチサイズを 3、プリフェッチを 100 バッチに設定します。
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("データセットの終端")

TableReader の使用方法

インターフェイスの説明

TableReader は MaxCompute SDK に基づいて実装されており、TensorFlow フレームワークに依存しません。これを使用して MaxCompute テーブルに直接アクセスし、I/O 結果を即座に取得できます。

  • リーダーの作成とテーブルのオープン

    • インターフェイス定義

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • パラメーター

    • パラメーター

      必須

      デフォルト

      説明

      table

      はい

      STRING

      N/A

      開く MaxCompute テーブルの名前です。形式は odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... です。

      selected_cols

      いいえ

      STRING

      空の文字列 ("")

      選択する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の空の文字列 ("") はすべての列を読み取ります。このパラメーターは excluded_cols と一緒に使用できません。

      excluded_cols

      いいえ

      STRING

      空の文字列 ("")

      除外する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の空の文字列 ("") はすべての列を読み取ります。このパラメーターは selected_cols と一緒に使用できません。

      slice_id

      いいえ

      INT

      0

      分散読み取りシナリオでは、これは現在のシャードの ID です。値は [0, slice_count-1] の範囲内である必要があります。分散読み取りの場合、システムは slice_count に基づいてテーブルを複数のシャードに分割し、slice_id に対応するシャードを読み取ります。デフォルト値は 0 で、シャーディングは実行されず、テーブルのすべての行が読み取られることを意味します。

      slice_count

      いいえ

      INT

      1

      分散読み取りシナリオでは、これはシャードの総数であり、通常はワーカーの数です。

    • 戻り値

      Reader オブジェクト

  • レコードの読み取り

    • インターフェイス定義

    • reader.read(num_records=1)
    • パラメーター

      num_records は、順次読み取る行数を指定します。デフォルト値は 1 で、1 行が読み取られることを意味します。num_records が未読の行数を超えると、残りのすべての行が返されます。読み取れるレコードがない場合、`OutOfRangeException` (paiio.python_io.OutOfRangeException) がスローされます。

    • 戻り値

      NumPy ndarray (recarray とも呼ばれます) を返します。配列内の各要素は、テーブルからの 1 行のデータを表すタプルです。

  • 特定の行へのシーク

    • インターフェイス定義

    • reader.seek(offset=0)
    • パラメーター

    • offset は、シークする行の 0 から始まるインデックスを指定します。次の読み取り操作はこの行から開始されます。slice_idslice_count が構成されている場合、シーク操作はシャードの開始位置を基準とします。offset がテーブルの総行数を超えると、`OutOfRange` 例外がスローされます。読み取り位置がすでにテーブルの終端を超えている場合、別のシーク操作は `OutOfRangeException` (paiio.python_io.OutOfRangeException) をスローします。

      重要

      データのバッチを読み取る際、残りの行数がバッチサイズより少ない場合、読み取り操作は残りの行のみを返し、例外はスローされません。ただし、その後シーク操作を実行すると、例外がスローされます。

    • 戻り値

      None。操作中にエラーが発生した場合、例外がスローされます。

  • テーブル内のレコード総数の取得

    • インターフェイス定義

    • reader.get_row_count()
    • パラメーター

      None

    • 戻り値

      テーブル内の行数を返します。slice_idslice_count が構成されている場合、代わりにシャードのサイズが返されます。

  • テーブルスキーマの取得

    • インターフェイス定義

    • reader.get_schema()
    • パラメーター

      None

    • 戻り値

    • 1 次元構造化 ndarray を返します。配列内の各要素は、MaxCompute テーブル内の選択された列のスキーマに対応し、次の 3 つのフィールドを含みます。

      パラメーター

      説明

      colname

      列名です。

      typestr

      MaxCompute データ型名です。

      pytype

      typestr に対応する Python データ型です。

      次の表は、typestrpytype のマッピングについて説明しています。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      説明

      PAI-TensorFlow は MAP データ型に対する操作をサポートしていません。

      OBJECT

  • テーブルのクローズ

    • インターフェイス定義

    • reader.close()
    • パラメーター

      None

    • 戻り値

      None。操作中にエラーが発生した場合、例外がスローされます。

`test` という名前のテーブルが `myproject` という名前のプロジェクトに保存されていると仮定します。次の表は、データ例を示しています。

uid (BIGINT)

name (STRING)

price (DOUBLE)

virtual (BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

次のコードは、TableReader インターフェイスを使用して uidname、および price 列からデータを読み取る方法を示しています。

    import os
    import paiio
    
    # 構成ファイルのパスを指定します。実際のパスに置き換えてください。
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # テーブルを開き、リーダーオブジェクトを返します。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # テーブル内の総行数を取得します。
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # テーブルを読み取ります。戻り値は [(uid, name, price)*2] の形式の recarray になります。
    records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)]
    # さらに読み取ると OutOfRange 例外がスローされます。
    
    # リーダーをクローズします。
    reader.close()

TableWriter の使用方法

TableWriter は MaxCompute SDK に基づいて実装されており、TensorFlow フレームワークに依存しません。これを使用して MaxCompute テーブルに直接データを書き込むことができます。

インターフェイスの説明

  • ライターの作成とテーブルのオープン

    • インターフェイス定義

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      説明
      • この操作は、既存のデータをクリアせずにテーブルにデータを追加します。

      • 追加されたデータを読み取る前に、テーブルをクローズする必要があります。

    • パラメーター

      パラメーター

      必須

      デフォルト

      説明

      table

      はい

      STRING

      N/A

      開く MaxCompute テーブルの名前です。形式は odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/... です。

      slice_id

      いいえ

      INT

      0

      分散シナリオでは、書き込み競合を避けるために異なるシャードにデータを書き込みます。スタンドアロンシナリオでは、デフォルト値の 0 を使用できます。マルチマシンシナリオでは、パラメータサーバ (PS) を含む複数のワーカーが同じ slice_id を使用して同時にテーブルに書き込むと、書き込み操作は失敗します。

    • 戻り値

      Writer オブジェクトを返します。

  • レコードの書き込み

    • インターフェイス定義

      writer.write(values, indices)
    • パラメーター

      パラメーター

      必須

      デフォルト

      説明

      values

      はい

      STRING

      N/A

      書き込むデータです。単一行または複数行を書き込むことができます。

      • 単一行を書き込むには、スカラーで構成される TUPLE、LIST、または 1D-ndarray を values パラメーターに渡します。LIST または ndarray を渡す場合、書き込むすべての列が同じデータ型であることを示します。

      • N 行 (N>=1) を書き込むには、LIST または 1D-ndarray を values パラメーターに渡すことができます。パラメーター内の各要素は、TUPLE または LIST として表される単一行のデータに対応する必要があります。また、ndarray 内に構造化された形式で保存することもできます。

      indices

      はい

      INT

      N/A

      データを書き込む列を指定します。INT 型のインデックスで構成される TUPLE、LIST、または 1D-ndarray を渡すことができます。indices 内の各数値 (i) は、テーブル内の i 番目の列 (0 からの採番) に対応します。

    • 戻り値

      None。書き込みプロセス中にエラーが発生した場合、例外がスローされ、プロセスが終了します。

  • テーブルのクローズ

    • インターフェイス定義

      writer.close()
      説明

      `with` ステートメントブロックを使用する場合、テーブルをクローズするために `close()` メソッドを明示的に呼び出す必要はありません。

    • パラメーター

      None

    • 戻り値

      None。操作中にエラーが発生した場合、例外がスローされます。

    • 次のコードに示すように、`with` ステートメントで TableWriter を使用します。

      with paiio.python_io.TableWriter(table) as writer:
        # 書き込み用の値を準備します。
          writer.write(values, indices)
          # このセクションの外でテーブルは自動的にクローズされます。

import paiio
import os

# 構成ファイルのパスを指定します。実際のパスに置き換えてください。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# データを準備します。
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# テーブルを開き、ライターオブジェクトを返します。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# テーブルの 0 から 3 列にレコードを書き込みます。
records = writer.write(values, indices=[0, 1, 2, 3])

# ライターをクローズします。
writer.close()

次のステップ

コードを構成したら、次の手順に従って paiio を使用して MaxCompute テーブルからデータを読み取ったり、MaxCompute テーブルにデータを書き込んだりします。

  1. データセットを作成し、構成ファイルとコードファイルをデータソースにアップロードします。詳細については、「データセットの管理」をご参照ください。

  2. DLC タスクを作成します。主要なパラメータ設定は次のとおりです。その他のパラメータ設定の詳細については、「トレーニングタスクの作成」をご参照ください。

    • [ノードイメージ]: [PAI 公式イメージ] の場合、TensorFlow 1.12、TensorFlow 1.15、または TensorFlow 2.0 に対応するイメージを選択します。

    • [データセット構成]: [データセット] の場合、前のステップで作成したデータセットを選択します。[マウントパス]/mnt/data/ に設定します。

    • [実行コマンド]: このパラメーターを python /mnt/data/xxx.py に設定します。`xxx.py` を最初のステップでアップロードしたコードファイル名に置き換えてください。

  3. [OK] をクリックします。

    トレーニングタスクが送信された後、インスタンスログで結果を表示できます。詳細については、「タスクログの表示」をご参照ください。