A user-defined aggregate function (UDAF) works like a built-in aggregate function such as SUM or AVG, except you define the aggregation logic. MaxCompute runs Python 2 UDAFs using Python 2.7.
Use MaxCompute Studio to write and upload UDAF code.
Code structure
A Python 2 UDAF file has five components:
| Component | Required | Description |
|---|---|---|
| Encoding declaration | Optional | Add #coding:utf-8 or # -*- coding: utf-8 -*- at the top of the file if the code contains non-ASCII characters, such as Chinese characters. Without this declaration, the UDAF returns an error at runtime. |
| Module imports | Required | Import annotate and BaseUDAF from odps.udf. To reference file or table resources, also import get_cache_file or get_cache_table from odps.distcache. |
| Function signature | Required | Declare the input and return data types using @annotate(<signature>). MaxCompute validates type consistency during semantic parsing. |
| Custom Python class | Required | Extend BaseUDAF to define the aggregation logic. |
| Methods | Required | Implement the four methods described in Methods. |
Methods
All four methods are required. Implement them in the class that extends BaseUDAF.
| Method | Required | Description |
|---|---|---|
| BaseUDAF.new_buffer() | Required | Returns the intermediate value buffer. The buffer must be a marshallable object, such as a list or a dict. Its size must not grow with the amount of input data. After marshaling, the buffer cannot exceed 2 MB. |
| BaseUDAF.iterate(buffer[, args, ...]) | Required | Aggregates each input row's args into the buffer. |
| BaseUDAF.merge(buffer, pbuffer) | Required | Merges pbuffer into buffer. MaxCompute calls this method when combining partial results across workers. |
| BaseUDAF.terminate(buffer) | Required | Converts the final buffer into a MaxCompute SQL basic data type value and returns it. |
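The call order of the four methods can be pictured with a small local driver. This is only a sketch: the BaseUDAF and annotate stand-ins in the except branch, the CountNonNull class, and the run_locally helper are hypothetical names used for illustration, and inside MaxCompute the way rows are split across workers and the order of merge calls are decided by the engine.

```python
try:
    from odps.udf import annotate, BaseUDAF  # available inside MaxCompute
except ImportError:
    # Local stand-ins (assumption) so the sketch runs outside MaxCompute.
    class BaseUDAF(object):
        pass

    def annotate(signature):
        def decorator(cls):
            return cls
        return decorator


@annotate('bigint->bigint')
class CountNonNull(BaseUDAF):
    """Hypothetical UDAF that counts non-NULL inputs."""

    def new_buffer(self):
        return [0]                   # small, fixed-size, marshallable

    def iterate(self, buffer, value):
        if value is not None:        # SQL NULL arrives as None
            buffer[0] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]      # fold a partial result into buffer

    def terminate(self, buffer):
        return buffer[0]


def run_locally(udaf, partitions):
    """Mimic the call order: iterate per partition, merge, terminate."""
    partials = []
    for rows in partitions:
        buf = udaf.new_buffer()
        for row in rows:
            udaf.iterate(buf, row)
        partials.append(buf)
    final = udaf.new_buffer()
    for pbuf in partials:
        udaf.merge(final, pbuf)
    return udaf.terminate(final)


# Two simulated workers, each holding part of the input column.
result = run_locally(CountNonNull(), [[1, None, 3], [None, 5]])
print(result)  # 3
```

Because merge only adds partial counts, the result does not depend on how the rows are partitioned, which is the property a UDAF needs to produce stable results.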
Example: calculate an average
The following UDAF computes the average of a DOUBLE column and returns a DOUBLE result.
#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDAF

@annotate('double->double')
class Average(BaseUDAF):

    def new_buffer(self):
        # Buffer stores [sum, count] as a list
        return [0, 0]

    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
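The buffer constraints listed under Methods can be sanity-checked locally for the [sum, count] buffer used above. The sketch below assumes that Python's built-in marshal module is a reasonable proxy for how MaxCompute serializes the buffer, and that 2 MB means 2 * 1024 * 1024 bytes. The names marshaled_size and MAX_BUFFER_BYTES are hypothetical.

```python
import marshal

MAX_BUFFER_BYTES = 2 * 1024 * 1024  # assumed reading of the 2 MB limit


def marshaled_size(buffer):
    """Return the size in bytes of the buffer after marshal.dumps."""
    return len(marshal.dumps(buffer))


# A [sum, count] buffer keeps the same shape after 10 rows or 1,000,000 rows.
small = [55.0, 10]
large = [500000500000.0, 1000000]

# Anti-pattern: a buffer that stores every input value grows with the data.
growing = [float(i) for i in range(1000)]

for name, buf in [('small', small), ('large', large), ('growing', growing)]:
    size = marshaled_size(buf)
    print(name, size, size <= MAX_BUFFER_BYTES)
```

The fixed-shape buffer stays at a few dozen bytes regardless of row count, while the list that collects every value grows roughly linearly and would eventually exceed the limit on a large enough group.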
After uploading the UDAF and registering it (for example, as my_avg), call it in MaxCompute SQL the same way you call a built-in aggregate function:
-- Compute the average score per category
SELECT category, my_avg(score) AS avg_score
FROM my_table
GROUP BY category;
For cross-project calls, prefix the function with the project name:
SELECT B:my_avg(score) AS avg_score FROM my_table;
For the full development and registration process, see Develop a Python UDF.
Function signatures and data types
Signature format
@annotate('<arg_type_list> -> <return_type>')
- arg_type_list: comma-separated input types. Use * for any number of inputs, or leave blank for no inputs.
- return_type: a single return type. UDAFs always return one column.
Supported input types: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(precision,scale), CHAR, VARCHAR, ARRAY, MAP, STRUCT, and nested complex types.
Supported return types: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(precision,scale), ARRAY, MAP, STRUCT, and nested complex types.
Examples:
| Signature | Meaning |
|---|---|
| @annotate('bigint,double->string') | Takes BIGINT and DOUBLE inputs, returns STRING |
| @annotate('*->string') | Takes any number of inputs, returns STRING |
| @annotate('->double') | Takes no inputs, returns DOUBLE |
| @annotate('array<bigint>->struct<x:string, y:int>') | Takes ARRAY<BIGINT>, returns STRUCT<x:STRING, y:INT> |
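To make the signature format concrete, the following sketch splits a signature string into its input types and return type. It is a hypothetical reading aid, not the parser MaxCompute uses: parse_signature and split_top_level are names invented for this example, and the only rule assumed beyond the format above is that commas nested inside <...> or (...) belong to a single type.

```python
def split_top_level(text, sep=','):
    """Split on sep, ignoring separators nested inside <...> or (...)."""
    parts, depth, current = [], 0, []
    for ch in text:
        if ch in '<(':
            depth += 1
        elif ch in '>)':
            depth -= 1
        if ch == sep and depth == 0:
            parts.append(''.join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append(''.join(current).strip())
    return [p for p in parts if p]   # a blank argument list yields []


def parse_signature(signature):
    """Return (arg_types, return_type) for an @annotate signature string."""
    arg_part, arrow, return_type = signature.partition('->')
    if not arrow:
        raise ValueError('signature must contain "->"')
    return split_top_level(arg_part), return_type.strip()


for sig in ['bigint,double->string', '*->string', '->double',
            'array<bigint>->struct<x:string, y:int>']:
    print(parse_signature(sig))
```

For the four signatures in the table, the helper yields two input types, the wildcard, an empty input list, and a single ARRAY input with a STRUCT return type, respectively.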
Data type mappings
Write UDAF logic using the corresponding Python 2 types for each MaxCompute SQL type.
| MaxCompute SQL type | Python 2 type |
|---|---|
| BIGINT | int |
| STRING | str |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | int |
| FLOAT | float |
| CHAR | str |
| VARCHAR | str |
| BINARY | bytearray |
| DATE | int |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
- DATETIME maps to int following the UNIX timestamp format: milliseconds elapsed since 1970-01-01 00:00:00 UTC. Use Python's datetime module to process DATETIME values.
- NULL in MaxCompute SQL maps to None in Python 2.
- odps.udf.int(value, silent=True) returns None instead of raising an error when value cannot be converted to int.
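As an illustration of the DATETIME mapping, the sketch below converts a millisecond timestamp to a datetime object and back using only the standard datetime module. The helper names ms_to_datetime and datetime_to_ms are hypothetical, and the resulting datetime objects are naive values interpreted as UTC.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)  # 1970-01-01 00:00:00, treated as UTC


def ms_to_datetime(ms):
    """Convert a DATETIME value (ms since epoch) to a naive UTC datetime."""
    if ms is None:   # SQL NULL arrives as None
        return None
    return EPOCH + datetime.timedelta(milliseconds=ms)


def datetime_to_ms(dt):
    """Convert a naive UTC datetime back to milliseconds since epoch."""
    delta = dt - EPOCH
    whole_seconds = delta.days * 86400 + delta.seconds
    return whole_seconds * 1000 + delta.microseconds // 1000


ms = 1500000000000
dt = ms_to_datetime(ms)
print(dt)                  # 2017-07-14 02:40:00
print(datetime_to_ms(dt))  # 1500000000000
```

Working with timedelta rather than floating-point seconds keeps the millisecond part exact in both directions.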
Limitations
MaxCompute runs UDAF code in a sandbox environment. The following operations are not allowed:
- Reading from or writing to local files
- Starting subprocesses
- Starting threads
- Using socket communication
- Calling Python 2 user-defined functions (UDFs) from other systems
Upload only code that uses Python standard libraries. If a standard library module or C extension module performs any of the above operations, it cannot be used.
Available modules:
- All pure-Python standard library modules (no C extension dependency) are available.
- The following C extension modules are available: array, audioop, binascii, bisect, cmath, _codecs_cn, _codecs_hk, _codecs_iso2022, _codecs_jp, _codecs_kr, _codecs_tw, _collections, cStringIO, datetime, _functools, future_builtins, _heapq, _hashlib, itertools, _json, _locale, _lsprof, math, _md5, _multibytecodec, operator, _random, _sha256, _sha512, _sha, _struct, strop, time, unicodedata, _weakref, cPickle.
Output limit: Writing to sys.stdout or sys.stderr is capped at 20 KB. Characters beyond 20 KB are silently dropped.
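One way to avoid losing debug output without noticing is to track how much has been written and stop explicitly at the cap. The sketch below is a hypothetical helper, not part of the MaxCompute API: BoundedLog and LOG_BUDGET_BYTES are invented names, 20 KB is assumed to mean 20 * 1024 bytes, and the size count equals the byte count only for ASCII messages.

```python
import sys

LOG_BUDGET_BYTES = 20 * 1024  # assumed reading of the 20 KB cap


class BoundedLog(object):
    """Write debug lines to a stream until a size budget is used up."""

    def __init__(self, stream=None, budget=LOG_BUDGET_BYTES):
        self.stream = stream if stream is not None else sys.stderr
        self.remaining = budget
        self.dropped = 0   # number of lines skipped after the budget ran out

    def write(self, message):
        line = message + '\n'
        size = len(line)   # characters; equals bytes for ASCII text
        if size > self.remaining:
            self.dropped += 1
            return False
        self.stream.write(line)
        self.remaining -= size
        return True


# Tiny budget so the behavior is visible: the third line does not fit.
log = BoundedLog(budget=16)
log.write('first')
log.write('second')
log.write('third')
print(log.remaining, log.dropped)  # 3 1
```

Checking the dropped counter at the end of a run shows whether any diagnostics were skipped, which the platform's silent truncation would not reveal.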
Third-party libraries
Third-party libraries, such as NumPy, are pre-installed in the MaxCompute Python 2 environment as supplements to the standard library.
Reference resources
Access files and tables registered as MaxCompute resources from within UDAF code using the odps.distcache module.
odps.distcache.get_cache_file(resource_name)
Returns a file-like object for the named file resource.
- resource_name must be the name of an existing file resource in your MaxCompute project. An invalid name or a missing file causes an error.
- Declare the file when creating the UDAF. Calling get_cache_file without declaring the resource returns an error.
- Call close() on the returned object when you are done with it to release the resource.
odps.distcache.get_cache_table(resource_name)
Returns a generator that yields one ARRAY-type record per iteration for the named table resource.
- resource_name must be the name of an existing table resource in your MaxCompute project. An invalid name or a missing table causes an error.
For usage examples, see Reference resources (Python 2 UDFs) and Reference resources (Python 2 UDTFs).
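The open, read, and close pattern for a file resource can be sketched as follows. To keep the example runnable outside MaxCompute, the opener is passed in as a parameter and a local stand-in replaces odps.distcache.get_cache_file. The helper load_keywords, the stand-in fake_get_cache_file, the resource name keywords.txt, and the one-keyword-per-line file layout are all assumptions made for illustration.

```python
import io


def load_keywords(open_resource, resource_name):
    """Read one keyword per line from a file resource, then close it."""
    handle = open_resource(resource_name)
    try:
        return set(line.strip() for line in handle if line.strip())
    finally:
        handle.close()   # release the resource once reading is done


def fake_get_cache_file(resource_name):
    """Local stand-in (assumption) that mimics a file-like return value."""
    return io.StringIO(u'error\ntimeout\n\n')


# Inside MaxCompute, pass odps.distcache.get_cache_file instead, for example
# once in the UDAF's __init__ so the file is not re-read for every row.
keywords = load_keywords(fake_get_cache_file, 'keywords.txt')
print(sorted(keywords))
```

Loading the resource into an in-memory set up front keeps iterate cheap, and the try/finally block ensures close() runs even if parsing fails.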
What's next
- Develop a Python UDF: step-by-step guide to writing, uploading, and testing UDAFs using MaxCompute Studio
- Built-in functions: reference for built-in aggregate functions that UDAFs can supplement or replace
- Cross-project resource access based on packages: set up cross-project UDAF sharing
- Data type editions: understand which data types are available in your project