PyODPS DataFrame lets you extend built-in computation with user-defined functions (UDFs) and third-party Python packages. This topic covers element-wise mapping with map, row-level transformations and custom aggregations with apply, resource references inside UDFs, and how to upload and configure third-party packages.
Prerequisites
Before you begin, ensure that you have:
A DataFrame object created from a MaxCompute table or a pandas DataFrame
Python UDF support enabled in your MaxCompute project (required for map and apply with Python functions)
Alibaba Cloud public services do not support Python UDFs. If your project does not support Python UDFs, the map method and the built-in functions that depend on it are unavailable.
Known limitations
| Limitation | Details |
|---|---|
| Unsupported types | The map and apply methods do not accept LIST or DICT types as input or output. |
| Pre-installed binary library | The only pre-installed third-party library that contains C code is NumPy. All other binary libraries require explicit upload. |
| Python 2/3 compatibility | Because of bytecode differences between Python versions, code that uses Python 3-specific syntax (such as yield from) may fail on a MaxCompute worker that runs Python 2.7. Before deploying code that uses the MapReduce API with Python 3 to production, verify that it runs correctly. |
| Build platform for binary packages | Wheel files built on macOS or Windows cannot be used in MaxCompute. Build binary packages in a Linux shell. |
Apply UDFs to a column
Use the map method on a Sequence object to call a UDF on every element.
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
If the Sequence type changes after map, explicitly specify the new type:
>>> iris.sepallength.map(lambda x: 't' + str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
Avoid closure variable capture bugs
When a UDF contains a closure, external changes to the captured variable affect the function's behavior. The following code produces an unintended result — each SequenceExpr in dfs ends up as df.sepal_length + 9:
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))
Fix this by returning the lambda from an outer function, or by using functools.partial:
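Both remedies rely on standard Python scoping rules rather than anything PyODPS-specific, so the behavior can be verified locally without a MaxCompute connection. A minimal plain-Python sketch using ordinary functions in place of DataFrame expressions:

```python
# Late binding: every lambda closes over the variable i, not its value
# at append time, so all of them see the final value of i.
buggy = []
for i in range(10):
    buggy.append(lambda x: x + i)

print([f(0) for f in buggy])  # all mappers add 9

# Fix: a factory function gives each lambda its own binding of i.
def get_mapper(i):
    return lambda x: x + i

fixed = [get_mapper(i) for i in range(10)]
print([f(0) for f in fixed])  # [0, 1, 2, ..., 9]
```

The same reasoning explains why `functools.partial` works: it freezes the current value of `i` as an argument instead of capturing the variable.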
# Option 1: use a factory function
>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
# Option 2: use functools.partial
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
Use existing UDFs
Pass a function name (string) or a Function object to map to call an existing UDF. For details, see Functions.
Track execution with counters
Use get_execution_context to access counters from within a UDF. Counter values appear in the JSONSummary section of LogView.
from odps.udf import get_execution_context

def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1

df.field.map(h)
Apply UDFs to a row
Use apply with axis=1 to call a UDF on each row. The UDF receives one row at a time; retrieve field values by attribute name or index.
Return a single value per row
Set reduce=True to return a Sequence. Specify the output type with the types parameter (default is STRING).
>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
sepaladd
0 8.6
1 7.9
2 7.9
Return multiple rows using yield
Set reduce=False and use yield to emit multiple rows per input row. Specify output field names and types with names and types.
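In plain Python terms, the UDF is a generator: each input row may yield any number of output tuples, and the result length is the total number of yields. A local sketch of a handle-style generator over plain dicts (the field names mirror the iris example; dict access stands in for PyODPS row attribute access):

```python
def handle(row):
    # Each yield becomes one output row, so one input row
    # produces two output rows here.
    yield row['sepallength'] + row['sepalwidth'], row['sepallength'] - row['sepalwidth']
    yield row['petallength'] + row['petalwidth'], row['petallength'] - row['petalwidth']

rows = [
    {'sepallength': 5.1, 'sepalwidth': 3.5, 'petallength': 1.4, 'petalwidth': 0.2},
    {'sepallength': 4.9, 'sepalwidth': 3.0, 'petallength': 1.4, 'petalwidth': 0.2},
]
out = [t for row in rows for t in handle(row)]
print(len(out))  # 2 input rows -> 4 output rows
```

This is why the example below turns 150 input rows into 300 output rows.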
>>> iris.count()
150
>>> def handle(row):
>>>     yield row.sepallength + row.sepalwidth, row.sepallength - row.sepalwidth
>>>     yield row.petallength + row.petalwidth, row.petallength - row.petalwidth
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300
Annotate the output schema directly on the function to avoid repeating it at call sites:
>>> from odps.df import output
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength + row.sepalwidth, row.sepallength - row.sepalwidth
>>>     yield row.petallength + row.petalwidth, row.petallength - row.petalwidth
>>> iris.apply(handle, axis=1).count()
300
Equivalent: map-only map_reduce
map_reduce in map-only mode is equivalent to apply with axis=1:
>>> iris.map_reduce(mapper=handle).count()
300
Use an existing UDTF
To call an existing user-defined table-valued function (UDTF) in MaxCompute, pass its name as a string:
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
Combine row output with a lateral view
When reduce=False, combine the UDF output with original columns using a lateral view — useful for aggregations:
>>> from odps.df import output
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength + row.sepalwidth, row.sepallength - row.sepalwidth
>>>     yield row.petallength + row.petalwidth, row.petallength - row.petalwidth
>>> iris[iris.category, iris.apply(handle, axis=1)]
Apply custom aggregations to a column
Use apply with axis=0 (or no axis argument) to pass a custom aggregation class over all Sequence objects. The class must implement buffer, __call__, merge, and getvalue.
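MaxCompute evaluates such a class in parts: it fills one buffer per slice of the data through __call__, combines partial buffers with merge, and reads the final result with getvalue. Because this contract is plain Python, it can be sanity-checked locally before submitting. A self-contained sketch that drives the mean aggregation used in this section through two simulated slices:

```python
class Agg(object):
    def buffer(self):
        return [0.0, 0]  # [running sum, count]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

agg = Agg()
# Two simulated data slices, each with its own buffer.
b1, b2 = agg.buffer(), agg.buffer()
for v in [1.0, 2.0, 3.0]:
    agg(b1, v)
for v in [4.0, 5.0]:
    agg(b2, v)
agg.merge(b1, b2)        # combine partial results
print(agg.getvalue(b1))  # 3.0, the mean of 1..5
```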
class Agg(object):
    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
Read MaxCompute resources in UDFs
UDFs can read MaxCompute resources — file resources and table resources — or reference a Collection object as a resource. Wrap the UDF in a closure or callable class so resources are loaded once at initialization rather than per row.
Loading resources inside the closure (rather than on every function call) avoids repeated initialization overhead — for example, when loading lookup tables or model artifacts.
Row-level UDF with file and collection resources
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
name
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources):  # resources are passed in the order they are specified
>>>     names = set()
>>>     fileobj = resources[0]  # file resources are represented by a file-like object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # retrieve values by field name or offset
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
When reading partitioned tables as resources, partition fields are not included.
Row-level UDF with a local DataFrame as a resource
Local variables can be referenced as resources in MaxCompute at execution time. In the following example, stop_words is a local DataFrame that the executor passes to the UDF as a resource:
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
For row operations (axis=1), load resources in a function closure or callable class. For column aggregations, load them in the __init__ method of the aggregation class instead.
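The closure and callable-class variants are interchangeable: in both, the expensive setup (loading a resource) runs once, while the per-row logic runs on every row. A plain-Python sketch of the callable-class form, using the stop-word example above (the StopWordFilter class is illustrative, not part of the PyODPS API):

```python
class StopWordFilter(object):
    def __init__(self, stop_words):
        # Runs once: e.g. materialize a resource into a set.
        self.stop_words = set(stop_words)

    def __call__(self, sentence):
        # Runs per row: cheap lookups against the prebuilt set.
        return ' '.join(w for w in sentence.split() if w not in self.stop_words)

f = StopWordFilter(['is', 'a', 'I'])
print(f('Life is short I use Python'))  # 'Life short use Python'
```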
Upload third-party Python libraries
MaxCompute supports uploading Python packages in .whl, .egg, .zip, and .tar.gz formats. All dependencies must be explicitly specified — omitting a dependency causes import errors at runtime.
Choose your upload path based on the package type:
| Package type | Upload method | Notes |
|---|---|---|
| Pre-installed | None needed | NumPy only |
| Pure Python (no compiled code, no file operations) | Upload as .whl file resource | Works for packages like python-dateutil, pytz, six. Later versions of MaxCompute also support packages with file operations. |
| Binary (compiled C extensions) | Upload as .zip archive resource, enable isolation | Requires cp27-cp27m-manylinux1_x86_64 platform tag; build on Linux |
Pure Python packages
By default, PyODPS supports third-party libraries that contain pure Python code but no file operations. The following example uploads python-dateutil and its dependency six.
Step 1: Download the package and its dependencies. Packages must be built for Linux.
$ pip download python-dateutil -d /to/path/
This downloads six-1.10.0-py2.py3-none-any.whl and python_dateutil-2.5.3-py2.py3-none-any.whl.
Step 2: Upload both files as resources using create_resource.
# Make sure that file name extensions are correct.
>>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
>>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
Step 3: Use the libraries. Specify them globally via options.df.libraries, or per execution via the libraries parameter.
# Global configuration (applies to all subsequent DataFrame operations in this session)
>>> from odps import options
>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
>>> df.datestr.map(get_year)
datestr
0 2016
1 2015
# Per-execution configuration (applies only to this call)
>>> def get_year(t):
>>>     from dateutil.parser import parse
>>>     return parse(t).strftime('%Y')
>>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
datestr
0 2016
1 2015
Binary packages (containing compiled code)
Packages that include compiled C extensions (such as SciPy or pandas) require additional steps:
The .whl file must use the cp27-cp27m-manylinux1_x86_64 platform tag.
Upload the file as an archive resource, with the .whl extension renamed to .zip.
Set odps.isolation.session.enable to True, or enable isolation in your project settings.
# Upload the binary package as an archive with the .zip extension.
>>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
# If isolation is already enabled in your project, the following option is optional.
>>> options.sql.settings = {'odps.isolation.session.enable': True}
>>> def psi(value):
>>>     # Import the third-party library inside the function to avoid errors
>>>     # caused by structural differences between operating systems.
>>>     from scipy.special import psi
>>>     return float(psi(value))
>>> df.float_col.map(psi).execute(libraries=['scipy.zip'])
To build a binary package from source, run the following command in a Linux shell. Wheel files built on macOS or Windows are not compatible with MaxCompute.
python setup.py bdist_wheel
Upload via the MaxCompute console
As an alternative to the PyODPS API, upload packages using add archive in the MaxCompute console.
Step 1: Identify the correct package file for each dependency.
Most packages provide .whl files for multiple platforms. For binary packages, find the file with cp27-cp27m-manylinux1_x86_64 in its name. For pure Python packages, any py2.py3-none-any wheel works.
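When the machine you run pip on is not a Linux/Python 2.7 host, pip download can still fetch a wheel for a specific target platform via its platform-pinning flags. A possible invocation for a manylinux1 pandas wheel (the package and version that resolve depend on the index; --no-deps is used so that each dependency can be downloaded and uploaded explicitly):

```shell
pip download pandas \
    --only-binary :all: \
    --platform manylinux1_x86_64 \
    --implementation cp \
    --python-version 27 \
    --abi cp27m \
    --no-deps \
    -d ./packages
```

Note that --only-binary :all: (or --no-deps) is required whenever --platform, --python-version, --implementation, or --abi is used.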
Step 2: Verify all required dependencies. The following table lists dependencies for common packages.
| Package | Dependencies |
|---|---|
| pandas | NumPy, python-dateutil, pytz, six |
| SciPy | NumPy |
| scikit-learn | NumPy, SciPy |
NumPy is already pre-installed. Upload only python-dateutil, pytz, pandas, SciPy, scikit-learn, and six.
Step 3: Download the package files. The following table lists the specific files to download for each package.
| Package | File to download | Upload resource name |
|---|---|---|
| python-dateutil | python-dateutil-2.6.0.zip | python-dateutil.zip |
| pytz | pytz-2017.2.zip | pytz.zip |
| six | six-1.11.0.tar.gz | six.tar.gz |
| pandas | pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip | pandas.zip |
| SciPy | scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip | scipy.zip |
| scikit-learn | scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip | sklearn.zip |
Step 4: Upload each file. For binary packages (pandas, SciPy, scikit-learn), rename the .whl extension to .zip before uploading.
add archive python-dateutil.zip;
add archive pandas.zip;
Specify libraries for execution
Use options.df.libraries to set libraries globally for the session, or pass the libraries parameter directly to an execution method to scope it to a single call.
# Global: applies to all subsequent DataFrame operations in this session
>>> from odps import options
>>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
# Local: applies only to this execution call
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])