このトピックでは、大規模データ処理に MaxFrame DataFrame.mf.apply_chunk オペレーターを使用する方法について説明します。コアな使用法、よくある間違い、および本番環境でのベストプラクティスについて解説します。
apply_chunk 関数の概要
分散コンピューティングフレームワークでは、次のようになります:
標準の
pandas.DataFrame.apply():これは単一マシンでの操作であり、スケールできません。
MaxFrame の
df.apply():行ごとまたはチャンクごとの分散マッピングをサポートします。
ただし、
apply()はデフォルトで行ごとにデータを処理するため、大規模データ処理には非効率です。
MaxFrame の apply_chunk() 関数は、次の特徴を備えています:
データをバッチで処理し、
batch_rowsを使用して各チャンクのサイズを制御できます。output_type、dtypes、index_valueなどのカスタム出力型と構造をサポートします。@with_python_requirementsなどのユーザー定義関数 (UDF) デコレーターをサポートし、複雑なタスクを開発できます。
パフォーマンスが重視されるタスクでは、apply_chunk の使用を推奨します。
シグネチャの分析
DataFrame.mf.apply_chunk(
func,
batch_rows=None,
output_type=None,
dtypes=None,
index=None,
index_value=None,
columns=None,
elementwise=None,
sort=False,
**kwds
)パラメーターの説明
パラメーター | 型 | 説明 |
func | callable | pandas DataFrame を受け取り、pandas DataFrame または Series を出力するユーザー定義関数 (UDF) です。 この関数への入力となる pandas DataFrame は、DataFrame のチャンクであり、行のバッチとして扱うことができます。 |
batch_rows | int | 各チャンクの最大行数です。 |
output_type | str | 出力型です。例:「dataframe」、「series」。 |
dtypes | pd.Series | 出力列のデータ型です。 |
index | Index | 出力インデックスオブジェクトです。 |
index_value | IndexValue | 分散インデックスのメタデータです。元の DataFrame から取得します。 |
sort | bool | groupby シナリオで、グループ内のデータをソートするかどうかを指定します。 |
使用例
import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS
o = ODPS(
# ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。
# AccessKey ID と AccessKey Secret の文字列を直接使用しないでください。
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)
session = new_session(o)
# テストデータ (pandas DataFrame) の作成
col_a = pd.Series(
data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
index=[1, 2, 3],
dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
data=["A", "B", "C"],
index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()
# カスタム関数の定義
def custom_set_item(df):
for name, value in df["A"].items():
if value is not None:
df["A"][name]["x"] = 100
return df
# apply_chunk の呼び出し
result_df = df.mf.apply_chunk(
custom_set_item,
output_type="dataframe",
dtypes=df.dtypes.copy(),
batch_rows=2,
skip_infer=True,
index=df.index,
).execute()
session.destroy()
実行とパフォーマンスチューニング
output_type と dtypes の明示的な宣言
型推論に依存しないでください。ランタイムエラーやパフォーマンスの低下を引き起こす可能性があります。
result_df = df.mf.apply_chunk(<process>) # dtypes がありません!推奨:明示的な宣言
重要元の
dfのdtypesを直接変更しないでください。代わりに、.copy()を使用するか、新しい DataFrame を作成してください。上記のコードを次のように変更します:
def process(df_chunk): # ここにロジックを追加することも、単にチャンクを返すこともできます。重要なのは出力構造です。 return df_chunk.copy() # 'A' と 'B' を含む DataFrame を返します。 def get_incorrect_dtypes(df): # UDF の出力と意図的に一致しない不完全な dtypes を返します。 return df.dtypes.drop('A') incorrect_dtypes = get_incorrect_dtypes(df) print("\n--- dtypes の不一致で apply_chunk を呼び出してみます (エラーが想定されます) ---") try: result_df = df.mf.apply_chunk( process, output_type="dataframe", dtypes=incorrect_dtypes # 不正な dtypes が使用されています。 ).execute() except Exception as e: print("想定されるエラーを正常にキャッチしました!エラーメッセージは次のとおりです:") print(f"エラータイプ: {type(e)}") print(f"エラー詳細: {e}\n") # 通常、エラーは 'ValueError: forward method expected 1 arguments, but get 2' のようになります。 # これは、dtypes が 1 つの列 ('B') しか指定していないのに対し、UDF が 2 つの列 ('A', 'B') を返すためです。 print("--- 正しい使用方法は次のとおりです ---") # 正しい方法として、dtypes は UDF (process) の出力を正確に記述する必要があります。 # process は完全な DataFrame を返すため、正しい dtypes は元の dtypes です。 correct_dtypes = df.dtypes.copy() result_df = df.mf.apply_chunk( custom_set_item, output_type="dataframe", dtypes=correct_dtypes, index=df.index ) final_result = result_df.execute().fetch() print("\n正しい dtypes を使用した後、実行結果は次のとおりです:") print(final_result) session.destroy()
batch_rows の設定によるメモリと並列度の制御
単一タスクでのメモリ不足 (OOM) エラーを回避します。
並列度を向上させます。
@with_running_options(memory=...)を使用して、リソース使用率を向上させます。
デバッグのヒント:中間結果の出力と例外のキャッチ
UDF はリモートの MaxCompute ワーカーで実行されるため、標準の print ログは LogView で確認できます。
flush=True を設定すると、ログが迅速に出力されるようになり、トラブルシューティングが容易になります。
def process(chunk):
try:
print(f"Processing chunk with shape: {chunk.shape}", flush=True)
print(f"Columns: {list(chunk.columns)}", flush=True)
result = chunk.sort_values("B")
print("Success.", flush=True)
return result
except Exception as e:
print(f"[ERROR] Failed to process chunk: {str(e)}", flush=True)
raise
パフォーマンスチューニングの推奨事項
バッチサイズ:データ量と利用可能なリソースに基づいて
batch_rowsを設定します。大きな値に設定することは避けてください。出力列の数:必要なフィールドのみを返します。
関数の複雑さ:UDF での負荷の高い計算は避けてください。
外部依存関係がある場合は、
@with_running_options(memory=16)を使用してメモリを増やすことができます。
よくある質問
TypeError: cannot determine dtype
原因
dtypesパラメーターが指定されていません。解決策
pd.Series型を明示的に渡します。
空の出力または列の欠落
原因
dtypesが一致していません。解決策
関数の戻り値の列名が一貫していることを確認します。
実行のタイムアウトまたはハングアップ
原因
batch_rowsの値が大きすぎます。解決策
バッチサイズを小さくし、より多くのリソースを割り当てます。