MaxCompute supports three types of Python 2 user-defined functions (UDFs): UDFs, user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). This topic describes how to implement these Python 2 UDFs.

Limits

MaxCompute uses Python 2.7. User code is run in a sandbox, which is a restricted environment. In this environment, the following operations are prohibited:
  • Read and write local files.
  • Start subprocesses.
  • Start threads.
  • Initiate socket-based communication.
  • Call other systems.

Due to these limits, all code that you upload must be implemented by standard Python, and C extension modules are not allowed in your code.

In addition, all modules that involve the preceding operations are disabled in the Python standard library. The following modules are available in the standard library:
  • All modules that are implemented by standard Python.
  • The following C extension modules:
    • array and audioop
    • binascii and bisect
    • cmath, _codecs_cn, _codecs_hk, _codecs_iso2022, _codecs_jp, _codecs_kr, _codecs_tw, _collections, and cStringIO
    • datetime
    • _functools and future_builtins
    • _heapq and _hashlib
    • itertools
    • _json
    • _locale and _lsprof
    • math, _md5, and _multibytecodec
    • operator
    • _random
    • _sha256, _sha512, _sha, _struct, and strop
    • time
    • unicodedata
    • _weakref
    • cPickle
  • Some modules with limited functionality. For example, a sandbox limits the size of data that your code can write to the standard output and standard error output, namely, sys.stdout and sys.stderr, to 20 KB. Characters that exceed this limit are ignored.

Third-party libraries

Common third-party libraries such as NumPy are installed in the runtime environment of Python 2 to supplement the standard library.
Note The use of third-party libraries is also subject to limits. For example, when you use a third-party library, you are not allowed to access local data and you can use only limited network I/O resources. The related APIs in the third-party libraries are disabled.

Data types of parameters and return values

The data types of parameters and return values are specified by the following method:
@odps.udf.annotate(signature)

Python UDFs support the following MaxCompute SQL data types: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, ARRAY, MAP, STRUCT, and nested complex data types. Complex data types include ARRAY, MAP, and STRUCT. Before you execute an SQL statement, the data types of parameters and return values of all functions must be determined. Python is a dynamically typed language. You must add decorators to the UDF class to specify the function signature.

The function signature is specified by a string. The following syntax is supported:
arg_type_list '->' type_list
arg_type_list: type_list | '*' | '' | 'char(n)' | 'varchar(n)'
type_list: [type_list ','] type
type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime' | 'float' | 'binary' | 'date' | 'decimal' | 'decimal(precision,scale)'
  • The part to the left of the arrow indicates the data types of parameters. The part to the right of the arrow indicates the data types of return values.
  • The return value of a UDTF can contain multiple columns, whereas that of a UDF or UDAF contains only one column.
  • An asterisk (*) represents variable-length parameters. If variable-length parameters are used, UDFs, UDTFs, and UDAFs can match input parameters of any data type.
  • In a project that uses the MaxCompute V2.0 data type edition, you can set the decimal parameter to precision or scale. For more information, see MaxCompute V2.0 data type edition.
  • You cannot specify the CHAR or VARCHAR type for return values.
The following example shows a valid signature:
'bigint,double->string'            # The parameters are of the BIGINT and DOUBLE types and the return values are of the STRING type.
'bigint,boolean->string,datetime'  # The parameters are of the BIGINT and BOOLEAN types and the return values are of the STRING and DATETIME types.
'*->string'                        # The parameters are variable-length parameters and can be of any data type, and the return values are of the STRING type.
'->double'                         # The parameters are left empty and the return values are of the DOUBLE type.
'array<bigint>->struct<x:string, y:int>' # The parameters are of the ARRAY<BIGINT> type and the return values are of the STRUCT<x:STRING, y:INT> type.
'->map<bigint, string>'            # The parameters are left empty and the return values are of the MAP<BIGINT, STRING> type.

If an invalid signature is found during query parsing, an error is returned and the execution of the function is prohibited. During the execution of the function, the parameters of the type that is specified by the function signature are transferred. The return values must be of the same type as that specified by the function signature. Otherwise, an error is returned.

The following table lists the mappings between MaxCompute SQL data types and Python 2 data types.
MaxCompute SQL data type Python 2 data type
BIGINT INT
STRING STR
DOUBLE FLOAT
BOOLEAN BOOL
DATETIME INT
FLOAT FLOAT
CHAR STR
VARCHAR STR
BINARY BYTEARRAY
DATE INT
DECIMAL DECIMAL.DECIMAL
ARRAY LIST
MAP DICT
STRUCT COLLECTIONS.NAMEDTUPLE
Note
  • A value of the DATETIME type is converted to a value of the INT type. The value of the INT type indicates the number of milliseconds that have elapsed since the epoch time January 1, 1970, 00:00:00 UTC. You can process data of the DATETIME type by using the datetime module in the Python standard library.
  • silent is added to odps.udf.int(value,[silent=True]). If silent is set to True and value cannot be converted to the INT type, None is returned, and no error is reported.
  • NULL in MaxCompute SQL corresponds to None in Python.

UDFs

To implement Python UDFs, you must define a new-style class and implement the evaluate method.
from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1
Note A Python UDF must have its signature specified by using annotate.

UDAFs

  • class odps.udf.BaseUDAF: the base class of Python UDAFs. It can be inherited to implement Python UDAFs.
  • BaseUDAF.new_buffer(): returns buffer of the intermediate value of a UDAF. buffer must be a marshallable object such as a list or a dictionary, and the buffer size cannot increase with the data volume. Under extreme circumstances, the buffer size cannot exceed 2 MB after the marshal operation.
  • BaseUDAF.iterate(buffer[, args, ...]): aggregates args to buffer of the intermediate value.
  • BaseUDAF.merge(buffer, pbuffer): aggregates buffer of two intermediate values. It merges pbuffer to buffer.
  • BaseUDAF.terminate(buffer): converts buffer of the intermediate value to a value of a basic data type in MaxCompute SQL.
The following example shows how to use a UDAF to obtain the average value:
@annotate('double->double')
class Average(BaseUDAF):
    def new_buffer(self):
        return [0, 0]
    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

UDTFs

  • class odps.udf.BaseUDTF: the base class of Python UDTFs. It can be inherited to implement the process and close methods.
  • BaseUDTF.__init__(): the initialization method. To implement this method for a derived class, you must call the super(BaseUDTF, self).__init__() initialization method of the base class at the beginning of code execution. Throughout the lifecycle of a UDTF, the init method is called only once. It is called only before the first record is processed. If a UDTF needs to save internal states, you can call the init method to initialize all states.
  • BaseUDTF.process([args, ...]): is called by the MaxCompute SQL framework. The process method is called to process each record involved in the SQL statements. The UDTF input parameters that are specified in the SQL statements are used as the parameters of the process method.
  • BaseUDTF.forward([args, ...]): the UDTF output method. This method is called by user code. One record is generated each time the forward method is called. The UDTF output parameters that are specified in the SQL statements are used as the parameters of the forward method.
  • BaseUDTF.close(): the UDTF termination method. Throughout the lifecycle of a UDTF, this method is called only once by the MaxCompute SQL framework. It is called only after the last record is processed.
The following example shows how to use a UDTF:
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
   """ Use commas (,) to separate the string into multiple records.
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)
Note The data types of parameters and return values of a Python UDTF can be specified without the need to use annotate to specify the UDTF signature. In this case, the UDTF can match input parameters of any data type in SQL statements. However, the data types of return values cannot be inferred. All output parameters are considered to be of the STRING type. Therefore, when the forward method is called, all output values must be converted to the STRING type.

Resource reference

You can reference file and table resources in Python UDFs by using the odps.distcache module.

  • odps.distcache.get_cache_file(resource_name): returns the content of a specific file resource.
    • resource_name is a string that corresponds to the name of an existing file resource in the current project. If the resource name is invalid or the file resource does not exist, an error is returned.
      Note To reference a file resource in a UDF, you must declare the file resource when you create the UDF. Otherwise, an error is returned when the UDF is called.
    • The return value is a file-like object. If this object is no longer used, the caller must call the close method to release the open file resource.
    Example:
    @annotate('bigint->string')
    class DistCacheExample(object):
    def __init__(self):
        cache_file = get_cache_file('test_distcache.txt')
        kv = {}
        for line in cache_file:
            line = line.strip()
            if not line:
                continue
            k, v = line.split()
            kv[int(k)] = v
        cache_file.close()
        self.kv = kv
    def evaluate(self, arg):
        return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name): returns the content of a specific table resource.
    • resource_name is a string that corresponds to the name of an existing table resource in the current project. If the resource name is invalid or the table resource does not exist, an error is returned.
    • The return value is of the generator type. The caller traverses the table to obtain the content. Each time the caller traverses the table, a record of the ARRAY type is obtained.
    Example:
    from odps.udf import annotate
    from odps.distcache import get_cache_table
    @annotate('->string')
    class DistCacheTableExample(object):
        def __init__(self):
            self.records = list(get_cache_table('udf_test'))
            self.counter = 0
            self.ln = len(self.records)
        def evaluate(self):
            if self.counter > self.ln - 1:
                return None
            ret = self.records[self.counter]
            self.counter += 1
            return str(ret)