Topik ini menjelaskan cara menggunakan operator MaxFrame DataFrame.mf.apply_chunk untuk pemrosesan data skala besar, mencakup penggunaan inti, kesalahan umum, dan praktik terbaik untuk lingkungan produksi.
apply_chunk pengenalan fungsi
Dalam framework komputasi terdistribusi:
pandas.DataFrame.apply()standar:Ini adalah operasi single-machine yang tidak dapat diskalakan.
MaxFrame
df.apply():Mendukung pemetaan terdistribusi baris per baris atau chunk per chunk.
Namun,
apply()secara default memproses data baris per baris, yang tidak efisien untuk pemrosesan data skala besar.
Fungsi MaxFrame apply_chunk() menyediakan fitur-fitur berikut:
Memproses data dalam batch dan memungkinkan Anda mengontrol ukuran setiap chunk melalui parameter
batch_rows.Mendukung tipe dan struktur output kustom, seperti
output_type,dtypes, danindex_value.Mendukung dekorator user-defined function (UDF), seperti
@with_python_requirements, untuk mengembangkan tugas kompleks.
Untuk tugas yang sensitif terhadap performa, kami merekomendasikan penggunaan apply_chunk.
Analisis signature
DataFrame.mf.apply_chunk(
func,
batch_rows=None,
output_type=None,
dtypes=None,
index=None,
index_value=None,
columns=None,
elementwise=None,
sort=False,
**kwds
)Deskripsi parameter
Parameter | Tipe | Deskripsi |
func | callable | UDF yang menerima pandas DataFrame dan menghasilkan pandas DataFrame atau Series. DataFrame pandas input untuk fungsi ini merupakan sebuah chunk dari DataFrame, yang dapat dianggap sebagai batch baris. |
batch_rows | int | Jumlah maksimum baris dalam setiap chunk. |
output_type | str | Tipe output. Contoh: "dataframe" dan "series". |
dtypes | pd.Series | Tipe data kolom output. |
index | Index | Objek indeks output. |
index_value | IndexValue | Metadata untuk indeks terdistribusi. Dapatkan ini dari DataFrame asli. |
sort | bool | Menentukan apakah data dalam kelompok harus diurutkan dalam skenario groupby. |
Contoh penggunaan
import os
import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS
o = ODPS(
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID diatur ke ID AccessKey Anda.
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET diatur ke Rahasia AccessKey Anda.
# Jangan gunakan string ID AccessKey dan Rahasia AccessKey secara langsung.
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='<your project>',
endpoint='https://service.cn-<your region>.maxcompute.aliyun.com/api',
)
session = new_session(o)
# Buat data uji (pandas DataFrame)
col_a = pd.Series(
data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
index=[1, 2, 3],
dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
data=["A", "B", "C"],
index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()
# Definisikan fungsi kustom
def custom_set_item(df):
for name, value in df["A"].items():
if value is not None:
df["A"][name]["x"] = 100
return df
# Panggil apply_chunk
result_df = df.mf.apply_chunk(
custom_set_item,
output_type="dataframe",
dtypes=df.dtypes.copy(),
batch_rows=2,
skip_infer=True,
index=df.index,
).execute()
session.destroy()
Eksekusi dan penyetelan performa
Secara eksplisit deklarasikan output_type dan dtypes
Jangan mengandalkan inferensi tipe karena dapat menyebabkan kegagalan waktu proses atau degradasi performa.
result_df = df.mf.apply_chunk(<process>) # dtypes tidak disertakan!Rekomendasi: pernyataan eksplisit
PentingJangan langsung memodifikasi
dtypesdaridfasli. Sebagai gantinya, gunakan.copy()atau buat DataFrame baru.Modifikasi kode sebelumnya sebagai berikut:
def process(df_chunk): # Anda dapat menambahkan logika di sini atau cukup mengembalikan chunk tersebut. Kuncinya adalah struktur output. return df_chunk.copy() # Mengembalikan DataFrame yang berisi 'A' dan 'B'. def get_incorrect_dtypes(df): # Mengembalikan dtypes yang tidak lengkap yang sengaja tidak sesuai dengan output UDF. return df.dtypes.drop('A') incorrect_dtypes = get_incorrect_dtypes(df) print("\n--- Mencoba memanggil apply_chunk dengan dtypes yang tidak sesuai (error diharapkan) ---") try: result_df = df.mf.apply_chunk( process, output_type="dataframe", dtypes=incorrect_dtypes # Menggunakan dtypes yang salah. ).execute() except Exception as e: print("Berhasil menangkap error yang diharapkan! Pesan error sebagai berikut:") print(f"Tipe error: {type(e)}") print(f"Detail error: {e}\n") # Error biasanya mirip dengan 'ValueError: forward method expected 1 arguments, but get 2'. # Hal ini karena dtypes hanya menentukan satu kolom ('B'), tetapi UDF mengembalikan dua kolom ('A', 'B'). print("--- Penggunaan yang benar adalah sebagai berikut ---") # Cara yang benar adalah dtypes harus secara akurat menggambarkan output UDF (process). # Karena process mengembalikan DataFrame lengkap, dtypes yang benar adalah dtypes asli. correct_dtypes = df.dtypes.copy() result_df = df.mf.apply_chunk( custom_set_item, output_type="dataframe", dtypes=correct_dtypes, index=df.index ) final_result = result_df.execute().fetch() print("\nSetelah menggunakan dtypes yang benar, hasil eksekusi adalah sebagai berikut:") print(final_result) session.destroy()
Atur batch_rows untuk mengontrol memori dan konkurensi
Menghindari error kehabisan memori (OOM) pada satu tugas.
Meningkatkan tingkat paralelisme.
Memungkinkan pemanfaatan sumber daya yang lebih baik dengan
@with_running_options(memory=...).
Kiat debugging: Cetak hasil antara dan tangkap exception
Karena UDF dijalankan pada worker MaxCompute remote, Anda dapat melihat log print standar di LogView.
Anda dapat mengatur flush=True untuk memastikan log segera ditampilkan guna mempermudah troubleshooting.
def process(chunk):
try:
print(f"Memproses chunk dengan bentuk: {chunk.shape}", flush=True)
print(f"Kolom: {list(chunk.columns)}", flush=True)
result = chunk.sort_values("B")
print("Berhasil.", flush=True)
return result
except Exception as e:
print(f"[ERROR] Gagal memproses chunk: {str(e)}", flush=True)
raise
Rekomendasi penyetelan performa
Ukuran batch: Atur
batch_rowsberdasarkan volume data dan sumber daya yang tersedia. Hindari mengaturnya ke nilai yang terlalu besar.Jumlah kolom output: Hanya kembalikan bidang yang diperlukan.
Kompleksitas fungsi: Hindari komputasi berat dalam UDF.
Untuk dependensi eksternal, Anda dapat menggunakan
@with_running_options(memory=16)untuk menambah memori.
FAQ
TypeError: cannot determine dtype
Penyebab
Parameter
dtypestidak disediakan.Solusi
Berikan secara eksplisit tipe
pd.Series.
Output kosong atau kolom hilang
Penyebab
dtypestidak sesuai.Solusi
Pastikan nama kolom dalam nilai kembali fungsi konsisten.
Eksekusi timeout atau hang
Penyebab
Nilai
batch_rowsterlalu besar.Solusi
Kurangi ukuran batch dan alokasikan lebih banyak sumber daya.