全部產品
Search
文件中心

MaxCompute:MaxFrame apply_chunk運算元使用實踐

更新時間:Jan 10, 2026

本文介紹如何使用MaxFrame DataFrame.mf.apply_chunk運算元實現大規模資料處理,內容涵蓋其在MaxFrame常見情境下的核心用法、常見誤區及生產級最佳實務。

apply_chunk功能介紹

在分散式運算架構中:

  • 標準pandas.DataFrame.apply()

    單機操作,無法擴充。

  • MaxFrame df.apply()

    • 支援分布式的逐行/逐區塊對應

    • apply() 預設按“行”執行計算,對於大規模資料處理效率不高。

而MaxFrame apply_chunk()可以滿足如下功能:

  • 按批處理,顯式控制每個chunk的大小,即batch_rows

  • 支援自訂輸出類型與結構(如:output_typedtypesindex_value)。

  • 支援結合UDF 裝飾器(如 @with_python_requirements),實現複雜任務開發。

對效能敏感的任務,建議優先使用apply_chunk

簽名解析

DataFrame.mf.apply_chunk(
    func,
    batch_rows=None,
    output_type=None,
    dtypes=None,
    index=None,
    index_value=None,
    columns=None,
    elementwise=None,
    sort=False,
    **kwds
)

參數說明

參數

類型

說明

func

callable

使用者自訂函數,接收 Pandas DataFrame 並輸出 Pandas DataFrame/Series。

該函數的輸入Pandas DataFrame 是DataFrame的一個區塊,可視為一批行資料。

batch_rows

int

每個chunk的最大行數。

output_type

str

輸出類型。例如"dataframe"、"series"等。

dtypes

pd.Series

輸出資料行的資料類型。

index

Index

輸出索引對象。

index_value

IndexValue

分布式索引元資訊,建議從原df擷取。

sort

bool

在groupby情境下,是否對group內部排序。

使用樣本

import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
    # 不建議直接使用AccessKey ID和 AccessKey Secret字串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# 構建測試資料(Pandas DataFrame)
col_a = pd.Series(
    data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    index=[1, 2, 3],
    dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
    data=["A", "B", "C"],
    index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()


# 自訂函數
def custom_set_item(df):
    for name, value in df["A"].items():
        if value is not None:
            df["A"][name]["x"] = 100
    return df


# 調用 apply_chunk
result_df = df.mf.apply_chunk(
    custom_set_item,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),
    batch_rows=2,
    skip_infer=True,
    index=df.index,
).execute()

session.destroy()

執行及效能調優建議

顯式聲明 output_type 和 dtypes

  • 不建議依賴類型推斷,可能會導致運行失敗或效能下降:

    result_df = df.mf.apply_chunk(<process>)  # 缺少 dtypes!
  • 推薦顯示聲明

    重要

    不能直接修改原dfdtypes,應使用.copy() 或重新構造。

    在上文代碼中進一步修改:

    def process(df_chunk):
        # 這裡可以有邏輯,也可以簡單地直接返回,關鍵是它的輸出結構
        return df_chunk.copy() # 返回一個包含 'A' 和 'B' 的 DataFrame
        
    def get_incorrect_dtypes(df):
        # 返回一個不完整的 dtypes,故意與 UDF 的輸出不符
        return df.dtypes.drop('A')
    
    incorrect_dtypes = get_incorrect_dtypes(df)
          
    print("\n--- 正在嘗試使用不匹配的 dtypes 調用 apply_chunk (預期會報錯) ---")
    try:
        result_df = df.mf.apply_chunk(
            process,
            output_type="dataframe",
            dtypes=incorrect_dtypes  # 使用了錯誤的 dtypes
        ).execute()
    except Exception as e:
        print("成功捕獲到預期錯誤!錯誤資訊如下:")
        print(f"錯誤類型: {type(e)}")
        print(f"錯誤詳情: {e}\n")
        # 錯誤通常會是類似 'ValueError: forward method expected 1 arguments, but get 2'
        # 因為 dtypes 說只有1列 ('B'),但 UDF 實際返回了2列 ('A', 'B')
    
    print("--- 正確使用方式如下 ---")
    #    正確的方式是,dtypes 必須精確描述 UDF(process) 的輸出
    #    因為 process 返回了完整的 DataFrame,所以正確的 dtypes 就是原始的 dtypes
    correct_dtypes = df.dtypes.copy()
    
    result_df = df.mf.apply_chunk(
        custom_set_item,
        output_type="dataframe",
        dtypes=correct_dtypes,
        index=df.index
    )
    
    final_result = result_df.execute().fetch()
    
    print("\n使用正確的 dtypes 後,執行結果如下:")
    print(final_result)
    
    session.destroy()

合理設定batch_rows控制記憶體與並發

  • 避免單個task OOM。

  • 提高並行度。

  • 更好地配合@with_running_options(memory=...)使用資源。

調試技巧:列印中間結果 & 異常捕獲

由於 UDF 運行在遠程 MaxCompute Worker 上,標準 print 日誌可通過 LogView 查看。

設定flush=True 確保日誌及時輸出,便於排查。

def process(chunk):
    try:
        print(f"Processing chunk with shape: {chunk.shape}", flush=True)
        print(f"Columns: {list(chunk.columns)}", flush=True)
        result = chunk.sort_values("B")
        print("Success.", flush=True)
        return result
    except Exception as e:
        print(f"[ERROR] Failed to process chunk: {str(e)}", flush=True)
        raise
        

效能調優建議

  • 批次大小:batch_rows根據資料量大小及資源合理設定,避免過大。

  • 輸出資料行數:盡量只返回必要欄位。

  • 函數複雜度:避免在 UDF 中做 heavy compute。

  • 外部依賴:使用@with_running_options(memory=16)提升記憶體。

常見問題

TypeError: cannot determine dtype

  • 問題原因

    未提供dtypes

  • 解決方案

    顯式傳入pd.Series類型。

輸出為空白或丟失列

  • 問題原因

    dtypes不匹配。

  • 解決方案

    檢查函數傳回值列名是否一致。

執行逾時或卡住

  • 問題原因

    batch_rows過大。

  • 解決方案

    減小批次,增加資源。