Platform for AI (PAI) チームは、Deep Learning Containers (DLC) タスクで MaxCompute テーブルに対するデータの読み書きを簡素化するために、paiio モジュールを開発しました。 paiio モジュールは、TableRecordDataset、TableReader、TableWriter という 3 つのインターフェイスをサポートしています。 このトピックでは、これらのインターフェイスについて説明し、その使用例を示します。
制限事項
-
paiio モジュールは TensorFlow 1.12、1.15、および 2.0 をサポートしています。paiio モジュールは、DLC タスクでこれらのバージョンに対応するランタイムイメージを選択した場合にのみ使用できます。
-
paiio モジュールはカスタムイメージをサポートしていません。
事前準備: アカウント情報の構成
paiio モジュールを使用して MaxCompute テーブルからデータを読み取ったり、MaxCompute テーブルにデータを書き込んだりする前に、ご利用の MaxCompute アカウントの AccessKey を構成する必要があります。PAI は構成情報をファイルから読み取ります。構成ファイルをマウントされたファイルシステムに配置し、環境変数を使用してコード内で参照する必要があります。
-
以下の内容で構成ファイルを作成します。
access_id=xxxx access_key=xxxx end_point=http://xxxxパラメーター
説明
access_id
ご利用の Alibaba Cloud アカウントの AccessKey ID です。
access_key
ご利用の Alibaba Cloud アカウントの AccessKey Secret です。
end_point
MaxCompute のエンドポイントです。例えば、中国 (上海) リージョンのエンドポイントは
http://service.cn-shanghai.maxcompute.aliyun.com/apiです。詳細については、「エンドポイント」をご参照ください。 -
コード内で構成ファイルのパスを次のように指定します。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'<your MaxCompute config file path> は、構成ファイルへのパスです。
TableRecordDataset の使用方法
インターフェイスの説明
TensorFlow コミュニティは、データストリームを構築するために TensorFlow 1.2 以降の Dataset インターフェイスを使用することを推奨しています。このインターフェイスは、元のスレッドおよびキューインターフェイスを置き換えるものです。詳細については、「Dataset」をご参照ください。複数の Dataset オブジェクトを組み合わせて変換し、計算用のデータを生成できます。これにより、データ入力コードが簡素化されます。
-
インターフェイス定義 (Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0): -
パラメーター
パラメーター
必須
型
デフォルト
説明
filenames
はい
STRING
N/A
読み取るテーブル名のリストです。すべてのテーブルのスキーマは同じである必要があります。テーブル名の形式は
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...です。record_defaults
はい
LIST または TUPLE
N/A
出力列のデータ型変換に使用され、空の列にデフォルト値を提供します。値の数が読み取られた列の数と一致しない場合、またはデータ型を自動的に変換できない場合、システムは実行中に例外をスローします。
サポートされているデータの型には、FLOAT32、FLOAT64、INT32、INT64、BOOL、および STRING があります。INT64 型のデフォルト値については、
np.array(0, np.int64)をご参照ください。selected_cols
いいえ
STRING
None
選択する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の None はすべての列を読み取ります。このパラメーターは excluded_cols と一緒に使用できません。
excluded_cols
いいえ
STRING
None
除外する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の None はすべての列を読み取ります。このパラメーターは selected_cols と一緒に使用できません。
slice_id
いいえ
INT
0
分散読み取りシナリオでは、これは現在のシャードの ID (0 からの採番) です。分散読み取りの場合、システムは slice_count に基づいてテーブルを複数のシャードに分割し、slice_id に対応するシャードを読み取ります。
slice_id が 0 (デフォルト) で slice_count が 1 の場合、テーブル全体が読み取られます。slice_count が 1 より大きい場合、0 番目のシャードが読み取られます。
slice_count
いいえ
INT
1
分散読み取りシナリオでは、これはシャードの総数であり、通常はワーカーの数です。デフォルト値は 1 で、シャーディングは実行されず、テーブル全体が読み取られることを意味します。
num_threads
いいえ
INT
0
各テーブルの組み込みリーダーによってデータプリフェッチのために有効にされるスレッド数です。これらのスレッドは計算スレッドとは独立しています。値は 1 から 64 の整数である必要があります。num_threads が 0 の場合、システムはプリフェッチスレッドの数を計算スレッドプール内のスレッド数の 4 分の 1 に自動的に設定します。
説明I/O は各モデルの全体の計算に異なる影響を与えるため、プリフェッチスレッドの数を増やしても、モデルの全体のトレーニング速度の増加は保証されません。
capacity
いいえ
INT
0
テーブルを読み取るための合計プリフェッチサイズ (行数) です。num_threads が 1 より大きい場合、各スレッドのプリフェッチサイズは capacity/num_threads 行 (切り上げ) です。capacity が 0 の場合、組み込みリーダーはテーブルの最初の N 行 (デフォルトで N=256) の平均サイズに基づいて合計プリフェッチサイズを自動的に構成します。これにより、各スレッドのプリフェッチデータは約 64 MB になります。
説明MaxCompute テーブルのフィールドが DOUBLE 型の場合、TensorFlow では対応する型として `np.float64` 形式を使用する必要があります。
-
戻り値
データパイプラインを構築するための入力として使用できる Dataset オブジェクトを返します。
例
`test` という名前のテーブルが `myproject` という名前のプロジェクトに保存されていると仮定します。次の表は、データ例を示しています。
|
itemid (BIGINT) |
name (STRING) |
price (DOUBLE) |
virtual (BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
次のコードは、TableRecordDataset インターフェイスを使用して `test` テーブルから `itemid` 列と `price` 列を読み取る方法を示しています。
import os
import tensorflow as tf
import paiio
# 構成ファイルのパスを指定します。実際のファイルのパスに置き換えてください。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 読み取るテーブルを定義します。複数のテーブルを指定できます。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
table = ["odps://${your_projectname}/tables/${table_name}"]
# テーブルの itemid 列と price 列を読み取るために TableRecordDataset を定義します。
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# エポックを 2、バッチサイズを 3、プリフェッチを 100 バッチに設定します。
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("データセットの終端")
TableReader の使用方法
インターフェイスの説明
TableReader は MaxCompute SDK に基づいて実装されており、TensorFlow フレームワークに依存しません。これを使用して MaxCompute テーブルに直接アクセスし、I/O 結果を即座に取得できます。
-
リーダーの作成とテーブルのオープン
-
インターフェイス定義
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1): -
-
パラメーター
-
戻り値
Reader オブジェクト
|
パラメーター |
必須 |
型 |
デフォルト |
説明 |
|
table |
はい |
STRING |
N/A |
開く MaxCompute テーブルの名前です。形式は |
|
selected_cols |
いいえ |
STRING |
空の文字列 ("") |
選択する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の空の文字列 ("") はすべての列を読み取ります。このパラメーターは excluded_cols と一緒に使用できません。 |
|
excluded_cols |
いいえ |
STRING |
空の文字列 ("") |
除外する列を、カンマ (,) で区切られた列名の文字列として指定します。デフォルト値の空の文字列 ("") はすべての列を読み取ります。このパラメーターは selected_cols と一緒に使用できません。 |
|
slice_id |
いいえ |
INT |
0 |
分散読み取りシナリオでは、これは現在のシャードの ID です。値は [0, slice_count-1] の範囲内である必要があります。分散読み取りの場合、システムは slice_count に基づいてテーブルを複数のシャードに分割し、slice_id に対応するシャードを読み取ります。デフォルト値は 0 で、シャーディングは実行されず、テーブルのすべての行が読み取られることを意味します。 |
|
slice_count |
いいえ |
INT |
1 |
分散読み取りシナリオでは、これはシャードの総数であり、通常はワーカーの数です。 |
レコードの読み取り
-
インターフェイス定義
reader.read(num_records=1)
パラメーター
num_records は、順次読み取る行数を指定します。デフォルト値は 1 で、1 行が読み取られることを意味します。num_records が未読の行数を超えると、残りのすべての行が返されます。読み取れるレコードがない場合、`OutOfRangeException` (paiio.python_io.OutOfRangeException) がスローされます。
戻り値
NumPy ndarray (recarray とも呼ばれます) を返します。配列内の各要素は、テーブルからの 1 行のデータを表すタプルです。
特定の行へのシーク
-
インターフェイス定義
reader.seek(offset=0)
パラメーター
offset は、シークする行の 0 から始まるインデックスを指定します。次の読み取り操作はこの行から開始されます。slice_id と slice_count が構成されている場合、シーク操作はシャードの開始位置を基準とします。offset がテーブルの総行数を超えると、`OutOfRange` 例外がスローされます。読み取り位置がすでにテーブルの終端を超えている場合、別のシーク操作は `OutOfRangeException` (paiio.python_io.OutOfRangeException) をスローします。
データのバッチを読み取る際、残りの行数がバッチサイズより少ない場合、読み取り操作は残りの行のみを返し、例外はスローされません。ただし、その後シーク操作を実行すると、例外がスローされます。
戻り値
None。操作中にエラーが発生した場合、例外がスローされます。
テーブル内のレコード総数の取得
-
インターフェイス定義
reader.get_row_count()
パラメーター
None
戻り値
テーブル内の行数を返します。slice_id と slice_count が構成されている場合、代わりにシャードのサイズが返されます。
テーブルスキーマの取得
-
インターフェイス定義
reader.get_schema()
パラメーター
None
戻り値
1 次元構造化 ndarray を返します。配列内の各要素は、MaxCompute テーブル内の選択された列のスキーマに対応し、次の 3 つのフィールドを含みます。
|
パラメーター |
説明 |
|
colname |
列名です。 |
|
typestr |
MaxCompute データ型名です。 |
|
pytype |
typestr に対応する Python データ型です。 |
次の表は、typestr と pytype のマッピングについて説明しています。
|
typestr |
pytype |
|
BIGINT |
INT |
|
DOUBLE |
FLOAT |
|
BOOLEAN |
BOOL |
|
STRING |
OBJECT |
|
DATETIME |
INT |
|
MAP 説明
PAI-TensorFlow は MAP データ型に対する操作をサポートしていません。 |
OBJECT |
テーブルのクローズ
-
インターフェイス定義
reader.close()
パラメーター
None
戻り値
None。操作中にエラーが発生した場合、例外がスローされます。
例
`test` という名前のテーブルが `myproject` という名前のプロジェクトに保存されていると仮定します。次の表は、データ例を示しています。
|
uid (BIGINT) |
name (STRING) |
price (DOUBLE) |
virtual (BOOL) |
|
25 |
"Apple" |
5.0 |
False |
|
38 |
"Pear" |
4.5 |
False |
|
17 |
"Watermelon" |
2.2 |
False |
次のコードは、TableReader インターフェイスを使用して uid、name、および price 列からデータを読み取る方法を示しています。
import os
import paiio
# 構成ファイルのパスを指定します。実際のパスに置き換えてください。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# テーブルを開き、リーダーオブジェクトを返します。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# テーブル内の総行数を取得します。
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# テーブルを読み取ります。戻り値は [(uid, name, price)*2] の形式の recarray になります。
records = reader.read(batch_size) # Returns [(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # Returns [(17, "Watermelon", 2.2)]
# さらに読み取ると OutOfRange 例外がスローされます。
# リーダーをクローズします。
reader.close()
TableWriter の使用方法
TableWriter は MaxCompute SDK に基づいて実装されており、TensorFlow フレームワークに依存しません。これを使用して MaxCompute テーブルに直接データを書き込むことができます。
インターフェイスの説明
-
ライターの作成とテーブルのオープン
-
インターフェイス定義
writer = paiio.python_io.TableWriter(table, slice_id=0)説明-
この操作は、既存のデータをクリアせずにテーブルにデータを追加します。
-
追加されたデータを読み取る前に、テーブルをクローズする必要があります。
-
-
パラメーター
パラメーター
必須
型
デフォルト
説明
table
はい
STRING
N/A
開く MaxCompute テーブルの名前です。形式は
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...です。slice_id
いいえ
INT
0
分散シナリオでは、書き込み競合を避けるために異なるシャードにデータを書き込みます。スタンドアロンシナリオでは、デフォルト値の 0 を使用できます。マルチマシンシナリオでは、パラメータサーバ (PS) を含む複数のワーカーが同じ slice_id を使用して同時にテーブルに書き込むと、書き込み操作は失敗します。
-
戻り値
Writer オブジェクトを返します。
-
-
レコードの書き込み
-
インターフェイス定義
writer.write(values, indices) -
パラメーター
パラメーター
必須
型
デフォルト
説明
values
はい
STRING
N/A
書き込むデータです。単一行または複数行を書き込むことができます。
-
単一行を書き込むには、スカラーで構成される TUPLE、LIST、または 1D-ndarray を values パラメーターに渡します。LIST または ndarray を渡す場合、書き込むすべての列が同じデータ型であることを示します。
-
N 行 (N>=1) を書き込むには、LIST または 1D-ndarray を values パラメーターに渡すことができます。パラメーター内の各要素は、TUPLE または LIST として表される単一行のデータに対応する必要があります。また、ndarray 内に構造化された形式で保存することもできます。
indices
はい
INT
N/A
データを書き込む列を指定します。INT 型のインデックスで構成される TUPLE、LIST、または 1D-ndarray を渡すことができます。indices 内の各数値 (i) は、テーブル内の i 番目の列 (0 からの採番) に対応します。
-
-
戻り値
None。書き込みプロセス中にエラーが発生した場合、例外がスローされ、プロセスが終了します。
-
-
テーブルのクローズ
-
インターフェイス定義
writer.close()説明`with` ステートメントブロックを使用する場合、テーブルをクローズするために `close()` メソッドを明示的に呼び出す必要はありません。
-
パラメーター
None
-
戻り値
None。操作中にエラーが発生した場合、例外がスローされます。
-
例
次のコードに示すように、`with` ステートメントで TableWriter を使用します。
with paiio.python_io.TableWriter(table) as writer: # 書き込み用の値を準備します。 writer.write(values, indices) # このセクションの外でテーブルは自動的にクローズされます。
-
例
import paiio
import os
# 構成ファイルのパスを指定します。実際のパスに置き換えてください。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# データを準備します。
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# テーブルを開き、ライターオブジェクトを返します。テーブル名と対応する MaxCompute プロジェクト名を独自のものに置き換えてください。
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# テーブルの 0 から 3 列にレコードを書き込みます。
records = writer.write(values, indices=[0, 1, 2, 3])
# ライターをクローズします。
writer.close()
次のステップ
コードを構成したら、次の手順に従って paiio を使用して MaxCompute テーブルからデータを読み取ったり、MaxCompute テーブルにデータを書き込んだりします。
-
データセットを作成し、構成ファイルとコードファイルをデータソースにアップロードします。詳細については、「データセットの管理」をご参照ください。
-
DLC タスクを作成します。主要なパラメータ設定は次のとおりです。その他のパラメータ設定の詳細については、「トレーニングタスクの作成」をご参照ください。
-
[ノードイメージ]: [PAI 公式イメージ] の場合、TensorFlow 1.12、TensorFlow 1.15、または TensorFlow 2.0 に対応するイメージを選択します。
-
[データセット構成]: [データセット] の場合、前のステップで作成したデータセットを選択します。[マウントパス] を
/mnt/data/に設定します。 -
[実行コマンド]: このパラメーターを
python /mnt/data/xxx.pyに設定します。`xxx.py` を最初のステップでアップロードしたコードファイル名に置き換えてください。
-
-
[OK] をクリックします。
トレーニングタスクが送信された後、インスタンスログで結果を表示できます。詳細については、「タスクログの表示」をご参照ください。