Python官方即将停止维护Python 2,MaxCompute已支持Python 3,对应版本为CPython-3.7.3。本文为您介绍如何通过Python 3语言编写UDF。
UDF代码结构
- 导入模块:必选。
至少要包含
from odps.udf import annotate
,导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。当UDF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file
(文件资源)或from odps.distcache import get_cache_table
(表资源)。 - 函数签名:必选。
格式为
@annotate(<signature>)
,signature
用于定义函数的输入参数和返回值的数据类型。更多函数签名信息,请参见函数签名与数据类型。 - 自定义Python类:必选。
UDF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库或引用资源。
evaluate
方法:必选。位于自定义的Python类中。
evaluate
方法定义了输入参数和返回值。一个Python类中只能包含一个evaluate
方法。
#导入函数签名模块。
from odps.udf import annotate
#函数签名。
@annotate("bigint,bigint->bigint")
#自定义Python类。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
Python 3与Python 2不兼容。在您使用Python 3之前,需要考虑兼容性问题,在一个SQL中不允许同时使用Python 3和Python 2。
Python 2 UDF迁移
- 全新项目:新MaxCompute项目,或第一次使用Python语言编写UDF的MaxCompute项目。建议所有的Python UDF都直接使用Python 3语言编写。
- 存量项目:创建了大量Python 2 UDF的MaxCompute项目。请您谨慎开启Python 3。如果您计划逐步将所有Python 2 UDF迁移为Python
3 UDF,推荐方法如下:
- 新作业和新UDF:使用Python 3语言编写,在Session级别开启Python 3。开启Python 3方法,请参见开启Python 3。
- Python 2 UDF:改写Python 2 UDF,使其可以同时兼容Python 2和Python 3。改写方法请参见将Python 2代码移植到Python 3。
说明 如果您需要编写公共UDF,并为多个MaxCompute项目授权UDF的操作权限,建议UDF同时兼容Python 2和Python 3。
开启Python 3
set odps.sql.python.version=cp37;
第三方库
MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl。上传包的操作请参见资源操作或Python UDF使用第三方包。
Python 3支持的标准库列表请参见Python 3标准库。
函数签名与数据类型
@annotate(<signature>)
signature
为字符串,用于标识输入参数和返回值的数据类型。执行UDF时,UDF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。'arg_type_list -> type'
其中:arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(''):- 当
arg_type_list
为星号(*)时,表示输入参数为任意个数。 - 当
arg_type_list
为空('')时,表示无输入参数。
- 当
type
:表示返回值的数据类型。UDF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
函数签名示例 | 说明 |
---|---|
'bigint,double->string' |
输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。 |
'*->string' |
输入任意个参数,返回值类型为STRING。 |
'->double' |
无输入参数,返回值类型为DOUBLE。 |
'array<bigint>->struct<x:string, y:int>' |
输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。 |
'->map<bigint, string>' |
无输入参数,返回值类型为MAP<BIGINT, STRING>。 |
为确保编写Python UDF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。
MaxCompute SQL Type | Python 3 Type |
---|---|
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
引用资源
Python UDF可以通过odps.distcache
模块引用资源,支持引用文件资源和表资源。
odps.distcache.get_cache_file(resource_name, mode)
:以指定模式mode
返回指定文件资源的内容。resource_name
支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。mode
支持STRING类型,默认值为't'
。当mode
为't'
时以文本格式打开文件,当mode
为'b'
时以二进制格式打开文件。- 返回值为File-like对象。在使用完此对象后,您需要调用
close
方法释放打开的资源文件。
引用文件资源示例如下。from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定资源表的内容。resource_name
对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。支持读取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT类型数据。- 返回值为Generator类型,调用者通过遍历获取表的内容,每次遍历得到的是以数组形式存在的表中的一条记录。
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
def __init__(self):
self.records = list(get_cache_table('udf_test'))
self.counter = 0
self.ln = len(self.records)
def evaluate(self):
if self.counter > self.ln - 1:
return None
ret = self.records[self.counter]
self.counter += 1
return str(ret)
使用说明
- 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
- 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨项目分享信息,请参见基于Package跨项目访问资源。
使用MaxCompute客户端完整开发及调用Python 3 UDF的操作,请参见Python UDF使用示例。