Python官方即将停止维护Python 2,MaxCompute已支持Python 3,对应版本为CPython-3.7.3。本文为您介绍如何通过Python 3语言编写UDF。

UDF代码结构

您可以通过MaxCompute Studio工具使用Python 3语言编写UDF代码,代码中需要包含如下信息:
  • 导入模块:必选。

    至少要包含from odps.udf import annotate,导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。当UDF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file(文件资源)或from odps.distcache import get_cache_table(表资源)。

  • 函数签名:必选。

    格式为@annotate(<signature>)signature用于定义函数的输入参数和返回值的数据类型。更多函数签名信息,请参见函数签名与数据类型

  • 自定义Python类:必选。

    UDF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库引用资源

  • evaluate方法:必选。

    位于自定义的Python类中。evaluate方法定义了输入参数和返回值。一个Python类中只能包含一个evaluate方法。

UDF代码示例如下。
#导入函数签名模块。
from odps.udf import annotate
#函数签名。
@annotate("bigint,bigint->bigint")
#自定义Python类。
class MyPlus(object):
#evaluate方法。
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1
说明 Python 2 UDF与Python 3 UDF区别在于底层Python语言版本不一致,请您根据对应版本语言支持的能力编写UDF。

使用限制

Python 3与Python 2不兼容。在您使用Python 3之前,需要考虑兼容性问题,在一个SQL中不允许同时使用Python 3和Python 2。

Python 2 UDF迁移

Python 2官方即将停止维护,建议您根据项目类型执行迁移操作:
  • 全新项目:新MaxCompute项目,或第一次使用Python语言编写UDF的MaxCompute项目。建议所有的Python UDF都直接使用Python 3语言编写。
  • 存量项目:创建了大量Python 2 UDF的MaxCompute项目。请您谨慎开启Python 3。如果您计划逐步将所有Python 2 UDF迁移为Python 3 UDF,推荐方法如下:
    • 新作业和新UDF:使用Python 3语言编写,在Session级别开启Python 3。开启Python 3方法,请参见开启Python 3
    • Python 2 UDF:改写Python 2 UDF,使其可以同时兼容Python 2和Python 3。改写方法请参见将Python 2代码移植到Python 3
      说明 如果您需要编写公共UDF,并为多个MaxCompute项目授权UDF的操作权限,建议UDF同时兼容Python 2和Python 3。

开启Python 3

MaxCompute默认使用Python 2,如果您要使用Python 3,可以在Session级别设置如下属性开启Python 3,并与SQL语句一起提交执行。
set odps.sql.python.version=cp37;

第三方库

MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl。上传包的操作请参见资源操作Python UDF使用第三方包

Python 3支持的标准库列表请参见Python 3标准库

函数签名与数据类型

函数签名格式如下。
@annotate(<signature>)
signature为字符串,用于标识输入参数和返回值的数据类型。执行UDF时,UDF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type'
其中:
  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
    arg_type_list还支持星号(*)或为空(''):
    • arg_type_list为星号(*)时,表示输入参数为任意个数。
    • arg_type_list为空('')时,表示无输入参数。
  • type:表示返回值的数据类型。UDF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
说明 在编写UDF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明
合法的函数签名示例如下。
函数签名示例 说明
'bigint,double->string' 输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。
'*->string' 输入任意个参数,返回值类型为STRING。
'->double' 无输入参数,返回值类型为DOUBLE。
'array<bigint>->struct<x:string, y:int>' 输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。
'->map<bigint, string>' 无输入参数,返回值类型为MAP<BIGINT, STRING>。

为确保编写Python UDF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute SQL Type Python 3 Type
BIGINT INT
STRING UNICODE
DOUBLE FLOAT
BOOLEAN BOOL
DATETIME DATETIME.DATETIME
FLOAT FLOAT
CHAR UNICODE
VARCHAR UNICODE
BINARY BYTES
DATE DATETIME.DATE
DECIMAL DECIMAL.DECIMAL
ARRAY LIST
MAP DICT
STRUCT COLLECTIONS.NAMEDTUPLE

引用资源

Python UDF可以通过odps.distcache模块引用资源,支持引用文件资源和表资源。

  • odps.distcache.get_cache_file(resource_name, mode):以指定模式mode返回指定文件资源的内容。
    • resource_name支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。
    • mode支持STRING类型,默认值为't'。当mode't'时以文本格式打开文件,当mode'b'时以二进制格式打开文件。
    • 返回值为File-like对象。在使用完此对象后,您需要调用close方法释放打开的资源文件。
    引用文件资源示例如下。
    from odps.udf import annotate
    from odps.distcache import get_cache_file
    @annotate('bigint->string')
    class DistCacheExample(object):
    def __init__(self):
        cache_file = get_cache_file('test_distcache.txt')
        kv = {}
        for line in cache_file:
            line = line.strip()
            if not line:
                continue
            k, v = line.split()
            kv[int(k)] = v
        cache_file.close()
        self.kv = kv
    def evaluate(self, arg):
        return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name):返回指定资源表的内容。
    • resource_name对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。支持读取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT类型数据。
    • 返回值为Generator类型,调用者通过遍历获取表的内容,每次遍历得到的是以数组形式存在的表中的一条记录。
引用表资源示例如下。
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
    def __init__(self):
        self.records = list(get_cache_table('udf_test'))
        self.counter = 0
        self.ln = len(self.records)
    def evaluate(self):
        if self.counter > self.ln - 1:
            return None
        ret = self.records[self.counter]
        self.counter += 1
        return str(ret)

使用说明

按照开发流程,完成Python 3 UDF开发后,您即可通过MaxCompute SQL调用Python 3 UDF。调用方法如下:
  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package跨项目访问资源

使用MaxCompute客户端完整开发及调用Python 3 UDF的操作,请参见Python UDF使用示例