
MaxCompute:Use UDFs and third-party Python libraries

Last Updated:Mar 26, 2026

PyODPS DataFrame lets you extend built-in computation with user-defined functions (UDFs) and third-party Python packages. This topic covers element-wise mapping with map, row-level transformations and custom aggregations with apply, resource references inside UDFs, and how to upload and configure third-party packages.

Prerequisites

Before you begin, ensure that you have:

  • A DataFrame object created from a MaxCompute table or a pandas DataFrame

  • Python UDF support enabled in your MaxCompute project (required for map and apply with Python functions)

Alibaba Cloud public services do not support Python UDFs. If your project does not support Python UDFs, the map method and built-in functions that depend on it are unavailable.

Known limitations

  • Unsupported types: The map and apply methods do not accept LIST or DICT types as input or output.

  • Pre-installed binary library: The only pre-installed third-party library that contains C code is NumPy. All other binary libraries require an explicit upload.

  • Python 2/3 compatibility: Because of bytecode differences between Python versions, code that uses Python 3-only syntax (such as yield from) may fail on a MaxCompute worker that runs Python 2.7. Before you deploy Python 3 code that uses the MapReduce API to production, verify that it runs correctly.

  • Build platform for binary packages: Wheel files built on macOS or Windows cannot be used in MaxCompute. Build binary packages in a Linux shell.

Apply UDFs to a column

Use the map method on a Sequence object to call a UDF on every element.

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0

If the Sequence type changes after map, explicitly specify the new type:

>>> iris.sepallength.map(lambda x: 't' + str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

Avoid closure variable capture bugs

When a UDF contains a closure, external changes to the captured variable affect the function's behavior. The following code produces an unintended result — each SequenceExpr in dfs ends up as df.sepal_length + 9:

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

Fix this by returning the lambda from an outer function, or by using functools.partial:

# Option 1: use a factory function
>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
# Option 2: use functools.partial
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
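The late-binding behavior behind this pitfall is plain Python, not PyODPS-specific, and can be reproduced without a MaxCompute connection:

```python
# All three lambdas capture the same variable i, which holds 2 once
# the loop has finished -- so every function adds 2.
funcs = []
for i in range(3):
    funcs.append(lambda x: x + i)
results = [f(0) for f in funcs]
assert results == [2, 2, 2]  # not [0, 1, 2]

# The factory-function fix binds the current value of i at creation time.
def get_mapper(i):
    return lambda x: x + i

fixed = [get_mapper(i) for i in range(3)]
assert [f(0) for f in fixed] == [0, 1, 2]
```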

Use existing UDFs

Pass a function name (string) or a Function object to map to call an existing UDF. For details, see Functions.

Track execution with counters

Use get_execution_context to access counters from within a UDF. Counter values appear in JSONSummary of LogView.

from odps.udf import get_execution_context

def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1

df.field.map(h)

Apply UDFs to a row

Use apply with axis=1 to call a UDF on each row. The UDF receives one row at a time; retrieve field values by attribute name or index.

Return a single value per row

Set reduce=True to return a Sequence. Specify the output type with the types parameter (default is STRING).

>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

Return multiple rows using yield

Set reduce=False and use yield to emit multiple rows per input row. Specify output field names and types with names and types.

>>> iris.count()
150

>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

>>> iris.apply(handle, axis=1, names=['iris_sub', 'iris_add'], types=['float', 'float']).count()
300

Annotate the output schema directly on the function to avoid repeating it at call sites:

>>> from odps.df import output

>>> @output(['iris_sub', 'iris_add'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

>>> iris.apply(handle, axis=1).count()
300
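Why the row count doubles is visible in plain Python: each input row drives two yield statements. The sketch below substitutes illustrative sample tuples for DataFrame rows:

```python
# Each input row yields two output rows, so the output count is 2x the input.
def handle(row):
    sepallength, sepalwidth, petallength, petalwidth = row
    yield sepallength - sepalwidth, sepallength + sepalwidth
    yield petallength - petalwidth, petallength + petalwidth

rows = [(5.1, 3.5, 1.4, 0.2), (4.9, 3.0, 1.4, 0.2)]  # sample values
out = [pair for row in rows for pair in handle(row)]
assert len(out) == 2 * len(rows)  # 4 output rows from 2 input rows
```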

Equivalent: map-only map_reduce

map_reduce in map-only mode is equivalent to apply with axis=1:

>>> iris.map_reduce(mapper=handle).count()
300

Use an existing UDTF

To call an existing user-defined table-valued function (UDTF) in MaxCompute, pass its name as a string:

>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

Combine row output with a lateral view

When reduce=False, combine the UDF output with original columns using a lateral view — useful for aggregations:

>>> from odps.df import output

>>> @output(['iris_sub', 'iris_add'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

>>> iris[iris.category, iris.apply(handle, axis=1)]

Apply custom aggregations to a column

Use apply with axis=0 (or omit the axis argument) to run a custom aggregation over every Sequence in a collection. Pass a class that implements buffer, __call__, merge, and getvalue.

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
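Conceptually, MaxCompute evaluates such an aggregation in parallel: __call__ accumulates values within a partition, merge combines partial buffers, and getvalue produces the final result. The following plain-Python simulation (sample values, no MaxCompute connection) illustrates that calling pattern with the Agg class above:

```python
# The same mean aggregator as in the text.
class Agg(object):
    def buffer(self):
        return [0.0, 0]          # [running sum, running count]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

agg = Agg()
partitions = [[1.0, 2.0], [3.0, 4.0, 5.0]]  # illustrative split of the data

# Phase 1: accumulate each partition into its own buffer.
buffers = []
for part in partitions:
    buf = agg.buffer()
    for v in part:
        agg(buf, v)
    buffers.append(buf)

# Phase 2: merge partial buffers, then extract the value.
final = buffers[0]
for b in buffers[1:]:
    agg.merge(final, b)
assert agg.getvalue(final) == 3.0  # mean of 1..5
```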

Read MaxCompute resources in UDFs

UDFs can read MaxCompute resources — file resources and table resources — or reference a Collection object as a resource. Wrap the UDF in a closure or callable class so resources are loaded once at initialization rather than per row.

Loading resources inside the closure (rather than on every function call) avoids repeated initialization overhead — for example, when loading lookup tables or model artifacts.

Row-level UDF with file and collection resources

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')

>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
              name
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources are passed in the order they are specified
>>>     names = set()
>>>     fileobj = resources[0]  # file resources are represented by a file-like object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # retrieve values by field name or offset
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h

>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]

>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False
Note: When reading partitioned tables, partition fields are not included.
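The closure pattern itself can be exercised locally in plain Python. Below, an io.StringIO object stands in for the file resource and a list for the collection resource; both stand-ins are illustrative:

```python
import io

def myfunc(resources):
    names = set()
    fileobj = resources[0]            # file resource: a file-like object
    for l in fileobj:
        names.add(l.strip())          # drop the trailing newline left by file iteration
    for r in resources[1]:            # collection resource: iterable of rows
        names.add(r)
    def h(x):
        return x in names
    return h

file_res = io.StringIO('Iris-setosa\n')   # stand-in for the file resource
coll_res = ['Iris-versicolor']            # stand-in for the collection resource
h = myfunc([file_res, coll_res])
assert h('Iris-setosa') and h('Iris-versicolor')
assert not h('Iris-virginica')
```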

Row-level UDF with a local DataFrame as a resource

Local variables can be referenced as resources in MaxCompute at execution time. In the following example, stop_words is a local DataFrame that the executor passes to the UDF as a resource:

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python

>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))

>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h

>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python
Note: For row operations (axis=1), use a function closure or callable class to load resources. For column aggregations, use the __init__ method instead.
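The stop-word filtering logic can be verified locally in plain Python; the factory-function name below is illustrative:

```python
# The outer function receives the resource once; the inner h runs per row.
def filter_stops_factory(stops):
    stop_words = set(stops)
    def h(sentence):
        return ' '.join(w for w in sentence.split() if w not in stop_words)
    return h

h = filter_stops_factory(['is', 'a', 'I'])
assert h('Life is short I use Python') == 'Life short use Python'
assert h('Hello World') == 'Hello World'
```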

Upload third-party Python libraries

MaxCompute supports uploading Python packages in .whl, .egg, .zip, and .tar.gz formats. All dependencies must be explicitly specified — omitting a dependency causes import errors at runtime.

Choose your upload path based on the package type:

  • Pre-installed: No upload needed. Applies to NumPy only.

  • Pure Python (no compiled code, no file operations): Upload as a .whl file resource. Works for packages such as python-dateutil, pytz, and six. Later versions of MaxCompute also support packages that perform file operations.

  • Binary (compiled C extensions): Upload as a .zip archive resource and enable isolation. Requires the cp27-cp27m-manylinux1_x86_64 platform tag; build on Linux.

Pure Python packages

By default, PyODPS supports third-party libraries that contain pure Python code but no file operations. The following example uploads python-dateutil and its dependency six.

Step 1: Download the package and its dependencies. Packages must be built for Linux.

$ pip download python-dateutil -d /to/path/

This downloads six-1.10.0-py2.py3-none-any.whl and python_dateutil-2.5.3-py2.py3-none-any.whl.

Step 2: Upload both files as resources using create_resource.

# Make sure that file name extensions are correct.
>>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
>>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))

Step 3: Use the libraries. Specify them globally via options.df.libraries, or per-execution via the libraries parameter.

# Global configuration (applies to all subsequent DataFrame operations in this session)
>>> from odps import options

>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')

>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
>>> df.datestr.map(get_year)
   datestr
0     2016
1     2015
# Per-execution configuration (applies only to this call)
>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')

>>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
   datestr
0     2016
1     2015

Binary packages (containing compiled code)

Packages that include compiled C extensions (such as SciPy or pandas) require additional steps:

  • The .whl file must use the cp27-cp27m-manylinux1_x86_64 platform tag.

  • Upload the file as an archive resource, with the .whl extension renamed to .zip.

  • Set odps.isolation.session.enable to True, or enable isolation in your project settings.

# Upload the binary package as an archive with the .zip extension.
>>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))

# If isolation is already enabled in your project, the following option is optional.
>>> options.sql.settings = {'odps.isolation.session.enable': True}

>>> def psi(value):
>>>     # Import the third-party library inside the function to avoid errors
>>>     # caused by structural differences between operating systems.
>>>     from scipy.special import psi
>>>     return float(psi(value))

>>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

To build a binary package from source, run the following in a Linux shell. Wheel files built on macOS or Windows are not compatible with MaxCompute.

python setup.py bdist_wheel

Upload via the MaxCompute console

As an alternative to the PyODPS API, upload packages using add archive in the MaxCompute console.

Step 1: Identify the correct package file for each dependency.

Most packages provide .whl files for multiple platforms. For binary packages, find the file with cp27-cp27m-manylinux1_x86_64 in its name. For pure Python packages, any py2.py3-none-any wheel works.
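Wheel filenames encode their compatibility tags per PEP 427 ({dist}-{version}(-{build})?-{python}-{abi}-{platform}.whl), so a small plain-Python check can confirm a file's tags before you upload it. The helper name here is illustrative:

```python
# Extract the (python, abi, platform) tag triple from a wheel filename.
def wheel_tags(filename):
    parts = filename[:-len('.whl')].split('-')
    python_tag, abi_tag, platform_tag = parts[-3:]  # tags are always the last three fields
    return python_tag, abi_tag, platform_tag

# A binary wheel suitable for MaxCompute carries the manylinux1 platform tag.
assert wheel_tags('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl') == \
    ('cp27', 'cp27m', 'manylinux1_x86_64')
# A pure Python wheel works on any platform.
assert wheel_tags('six-1.10.0-py2.py3-none-any.whl') == ('py2.py3', 'none', 'any')
```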

Step 2: Verify all required dependencies. The following table lists dependencies for common packages.

  • pandas: NumPy, python-dateutil, pytz, six

  • SciPy: NumPy

  • scikit-learn: NumPy, SciPy

NumPy is already pre-installed. Upload only python-dateutil, pytz, pandas, SciPy, scikit-learn, and six.

Step 3: Download the package files. The following table lists the specific files to download for each package.

  • python-dateutil: download python-dateutil-2.6.0.zip, upload as python-dateutil.zip

  • pytz: download pytz-2017.2.zip, upload as pytz.zip

  • six: download six-1.11.0.tar.gz, upload as six.tar.gz

  • pandas: download pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip, upload as pandas.zip

  • SciPy: download scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip, upload as scipy.zip

  • scikit-learn: download scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip, upload as sklearn.zip

Step 4: Upload each file. For binary packages (pandas, SciPy, scikit-learn), rename the .whl extension to .zip before uploading.

add archive python-dateutil.zip;
add archive pandas.zip;

Specify libraries for execution

Use options.df.libraries to set libraries globally for the session, or pass the libraries parameter directly to an execution method to scope it to a single call.

# Global: applies to all subsequent DataFrame operations in this session
>>> from odps import options
>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
# Local: applies only to this execution call
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])