This topic describes how to use a Python 3 user-defined table-valued function (UDTF) to read resources from MaxCompute on the MaxCompute client.
Prerequisites
The MaxCompute client is installed. For more information, see Install and configure the MaxCompute client.
Dynamic parameters of UDTFs
For more information about the format of the function signature of Python UDTFs, see Function signatures and data types.
- You can use an asterisk (*) in a parameter list to indicate that the input parameters can be of any number and type. For example, @annotate('double,*->string') indicates a parameter list in which the first parameter is of the DOUBLE data type and is followed by any number of parameters of any type. In this case, you must write code that determines the number and types of the input parameters and handles them accordingly, much as the printf function does in the C programming language.
  Note: An asterisk (*) in the return value list has a different meaning.
- An asterisk (*) can be used in the return value list of a UDTF to indicate that any number of values of the STRING data type can be returned. The number of return values is determined by the number of aliases that are specified when the function is called. For example, a UDTF annotated with @annotate("bigint,string->double,*") can be called as UDTF(x, y) as (a, b, c). In this example, the aliases a, b, and c are specified after as. The editor identifies that a is of the DOUBLE data type, because the data type of the first returned column is given in the annotation, and that b and c are of the STRING data type. Three return values are expected in this example. Therefore, the forward method called by the UDTF must forward an array of three elements. Otherwise, an error is returned.
  Note: This error is not returned during compilation. Therefore, the UDTF caller must specify the number of aliases in SQL based on the rule that is defined in the UDTF. The number of return values of an aggregate function is fixed to 1. Therefore, this rule has no effect on user-defined aggregate functions (UDAFs).
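The printf-style handling of a variable-length parameter list described above can be sketched as follows. This is a minimal local illustration, not MaxCompute code: LocalUDTF is a hypothetical stand-in for odps.udf.BaseUDTF that only records forwarded rows so that the example runs outside MaxCompute, and the class name DescribeArgs and the output format are assumptions made for the example.

```python
class LocalUDTF:
    """Hypothetical local stand-in for odps.udf.BaseUDTF (assumption).

    It only records every row passed to forward().
    """

    def __init__(self):
        self.rows = []

    def forward(self, *values):
        self.rows.append(values)


class DescribeArgs(LocalUDTF):
    """Mimics a UDTF whose signature would be 'double,*->string'."""

    def process(self, first, *rest):
        # The fixed parameter is declared as DOUBLE in the signature.
        parts = ["%.2f" % first]
        # The variable part can have any length and any types, so each
        # value is inspected at run time before it is formatted.
        for value in rest:
            if value is None:
                parts.append("NULL")
            elif isinstance(value, bool):
                parts.append("bool:%s" % value)
            elif isinstance(value, int):
                parts.append("int:%d" % value)
            elif isinstance(value, float):
                parts.append("float:%.2f" % value)
            else:
                parts.append("str:%s" % value)
        # One STRING column is returned, as declared after '->'.
        self.forward(", ".join(parts))


udtf = DescribeArgs()
udtf.process(1.5, 7, "x", None)   # three extra arguments
udtf.process(2.0)                 # the variable part may be empty
print(udtf.rows)
```

In a real UDTF, the class would inherit from BaseUDTF and carry the @annotate decorator. Only the process logic is meant to carry over.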
Sample code
- Read resources from MaxCompute
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    from odps.distcache import get_cache_file
    from odps.distcache import get_cache_table

    @annotate('string -> string, bigint')
    class UDTFExample(BaseUDTF):
        """Read pageid and adid_list from the file get_cache_file and the table get_cache_table to generate dict.
        """
        def __init__(self):
            import json
            cache_file = get_cache_file('test_json.txt')
            self.my_dict = json.load(cache_file)
            cache_file.close()
            records = list(get_cache_table('table_resource1'))
            for record in records:
                self.my_dict[record[0]] = record[1]

        def process(self, pageid):
            """Enter pageid and generate pageid and all adid values.
            """
            for adid in self.my_dict[pageid]:
                self.forward(pageid, adid)
- Configure dynamic parameters
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    import json

    @annotate('string,*->string,*')
    class JsonTuple(BaseUDTF):
        def process(self, *args):
            length = len(args)
            result = [None] * length
            try:
                obj = json.loads(args[0])
                for i in range(1, length):
                    result[i] = str(obj.get(args[i]))
            except Exception as err:
                result[0] = str(err)
                for i in range(1, length):
                    result[i] = None
            self.forward(*result)
In this example, the number of return values is based on the number of input parameters. The first input parameter is a JSON string, and the remaining input parameters are the keys whose values are parsed from that JSON string. The first return value is the error message that is generated if the JSON string fails to be parsed. If no error occurs, the first return value is NULL and the remaining return values contain the parsed values in the order of the input keys.
Sample statements:
    -- Configure the number of output aliases based on that of input parameters.
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
    -- The variable-length part can have no columns.
    SELECT my_json_tuple(json) as exceptions FROM jsons;
    -- An error occurs when the following SQL statement is executed because the number of aliases does not match the actual number.
    -- This error is not returned during compilation.
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
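To see what each call forwards, the logic of JsonTuple.process can be reproduced locally as a plain function. The following is a rough sketch under stated assumptions: json_tuple and check_aliases are hypothetical helper names introduced here, and check_aliases only imitates the rule that the number of aliases must equal the number of forwarded values. It does not reproduce the error that MaxCompute actually reports.

```python
import json


def json_tuple(json_text, *keys):
    """Return the row that JsonTuple.process would pass to forward()."""
    result = [None] * (1 + len(keys))
    try:
        obj = json.loads(json_text)
        for i, key in enumerate(keys, start=1):
            result[i] = str(obj.get(key))
    except Exception as err:
        # On failure, only the first column is filled, with the message.
        result[0] = str(err)
        for i in range(1, len(result)):
            result[i] = None
    return result


def check_aliases(row, aliases):
    """Hypothetical check: the alias count must equal the row width."""
    if len(row) != len(aliases):
        raise ValueError(
            "expected %d aliases, got %d" % (len(row), len(aliases))
        )
    return dict(zip(aliases, row))


row = json_tuple('{"a": 1, "b": "x"}', "a", "b")
print(check_aliases(row, ["exceptions", "a", "b"]))

# A missing key yields the string 'None', because str() is applied
# to the None that obj.get() returns.
print(json_tuple('{"a": 1}', "b"))

# Malformed JSON puts the error message in the first column.
print(json_tuple("not json", "a"))
```

Passing four aliases for the three-element row, as in the last sample statement, makes check_aliases raise an error at call time, which mirrors the fact that the mismatch is found only when the statement runs.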