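MaxCompute user-defined functions fall into three categories: user-defined scalar functions (UDFs), user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). This topic describes how to implement each of them in Python on MaxCompute.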

Limits

MaxCompute uses Python 2.7. User code is executed in a sandbox, which is a restricted environment. In this environment, the following operations are prohibited:
  • Read and write local files.
  • Start subprocesses.
  • Start threads.
  • Initiate socket-based communication.
  • Call other systems.

Because of these restrictions, all uploaded code must be written in pure Python. C extension modules are disabled.

In addition, every module in the Python standard library that involves these operations is disabled. The following standard library modules are available:
  • All modules implemented in pure Python.
  • The following C extension modules:
    • array and audioop
    • binascii and bisect
    • cmath, _codecs_cn, _codecs_hk, _codecs_iso2022, _codecs_jp, _codecs_kr, _codecs_tw, _collections, and cStringIO
    • datetime
    • _functools and future_builtins
    • _heapq and _hashlib
    • itertools
    • _json
    • _locale and _lsprof
    • math, _md5, and _multibytecodec
    • operator
    • _random
    • _sha256, _sha512, _sha, _struct, and strop
    • time
    • unicodedata
    • _weakref
    • cPickle
  • Some modules have limited functionality. For example, the sandbox limits the amount of data that your code can write to standard output and standard error (sys.stdout and sys.stderr) to 20 KB. Characters beyond this limit are ignored.
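
Because output beyond 20 KB is ignored, it can help to budget debug output explicitly so that the most important messages are written first. The following is a minimal sketch and is not part of MaxCompute: BudgetedLog and its parameters are hypothetical names, and the sketch assumes that the limit is counted in bytes and applies to each stream separately.

```python
import sys

# 20 KB limit taken from the text above. Assumption: it is counted in bytes
# and applies to each stream separately.
OUTPUT_LIMIT_BYTES = 20 * 1024


class BudgetedLog(object):
    """Hypothetical helper: stops writing once a byte budget is used up."""

    def __init__(self, stream=None, budget=OUTPUT_LIMIT_BYTES):
        self.stream = sys.stderr if stream is None else stream
        self.remaining = budget
        self.dropped = 0  # number of messages skipped for lack of budget

    def write(self, message):
        """Write message plus a newline if it fits; return True on success."""
        line = message + "\n"
        # Python 2 str is already bytes; Python 3 str is encoded to count bytes.
        size = len(line) if isinstance(line, bytes) else len(line.encode("utf-8"))
        if size > self.remaining:
            self.dropped += 1
            return False
        self.stream.write(line)
        self.remaining -= size
        return True
```

Checking the dropped counter after a local test run shows whether any messages would have been lost to the limit.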

Third-party libraries

Common third-party libraries such as NumPy are installed in the runtime environment to supplement the standard library.
Note: Third-party libraries are subject to the same restrictions. For example, local and remote I/O operations are prohibited, so the related APIs in these libraries are disabled.

Types of parameters and return values

The types of parameters and return values are specified by the following method:
@odps.udf.annotate(signature)

Python UDFs support the following MaxCompute SQL data types: BIGINT, STRING, DOUBLE, BOOLEAN, and DATETIME. The parameter and return value types of every function must be known before an SQL statement is executed. Because Python is a dynamically typed language, you must add this decorator to the UDF class to declare the function signature.

The function signature is specified by a string. The syntax is as follows:
arg_type_list '->' type_list
arg_type_list: type_list | '*' | ''
type_list: [type_list ','] type
type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime'
Syntax descriptions
  • The part to the left of the arrow indicates the types of parameters. The part to the right of the arrow indicates the types of return values.
  • The return value of a UDTF can contain multiple columns, while that of a UDF or UDAF only contains one column.
  • An asterisk (*) represents variable-length parameters. If variable-length parameters are used, UDFs, UDTFs, and UDAFs can match input parameters of any type.
The following are examples of valid signatures:
'bigint,double->string'            # The parameters are of the BIGINT and DOUBLE types, and the return value is of the STRING type.
'bigint,boolean->string,datetime'  # The UDTF parameters are of the BIGINT and BOOLEAN types, and each returned record has two columns, of the STRING and DATETIME types.
'*->string'                        # The input parameters are variable-length parameters and can be of any type, and the return value is of the STRING type.
'->double'                         # The parameters are left blank, and the return value is of the DOUBLE type.
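
The grammar above can be checked with a few lines of Python. The following is a minimal sketch for illustration only, not the parser that MaxCompute uses: parse_signature and parse_type_list are hypothetical names, and the sketch assumes that whitespace around types and around the arrow is ignored.

```python
VALID_TYPES = ('bigint', 'string', 'double', 'boolean', 'datetime')


def parse_type_list(text):
    """Split 'bigint, string' into ['bigint', 'string']; reject unknown types."""
    types = [part.strip() for part in text.split(',')]
    for name in types:
        if name not in VALID_TYPES:
            raise ValueError('unknown type: %r' % name)
    return types


def parse_signature(signature):
    """Return (arg_types, return_types); arg_types is '*' for variable-length."""
    parts = signature.split('->')
    if len(parts) != 2:
        raise ValueError("signature must contain exactly one '->'")
    args_text = parts[0].strip()
    returns_text = parts[1].strip()
    if args_text == '':
        arg_types = []        # no parameters
    elif args_text == '*':
        arg_types = '*'       # variable-length parameters of any type
    else:
        arg_types = parse_type_list(args_text)
    # The return side must always name at least one type.
    return arg_types, parse_type_list(returns_text)
```

For example, parse_signature('->double') returns an empty parameter list and one return type, while parse_signature('bigint->int') raises ValueError because int is not one of the five supported type names.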
If an invalid signature is found during query parsing, an error is returned and the function is not executed. During execution, parameters are passed to your code in the types specified by the function signature, and the return value must be of the type specified by the function signature. Otherwise, an error is returned. The following table lists the mappings between MaxCompute SQL data types and Python data types.
MaxCompute SQL data type    Python data type
BIGINT                      int
STRING                      str
DOUBLE                      float
BOOLEAN                     bool
DATETIME                    int
Note
  • A value of the DATETIME type is passed to user code as an int. It indicates the number of milliseconds that have elapsed since the epoch, January 1, 1970, 00:00:00 UTC. You can process such values by using the datetime module in the Python standard library.
  • The NULL value corresponds to None in Python.
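
The conversion between DATETIME values and Python datetime objects can be sketched as follows. The helper names ms_to_datetime and datetime_to_ms are hypothetical, and the sketch works with naive datetime objects that are interpreted as UTC.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)  # naive datetime, interpreted as UTC


def ms_to_datetime(ms):
    """Convert a DATETIME value (milliseconds since the epoch) to a datetime."""
    if ms is None:  # NULL arrives as None
        return None
    return EPOCH + datetime.timedelta(milliseconds=ms)


def datetime_to_ms(dt):
    """Convert a naive UTC datetime back to integer milliseconds."""
    if dt is None:
        return None
    delta = dt - EPOCH
    # Integer arithmetic avoids the rounding errors of total_seconds() * 1000.
    return (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000
```

A UDF that declares a datetime return type would return the integer produced by datetime_to_ms, not the datetime object itself.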

In addition, odps.udf.int(value[, silent=True]) accepts a silent parameter. If silent is set to True and value cannot be converted to an int, the function returns None and no error is reported.
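
The behavior described above can be mimicked locally as follows. This is a hypothetical stand-in written for illustration, not the implementation of odps.udf.int: the name to_int, the default value of silent, and the set of exceptions that it catches are assumptions.

```python
def to_int(value, silent=True):
    """Local stand-in that mimics the silent behavior described above."""
    try:
        return int(value)
    except (TypeError, ValueError):
        if silent:
            return None  # conversion failed, but no error is reported
        raise  # silent is False: let the conversion error propagate
```

With this stand-in, to_int('abc') returns None, whereas to_int('abc', silent=False) raises ValueError.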

UDFs

To implement a Python UDF, define a new-style class and implement its evaluate method.
from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
    def evaluate(self, arg0, arg1):
        # NULL inputs arrive as None; return NULL if either input is NULL.
        if None in (arg0, arg1):
            return None
        return arg0 + arg1
Note: The signature of a Python UDF must be specified by using annotate.
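
Because evaluate is an ordinary method, the logic of a UDF can be exercised locally before the code is uploaded. The sketch below is an illustration under assumptions: the annotate function defined here is a local stand-in so that the snippet runs without the odps package (on MaxCompute, import annotate from odps.udf instead), and SafeDivide is a hypothetical example.

```python
def annotate(signature):
    """Local stand-in for odps.udf.annotate: it only records the signature."""
    def decorator(cls):
        cls.signature = signature  # attribute name chosen for this sketch
        return cls
    return decorator


@annotate("double,double->double")
class SafeDivide(object):
    def evaluate(self, numerator, denominator):
        # NULL inputs arrive as None; propagate NULL instead of failing.
        if numerator is None or denominator is None:
            return None
        if denominator == 0:
            return None  # return NULL instead of raising ZeroDivisionError
        return numerator / denominator


udf = SafeDivide()
print(udf.evaluate(1.0, 4.0))   # regular division
print(udf.evaluate(1.0, 0.0))   # division by zero yields None
print(udf.evaluate(None, 2.0))  # NULL input yields None
```

Returning None for a zero divisor is a design choice of this sketch. Raising an error instead would fail the whole SQL statement.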

UDAFs

  • class odps.udf.BaseUDAF: the base class of Python UDAFs. Inherit from it to implement a UDAF.
  • BaseUDAF.new_buffer(): returns the buffer that holds the intermediate value of the UDAF. The buffer must be a marshallable object, such as a list or dict, and its size must not grow with the data volume. Even in extreme cases, the buffer must not exceed 2 MB after it is marshaled.
  • BaseUDAF.iterate(buffer[, args, ...]): aggregates args into the intermediate buffer.
  • BaseUDAF.merge(buffer, pbuffer): merges the intermediate buffer pbuffer into buffer.
  • BaseUDAF.terminate(buffer): converts the intermediate buffer into a value of a basic MaxCompute SQL data type.
The following example uses a UDAF to obtain the average value.
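from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('double->double')
class Average(BaseUDAF):
    def new_buffer(self):
        # buffer[0] is the running sum, buffer[1] is the count of non-NULL values.
        return [0, 0]
    def iterate(self, buffer, number):
        # NULL values are skipped and do not affect the average.
        if number is not None:
            buffer[0] += number
            buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def terminate(self, buffer):
        # Return 0.0 if no non-NULL value was aggregated.
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]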
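
The way the four methods cooperate can be sketched with a small local driver. This is a simplified illustration under assumptions, not the scheduling logic of MaxCompute: annotate and BaseUDAF are local stand-ins so that the snippet runs without the odps package, and MaxValue and simulate are hypothetical names. The driver builds one partial buffer per data partition, round-trips each buffer through marshal, and then merges the partial buffers.

```python
import marshal


def annotate(signature):
    """Local stand-in for odps.udf.annotate so that the sketch runs anywhere."""
    def decorator(cls):
        return cls
    return decorator


class BaseUDAF(object):
    """Local stand-in for odps.udf.BaseUDAF."""


@annotate('double->double')
class MaxValue(BaseUDAF):
    def new_buffer(self):
        return [None]  # fixed-size buffer: the largest value seen so far

    def iterate(self, buffer, number):
        if number is not None and (buffer[0] is None or number > buffer[0]):
            buffer[0] = number

    def merge(self, buffer, pbuffer):
        # Folding in a partial maximum is the same as seeing one more value.
        self.iterate(buffer, pbuffer[0])

    def terminate(self, buffer):
        return buffer[0]


def simulate(udaf_class, partitions):
    """Hypothetical driver: one partial buffer per partition, then a merge."""
    udaf = udaf_class()
    partials = []
    for rows in partitions:
        buffer = udaf.new_buffer()
        for value in rows:
            udaf.iterate(buffer, value)
        data = marshal.dumps(buffer)         # fails if the buffer is not marshallable
        assert len(data) <= 2 * 1024 * 1024  # 2 MB limit stated in the text
        partials.append(marshal.loads(data))
    final = udaf.new_buffer()
    for pbuffer in partials:
        udaf.merge(final, pbuffer)
    return udaf.terminate(final)


print(simulate(MaxValue, [[1.0, 5.0], [None, 3.0], []]))
```

The empty partition contributes the buffer [None], which merge ignores, so merge must tolerate buffers that never saw a value.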

UDTFs

  • class odps.udf.BaseUDTF: the base class of Python UDTFs. Inherit from it and implement methods such as process and close.
  • BaseUDTF.__init__(): the initialization method. If a derived class implements this method, it must first call the initialization method of the base class, for example super(MyUDTF, self).__init__(), where MyUDTF is the derived class. Throughout the lifecycle of a UDTF, __init__ is called only once, before the first record is processed. If a UDTF needs to keep internal state, initialize all of it in this method.
  • BaseUDTF.process([args, ...]): is called by the MaxCompute SQL framework once for each record in SQL. The parameters of process are the UDTF input parameters specified in the SQL statement.
  • BaseUDTF.forward([args, ...]): the UDTF output method. This method is called by user code. Each call to forward generates one record. The parameters of forward are the UDTF output parameters specified in the SQL statement.
  • BaseUDTF.close(): the UDTF termination method. This method is called only once by the MaxCompute SQL framework, after the last record is processed.
An example is as follows:
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
    """Use commas (,) to separate the string into multiple records."""
    def process(self, arg):
        if arg is None:
            return  # a NULL input produces no output records
        props = arg.split(',')
        for p in props:
            self.forward(p)
Note: A Python UDTF can also be defined without using annotate to specify its signature. In this case, the function can match input parameters of any type in SQL. However, the types of the return values cannot be inferred, so all output parameters are considered to be of the STRING type. Therefore, every value that is passed to forward must first be converted to a string.
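
The UDTF lifecycle (initialize once, call process for each record, collect the records passed to forward, then call close) can be sketched with a local driver. This is an illustration under assumptions: annotate and BaseUDTF are local stand-ins so that the snippet runs without the odps package, and SplitWithIndex and run_udtf are hypothetical names.

```python
def annotate(signature):
    """Local stand-in for odps.udf.annotate so that the sketch runs anywhere."""
    def decorator(cls):
        return cls
    return decorator


class BaseUDTF(object):
    """Local stand-in for odps.udf.BaseUDTF that collects forwarded rows."""

    def __init__(self):
        self.rows = []

    def forward(self, *args):
        self.rows.append(args)

    def close(self):
        pass


@annotate('string->string,bigint')
class SplitWithIndex(BaseUDTF):
    def __init__(self):
        super(SplitWithIndex, self).__init__()  # initialize the base class first
        self.emitted = 0  # internal state kept across process calls

    def process(self, arg):
        if arg is None:
            return  # a NULL input produces no output records
        for token in arg.split(','):
            self.forward(token, self.emitted)  # one output record per call
            self.emitted += 1


def run_udtf(udtf_class, records):
    """Hypothetical driver: init once, process each record, then close."""
    udtf = udtf_class()
    for record in records:
        udtf.process(*record)
    udtf.close()
    return udtf.rows


print(run_udtf(SplitWithIndex, [('a,b',), (None,), ('c',)]))
```

Each call to forward passes two values because the signature declares two output columns, and the counter keeps increasing across records because it is stored on the instance.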

Resource reference

You can reference file and table resources in Python UDFs by using the odps.distcache module.

  • odps.distcache.get_cache_file(resource_name): returns the content of a specific file resource.
    • resource_name is a string that corresponds to the name of an existing file resource in the current project. If the resource name is invalid or no such file resource exists, an error is returned.
    • The return value is a file-like object. When this object is no longer needed, the caller must call its close method to release the open file resource.
    An example is as follows:
    from odps.udf import annotate
    from odps.distcache import get_cache_file
    @annotate('bigint->string')
    class DistCacheExample(object):
        def __init__(self):
            # Load the "key value" lines of the file resource into a dict.
            cache_file = get_cache_file('test_distcache.txt')
            kv = {}
            for line in cache_file:
                line = line.strip()
                if not line:
                    continue
                k, v = line.split()
                kv[int(k)] = v
            cache_file.close()
            self.kv = kv
        def evaluate(self, arg):
            return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name): returns the content of a specific table resource.
    • resource_name is a string that corresponds to the name of an existing table resource in the current project. If the resource name is invalid or no such table resource exists, an error is returned.
    • The return value is a generator. The caller iterates over it to read the table content, and each iteration yields one record of the ARRAY type.
    An example is as follows:
    from odps.udf import annotate
    from odps.distcache import get_cache_table
    @annotate('->string')
    class DistCacheTableExample(object):
        def __init__(self):
            self.records = list(get_cache_table('udf_test'))
            self.counter = 0
            self.ln = len(self.records)
        def evaluate(self):
            if self.counter > self.ln - 1:
                return None
            ret = self.records[self.counter]
            self.counter += 1
            return str(ret)
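
The file-parsing logic used with get_cache_file can be tested locally by passing in any file-like object. The following is a minimal sketch under assumptions: load_kv is a hypothetical helper, io.StringIO stands in for the object returned by get_cache_file, and each non-empty line is assumed to hold an integer key followed by a value.

```python
import io


def load_kv(cache_file):
    """Parse 'key value' lines into a dict and always close the file."""
    kv = {}
    try:
        for line in cache_file:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            # Split on the first run of whitespace so that values may contain spaces.
            key, value = line.split(None, 1)
            kv[int(key)] = value
    finally:
        cache_file.close()  # release the resource even if parsing fails
    return kv


# Stand-in for get_cache_file('test_distcache.txt') when testing locally.
fake_file = io.StringIO(u"1 one\n\n2 two words\n")
print(load_kv(fake_file))
```

Closing the file in a finally block releases the resource even when a malformed line raises an error. In a UDF, the constructor would call load_kv(get_cache_file(...)) and keep the resulting dict on the instance.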