When you run df.apply() on a distributed MaxFrame DataFrame, data is processed row by row — which becomes a bottleneck at scale. apply_chunk solves this by processing data in configurable batches and sending each batch to a MaxCompute worker as a pandas DataFrame. For performance-sensitive workloads, use apply_chunk instead of df.apply().
Use apply_chunk when:
- Your user-defined function (UDF) operates on a pandas DataFrame and needs to run at scale
- You want to control memory usage and the degree of parallelism
- Row-by-row processing with df.apply() is too slow for your data volume
How it works
apply_chunk splits the distributed DataFrame into chunks, sends each chunk to a MaxCompute worker as a pandas DataFrame, and merges the results. The key parameters — batch_rows, output_type, and dtypes — tell MaxFrame how to partition the data, what the UDF returns, and how to validate and merge the output. You can also use UDF decorators such as @with_python_requirements to manage dependencies for complex tasks.
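The chunking model can be sketched locally with plain pandas. The helper below is an illustration of the semantics only, not part of the MaxFrame API:

```python
import pandas as pd

def local_apply_chunk(df: pd.DataFrame, func, batch_rows: int) -> pd.DataFrame:
    """Illustration only: split df into chunks of at most batch_rows rows,
    apply func to each chunk, then merge the results. This is the model
    that MaxFrame distributes across MaxCompute workers."""
    chunks = [df.iloc[i:i + batch_rows] for i in range(0, len(df), batch_rows)]
    return pd.concat(func(chunk) for chunk in chunks)

df = pd.DataFrame({"A": range(6), "B": list("abcdef")})
# Each chunk of 2 rows is handed to the function as a regular pandas DataFrame.
result = local_apply_chunk(df, lambda chunk: chunk.assign(A=chunk["A"] * 2), batch_rows=2)
```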
Parameters
```python
DataFrame.mf.apply_chunk(
    func,
    batch_rows=None,
    output_type=None,
    dtypes=None,
    index=None,
    index_value=None,
    columns=None,
    elementwise=None,
    sort=False,
    **kwds
)
```
| Parameter | Type | Description |
|---|---|---|
| func | callable | A UDF that accepts a pandas DataFrame (a chunk of rows) and returns a pandas DataFrame or Series. |
| batch_rows | int | Maximum number of rows per chunk. Controls memory usage and the degree of parallelism. |
| output_type | str | Output type: "dataframe" or "series". |
| dtypes | pd.Series | Data types of the output columns. Must exactly match the columns returned by func. |
| index | Index | Output index object. |
| index_value | IndexValue | Metadata for the distributed index. Get this from the original DataFrame. |
| sort | bool | Whether to sort data within groups in a groupby scenario. |
Understanding dtypes
dtypes is a pd.Series that describes the column names and data types of your UDF's output. MaxFrame uses it to validate and merge results across workers. If dtypes doesn't match what func actually returns, the job fails at runtime.
To inspect what dtypes looks like for a DataFrame:
```python
>>> df.dtypes
A    object
B    object
dtype: object
```
The most common way to provide dtypes is to copy it from the original DataFrame:
```python
dtypes=df.dtypes.copy()
```
Never pass df.dtypes directly. Always use .copy() to avoid modifying the original DataFrame's metadata.
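The copy is safe to edit: changes to it describe the UDF's output without touching the original schema description. The Series behavior is plain pandas:

```python
import pandas as pd

df = pd.DataFrame({"A": ["x"], "B": [1]})

# Edit a copy of the dtypes Series to describe the UDF's output;
# the original dtypes are left untouched.
declared = df.dtypes.copy()
declared["B"] = pd.StringDtype()  # suppose the UDF returns B as strings
```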
If your UDF changes the output schema (adds or drops columns), construct a new pd.Series that describes the actual output:
```python
import pandas as pd

# UDF drops column 'A' and adds column 'C'
new_dtypes = pd.Series({
    "B": df.dtypes["B"],
    "C": pd.StringDtype(),
})
```
Example
The following example creates a MaxFrame DataFrame with a dict-type column, then uses apply_chunk to update values in each chunk.
```python
import os

import pyarrow as pa
import pandas as pd
import maxframe.dataframe as md
from maxframe.lib.dtypes_extension import dict_
from maxframe import new_session
from odps import ODPS

o = ODPS(
    # Get credentials from environment variables.
    # Never hardcode AccessKey ID or AccessKey secret in your code.
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='<your-project>',
    endpoint='https://service.<your-region>.maxcompute.aliyun.com/api',
)
session = new_session(o)

# Create a MaxFrame DataFrame with a dict-type column
col_a = pd.Series(
    data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    index=[1, 2, 3],
    dtype=dict_(pa.string(), pa.int64()),
)
col_b = pd.Series(
    data=["A", "B", "C"],
    index=[1, 2, 3],
)
df = md.DataFrame({"A": col_a, "B": col_b})
df.execute()

def custom_set_item(df: pd.DataFrame) -> pd.DataFrame:
    """Add key 'x' with value 100 to each non-null dict in column A."""
    for name, value in df["A"].items():
        if value is not None:
            df["A"][name]["x"] = 100
    return df

result_df = df.mf.apply_chunk(
    custom_set_item,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),  # Must match the columns returned by the UDF
    batch_rows=2,             # Process 2 rows per chunk
    skip_infer=True,          # Skip schema inference; dtypes is declared explicitly
    index=df.index,
).execute()

session.destroy()
```
Replace the following placeholders with actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-project> | MaxCompute project name | my_project |
| <your-region> | Region ID | cn-hangzhou |
Performance tuning
Set batch_rows based on your data and resources
batch_rows controls how many rows each worker processes per chunk. The right value depends on your row size and available memory:
| Direction | Effect | Risk |
|---|---|---|
| Larger values | Fewer chunks, less scheduling overhead, better throughput | Out-of-memory (OOM) errors if rows are wide or the UDF is memory-intensive |
| Smaller values | More chunks, higher degree of parallelism | Increased scheduling overhead for very small chunks |
Start with a conservative value, monitor memory usage in LogView, and adjust from there. If you see OOM errors, reduce batch_rows. If jobs run fast with low memory usage, increase it.
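One way to pick that starting value is to back into it from an average row size and a per-chunk memory budget. The helper below is a hypothetical back-of-envelope calculation, not a MaxFrame API; the overhead factor accounts for temporary copies the UDF makes:

```python
import pandas as pd

def suggest_batch_rows(sample: pd.DataFrame, budget_mb: float = 256,
                       overhead_factor: float = 4.0) -> int:
    """Hypothetical helper: rough batch_rows = memory budget divided by
    average row size, discounted for the UDF's working copies."""
    bytes_per_row = sample.memory_usage(deep=True).sum() / max(len(sample), 1)
    return max(1, int(budget_mb * 1024 * 1024 / (bytes_per_row * overhead_factor)))

# Measure row size on a small local sample that mirrors the real schema.
sample = pd.DataFrame({"A": ["x" * 100] * 1_000, "B": range(1_000)})
batch_rows = suggest_batch_rows(sample, budget_mb=64)
```

Treat the result as an initial guess only; LogView memory metrics remain the ground truth.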
If your UDF requires more memory per chunk, use the @with_running_options decorator to raise the per-task memory limit:
```python
from maxframe.udf import with_running_options

@with_running_options(memory=16)
def my_udf(df: pd.DataFrame) -> pd.DataFrame:
    ...
```
Always declare output_type and dtypes explicitly
By default, MaxFrame tries to infer the output schema by running your function on sample data. Inference can fail or produce incorrect results for complex schemas. Explicitly declaring output_type and dtypes avoids runtime failures and makes the job faster.
Without explicit declaration (fails at runtime):
```python
result_df = df.mf.apply_chunk(process)  # dtypes is missing
```
With explicit declaration (correct):
```python
result_df = df.mf.apply_chunk(
    process,
    output_type="dataframe",
    dtypes=df.dtypes.copy(),
)
```
A common mistake is passing dtypes that don't match the UDF's actual output. For example, if you drop a column inside func but still pass the original df.dtypes, MaxFrame raises a ValueError because the declared schema has more columns than the returned DataFrame.
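One way to keep the declaration in sync is to apply the same column operations to the dtypes Series that the UDF applies to the data (column names here are hypothetical):

```python
import pandas as pd

input_dtypes = pd.Series({"A": "object", "B": "int64"})  # stand-in for df.dtypes

def func(chunk: pd.DataFrame) -> pd.DataFrame:
    return chunk.drop(columns=["A"])  # the UDF drops column A

# Wrong: passing input_dtypes unchanged still declares column A.
# Right: mirror the drop in the declaration so it matches the output.
declared = input_dtypes.drop(labels=["A"])
```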
Return only the columns you need
Each output column is serialized, transferred, and merged across workers. Returning unused columns increases memory usage and slows down the job. Inside func, drop any columns you don't need before returning.
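For instance, if downstream only needs a value derived from column "B" (names hypothetical), project inside func and declare only the returned columns:

```python
import pandas as pd

def process(chunk: pd.DataFrame) -> pd.DataFrame:
    # Keep only what downstream needs; wide, unused columns would
    # otherwise be serialized and transferred for nothing.
    out = chunk[["B"]].copy()
    out["B_upper"] = out["B"].str.upper()
    return out

# Declare exactly the two returned columns.
out_dtypes = pd.Series({"B": "object", "B_upper": "object"})

# Local check of the UDF's output shape.
result = process(pd.DataFrame({"A": [1, 2], "B": ["x", "y"]}))
```

The matching distributed call would then pass output_type="dataframe" and dtypes=out_dtypes.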
Debug UDFs with print and flush=True
UDFs run on remote MaxCompute workers. Use print(..., flush=True) to emit logs immediately — they appear in LogView. Wrap the UDF body in a try/except block to catch and log errors before they surface as generic failures:
```python
def process(chunk: pd.DataFrame) -> pd.DataFrame:
    try:
        print(f"Processing chunk: shape={chunk.shape}, columns={list(chunk.columns)}", flush=True)
        result = chunk.sort_values("B")
        print("Chunk processed successfully.", flush=True)
        return result
    except Exception as e:
        print(f"[ERROR] {type(e).__name__}: {e}", flush=True)
        raise
```
FAQ
Why do I get TypeError: cannot determine dtype?
MaxFrame failed to infer the output schema. Pass dtypes and output_type explicitly when calling apply_chunk.
Why is my output empty or missing columns?
The dtypes Series you passed doesn't match what your UDF returns. Check that the column names in dtypes exactly match the columns in the DataFrame returned by func, and drop any column from dtypes that func doesn't include in its return value.
Why does the job hang or time out?
batch_rows is probably too large. Reduce it and allocate more resources to give each worker a smaller, faster chunk to process. Also check LogView for OOM errors — they often appear as hangs rather than explicit failures.