All Products
Search
Document Center

MaxCompute:Praktik terbaik untuk menggunakan operator MaxFrame apply_chunk

Last Updated:Jan 10, 2026

Topik ini menjelaskan cara menggunakan operator MaxFrame DataFrame.mf.apply_chunk untuk pemrosesan data skala besar, mencakup penggunaan inti, kesalahan umum, dan praktik terbaik untuk lingkungan produksi.

apply_chunk pengenalan fungsi

Dalam framework komputasi terdistribusi:

  • pandas.DataFrame.apply() standar:

    Ini adalah operasi single-machine yang tidak dapat diskalakan.

  • MaxFrame df.apply():

    • Mendukung pemetaan terdistribusi baris per baris atau chunk per chunk.

    • Namun, apply() secara default memproses data baris per baris, yang tidak efisien untuk pemrosesan data skala besar.

Fungsi MaxFrame apply_chunk() menyediakan fitur-fitur berikut:

  • Memproses data dalam batch dan memungkinkan Anda mengontrol ukuran setiap chunk melalui parameter batch_rows.

  • Mendukung tipe dan struktur output kustom, seperti output_type, dtypes, dan index_value.

  • Mendukung dekorator user-defined function (UDF), seperti @with_python_requirements, untuk mengembangkan tugas kompleks.

Untuk tugas yang sensitif terhadap performa, kami merekomendasikan penggunaan apply_chunk.

Analisis signature

DataFrame.mf.apply_chunk(
    func,
    batch_rows=None,
    output_type=None,
    dtypes=None,
    index=None,
    index_value=None,
    columns=None,
    elementwise=None,
    sort=False,
    **kwds
)

Deskripsi parameter

Parameter

Tipe

Deskripsi

func

callable

UDF yang menerima pandas DataFrame dan menghasilkan pandas DataFrame atau Series.

DataFrame pandas input untuk fungsi ini merupakan sebuah chunk dari DataFrame, yang dapat dianggap sebagai batch baris.

batch_rows

int

Jumlah maksimum baris dalam setiap chunk.

output_type

str

Tipe output. Contoh: "dataframe" dan "series".

dtypes

pd.Series

Tipe data kolom output.

index

Index

Objek indeks output.

index_value

IndexValue

Metadata untuk indeks terdistribusi. Dapatkan ini dari DataFrame asli.

sort

bool

Menentukan apakah data dalam kelompok harus diurutkan dalam skenario groupby.

Contoh penggunaan

import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID diatur ke ID AccessKey Anda.
    # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET diatur ke Rahasia AccessKey Anda.
    # Jangan gunakan string ID AccessKey dan Rahasia AccessKey secara langsung.
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your project>',
    endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)

session = new_session(o)

# Buat data uji (pandas DataFrame)
col_a = pd.Series(
    data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    index=[1, 2, 3],
    dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
    data=["A", "B", "C"],
    index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()


# Definisikan fungsi kustom
def custom_set_item(df):
    for name, value in df["A"].items():
        if value is not None:
            df["A"][name]["x"] = 100
    return df


# Panggil apply_chunk
result_df = df.mf.apply_chunk(
    custom_set_item,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),
    batch_rows=2,
    skip_infer=True,
    index=df.index,
).execute()

session.destroy()

Eksekusi dan penyetelan performa

Secara eksplisit deklarasikan output_type dan dtypes

  • Jangan mengandalkan inferensi tipe karena dapat menyebabkan kegagalan waktu proses atau degradasi performa.

    result_df = df.mf.apply_chunk(<process>)  # dtypes tidak disertakan!
  • Rekomendasi: pernyataan eksplisit

    Penting

    Jangan langsung memodifikasi dtypes dari df asli. Sebagai gantinya, gunakan .copy() atau buat DataFrame baru.

    Modifikasi kode sebelumnya sebagai berikut:

    def process(df_chunk):
        # Anda dapat menambahkan logika di sini atau cukup mengembalikan chunk tersebut. Kuncinya adalah struktur output.
        return df_chunk.copy() # Mengembalikan DataFrame yang berisi 'A' dan 'B'.
        
    def get_incorrect_dtypes(df):
        # Mengembalikan dtypes yang tidak lengkap yang sengaja tidak sesuai dengan output UDF.
        return df.dtypes.drop('A')
    
    incorrect_dtypes = get_incorrect_dtypes(df)
          
    print("\n--- Mencoba memanggil apply_chunk dengan dtypes yang tidak sesuai (error diharapkan) ---")
    try:
        result_df = df.mf.apply_chunk(
            process,
            output_type="dataframe",
            dtypes=incorrect_dtypes  # Menggunakan dtypes yang salah.
        ).execute()
    except Exception as e:
        print("Berhasil menangkap error yang diharapkan! Pesan error sebagai berikut:")
        print(f"Tipe error: {type(e)}")
        print(f"Detail error: {e}\n")
        # Error biasanya mirip dengan 'ValueError: forward method expected 1 arguments, but get 2'.
        # Hal ini karena dtypes hanya menentukan satu kolom ('B'), tetapi UDF mengembalikan dua kolom ('A', 'B').
    
    print("--- Penggunaan yang benar adalah sebagai berikut ---")
    #    Cara yang benar adalah dtypes harus secara akurat menggambarkan output UDF (process).
    #    Karena process mengembalikan DataFrame lengkap, dtypes yang benar adalah dtypes asli.
    correct_dtypes = df.dtypes.copy()
    
    result_df = df.mf.apply_chunk(
        custom_set_item,
        output_type="dataframe",
        dtypes=correct_dtypes,
        index=df.index
    )
    
    final_result = result_df.execute().fetch()
    
    print("\nSetelah menggunakan dtypes yang benar, hasil eksekusi adalah sebagai berikut:")
    print(final_result)
    
    session.destroy()

Atur batch_rows untuk mengontrol memori dan konkurensi

  • Menghindari error kehabisan memori (OOM) pada satu tugas.

  • Meningkatkan tingkat paralelisme.

  • Memungkinkan pemanfaatan sumber daya yang lebih baik dengan @with_running_options(memory=...).

Kiat debugging: Cetak hasil antara dan tangkap exception

Karena UDF dijalankan pada worker MaxCompute remote, Anda dapat melihat log print standar di LogView.

Anda dapat mengatur flush=True untuk memastikan log segera ditampilkan guna mempermudah troubleshooting.

def process(chunk):
    try:
        print(f"Memproses chunk dengan bentuk: {chunk.shape}", flush=True)
        print(f"Kolom: {list(chunk.columns)}", flush=True)
        result = chunk.sort_values("B")
        print("Berhasil.", flush=True)
        return result
    except Exception as e:
        print(f"[ERROR] Gagal memproses chunk: {str(e)}", flush=True)
        raise
        

Rekomendasi penyetelan performa

  • Ukuran batch: Atur batch_rows berdasarkan volume data dan sumber daya yang tersedia. Hindari mengaturnya ke nilai yang terlalu besar.

  • Jumlah kolom output: Hanya kembalikan bidang yang diperlukan.

  • Kompleksitas fungsi: Hindari komputasi berat dalam UDF.

  • Untuk dependensi eksternal, Anda dapat menggunakan @with_running_options(memory=16) untuk menambah memori.

FAQ

TypeError: cannot determine dtype

  • Penyebab

    Parameter dtypes tidak disediakan.

  • Solusi

    Berikan secara eksplisit tipe pd.Series.

Output kosong atau kolom hilang

  • Penyebab

    dtypes tidak sesuai.

  • Solusi

    Pastikan nama kolom dalam nilai kembali fungsi konsisten.

Eksekusi timeout atau hang

  • Penyebab

    Nilai batch_rows terlalu besar.

  • Solusi

    Kurangi ukuran batch dan alokasikan lebih banyak sumber daya.