すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MaxFrame apply_chunk オペレーターのベストプラクティス

最終更新日:Jan 10, 2026

このトピックでは、大規模データ処理に MaxFrame DataFrame.mf.apply_chunk オペレーターを使用する方法について説明します。コアな使用法、よくある間違い、および本番環境でのベストプラクティスについて解説します。

apply_chunk 関数の概要

分散コンピューティングフレームワークでは、次のようになります:

  • 標準の pandas.DataFrame.apply()

    これは単一マシンでの操作であり、スケールできません。

  • MaxFrame の df.apply()

    • 行ごとまたはチャンクごとの分散マッピングをサポートします。

    • ただし、apply() はデフォルトで行ごとにデータを処理するため、大規模データ処理には非効率です。

MaxFrame の apply_chunk() 関数は、次の特徴を備えています:

  • データをバッチで処理し、batch_rows を使用して各チャンクのサイズを制御できます。

  • output_typedtypesindex_value などのカスタム出力型と構造をサポートします。

  • @with_python_requirements などのユーザー定義関数 (UDF) デコレーターをサポートし、複雑なタスクを開発できます。

パフォーマンスが重視されるタスクでは、apply_chunk の使用を推奨します。

シグネチャの分析

DataFrame.mf.apply_chunk(
    func,
    batch_rows=None,
    output_type=None,
    dtypes=None,
    index=None,
    index_value=None,
    columns=None,
    elementwise=None,
    sort=False,
    **kwds
)

パラメーターの説明

パラメーター

説明

func

callable

pandas DataFrame を受け取り、pandas DataFrame または Series を出力するユーザー定義関数 (UDF) です。

この関数への入力となる pandas DataFrame は、DataFrame のチャンクであり、行のバッチとして扱うことができます。

batch_rows

int

各チャンクの最大行数です。

output_type

str

出力型です。例:「dataframe」、「series」。

dtypes

pd.Series

出力列のデータ型です。

index

Index

出力インデックスオブジェクトです。

index_value

IndexValue

分散インデックスのメタデータです。元の DataFrame から取得します。

sort

bool

groupby シナリオで、グループ内のデータをソートするかどうかを指定します。

使用例

import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。
    # AccessKey ID と AccessKey Secret の文字列を直接使用しないでください。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# テストデータ (pandas DataFrame) の作成
col_a = pd.Series(
    data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    index=[1, 2, 3],
    dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
    data=["A", "B", "C"],
    index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()


# カスタム関数の定義
def custom_set_item(df):
    for name, value in df["A"].items():
        if value is not None:
            df["A"][name]["x"] = 100
    return df


# apply_chunk の呼び出し
result_df = df.mf.apply_chunk(
    custom_set_item,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),
    batch_rows=2,
    skip_infer=True,
    index=df.index,
).execute()

session.destroy()

実行とパフォーマンスチューニング

output_typedtypes の明示的な宣言

  • 型推論に依存しないでください。ランタイムエラーやパフォーマンスの低下を引き起こす可能性があります。

    result_df = df.mf.apply_chunk(<process>)  # dtypes がありません!
  • 推奨:明示的な宣言

    重要

    元の dfdtypes を直接変更しないでください。代わりに、.copy() を使用するか、新しい DataFrame を作成してください。

    上記のコードを次のように変更します:

    def process(df_chunk):
        # ここにロジックを追加することも、単にチャンクを返すこともできます。重要なのは出力構造です。
        return df_chunk.copy() # 'A' と 'B' を含む DataFrame を返します。
        
    def get_incorrect_dtypes(df):
        # UDF の出力と意図的に一致しない不完全な dtypes を返します。
        return df.dtypes.drop('A')
    
    incorrect_dtypes = get_incorrect_dtypes(df)
          
    print("\n--- dtypes の不一致で apply_chunk を呼び出してみます (エラーが想定されます) ---")
    try:
        result_df = df.mf.apply_chunk(
            process,
            output_type="dataframe",
            dtypes=incorrect_dtypes  # 不正な dtypes が使用されています。
        ).execute()
    except Exception as e:
        print("想定されるエラーを正常にキャッチしました!エラーメッセージは次のとおりです:")
        print(f"エラータイプ: {type(e)}")
        print(f"エラー詳細: {e}\n")
        # 通常、エラーは 'ValueError: forward method expected 1 arguments, but get 2' のようになります。
        # これは、dtypes が 1 つの列 ('B') しか指定していないのに対し、UDF が 2 つの列 ('A', 'B') を返すためです。
    
    print("--- 正しい使用方法は次のとおりです ---")
    #    正しい方法として、dtypes は UDF (process) の出力を正確に記述する必要があります。
    #    process は完全な DataFrame を返すため、正しい dtypes は元の dtypes です。
    correct_dtypes = df.dtypes.copy()
    
    result_df = df.mf.apply_chunk(
        custom_set_item,
        output_type="dataframe",
        dtypes=correct_dtypes,
        index=df.index
    )
    
    final_result = result_df.execute().fetch()
    
    print("\n正しい dtypes を使用した後、実行結果は次のとおりです:")
    print(final_result)
    
    session.destroy()

batch_rows の設定によるメモリと並列度の制御

  • 単一タスクでのメモリ不足 (OOM) エラーを回避します。

  • 並列度を向上させます。

  • @with_running_options(memory=...) を使用して、リソース使用率を向上させます。

デバッグのヒント:中間結果の出力と例外のキャッチ

UDF はリモートの MaxCompute ワーカーで実行されるため、標準の print ログは LogView で確認できます。

flush=True を設定すると、ログが迅速に出力されるようになり、トラブルシューティングが容易になります。

def process(chunk):
    try:
        print(f"Processing chunk with shape: {chunk.shape}", flush=True)
        print(f"Columns: {list(chunk.columns)}", flush=True)
        result = chunk.sort_values("B")
        print("Success.", flush=True)
        return result
    except Exception as e:
        print(f"[ERROR] Failed to process chunk: {str(e)}", flush=True)
        raise
        

パフォーマンスチューニングの推奨事項

  • バッチサイズ:データ量と利用可能なリソースに基づいて batch_rows を設定します。大きな値に設定することは避けてください。

  • 出力列の数:必要なフィールドのみを返します。

  • 関数の複雑さ:UDF での負荷の高い計算は避けてください。

  • 外部依存関係がある場合は、@with_running_options(memory=16) を使用してメモリを増やすことができます。

よくある質問

TypeError: cannot determine dtype

  • 原因

    dtypes パラメーターが指定されていません。

  • 解決策

    pd.Series 型を明示的に渡します。

空の出力または列の欠落

  • 原因

    dtypes が一致していません。

  • 解決策

    関数の戻り値の列名が一貫していることを確認します。

実行のタイムアウトまたはハングアップ

  • 原因

    batch_rows の値が大きすぎます。

  • 解決策

    バッチサイズを小さくし、より多くのリソースを割り当てます。