Use a Python 3 user-defined table-valued function (UDTF) to read MaxCompute resources — including resource files and resource tables — on the MaxCompute client.
Prerequisites
Before you begin, ensure that you have:
- The MaxCompute client installed. For more information, see Install and configure the MaxCompute client.
Handler class methods
A Python 3 UDTF is implemented as a class that extends BaseUDTF. The class methods are:
| Method | Required | Description |
|---|---|---|
| `__init__` | No | Initializes state. Use it to load resource files and tables once, before any rows are processed. |
| `process` | Yes | Processes each input row. Call forward() inside this method to emit output rows. |
The @annotate decorator declares the function signature. Inside process, each call to forward(*args) emits exactly one output row, so call forward once for every row you want to return.
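The one-call-one-row rule can be seen in a minimal sketch that runs outside MaxCompute. `LocalBaseUDTF`, the no-op `annotate`, and the `SplitWords` UDTF below are hypothetical local stand-ins, not the odps.udf API; in a real UDTF you import annotate and BaseUDTF from odps.udf as in the sample code in this topic.

```python
# Hypothetical local stand-ins so the example runs without MaxCompute.
# In a real UDTF, import annotate and BaseUDTF from odps.udf instead.
def annotate(signature):
    """No-op stand-in: the real decorator registers the signature."""
    def wrap(cls):
        cls.signature = signature  # kept only so it can be inspected
        return cls
    return wrap


class LocalBaseUDTF:
    """Stand-in that records every forward() call as one output row."""

    def __init__(self):
        self.rows = []

    def forward(self, *args):
        self.rows.append(args)


@annotate('string->string')
class SplitWords(LocalBaseUDTF):
    """Hypothetical UDTF: emits one row per space-separated word."""

    def process(self, text):
        for word in text.split():
            self.forward(word)  # one forward call -> one output row


udtf = SplitWords()
for line in ["hello world", "one"]:  # each item stands for one input row
    udtf.process(line)
print(udtf.rows)  # [('hello',), ('world',), ('one',)]
```

Two input rows produce three output rows here, because the number of output rows depends only on how many times process calls forward.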
Dynamic parameters
For a complete reference on function signature formats and data types, see Function signatures and data types.
Asterisks (*) in @annotate signatures have different meanings depending on their position:
- In parameter lists: * means the function accepts any number of additional parameters of any type. For example, @annotate('double,*->string') declares a DOUBLE first parameter followed by any number of parameters. Your code must determine the number and types of the actual arguments at run time, much as the printf function in C handles its variable arguments. The process method receives these arguments as *args and must handle them explicitly.
- In return values: * means any number of STRING values are returned, with the count determined by the number of aliases provided at call time. For example, @annotate("bigint,string->double,*") called as UDTF(x, y) as (a, b, c) returns three values: a as DOUBLE, and b and c as STRING. Each forward call inside process must pass exactly as many values as there are aliases. A mismatch causes a runtime error, not a compile-time error.
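For the parameter-list case, a minimal local sketch for a signature such as 'double,*->string' might look like the following. `LocalBaseUDTF` and `DescribeArgs` are hypothetical names, and the base class is a stand-in for odps.udf.BaseUDTF; the Python types shown are simply whatever the local caller passes, not a statement about how MaxCompute maps SQL types.

```python
# Hypothetical local stand-in for odps.udf.BaseUDTF; not the real API.
class LocalBaseUDTF:
    def __init__(self):
        self.rows = []

    def forward(self, *args):
        self.rows.append(args)


class DescribeArgs(LocalBaseUDTF):
    """Sketch for a signature such as 'double,*->string'.

    The first argument is declared DOUBLE; the * part may supply any
    number of extra arguments of any type, so process() inspects them.
    """

    def process(self, *args):
        first, rest = args[0], args[1:]
        # Discover the count and runtime types of the variable part,
        # much as printf walks its variable arguments in C.
        types = ",".join(type(v).__name__ for v in rest)
        self.forward("%s|%d extra|%s" % (first, len(rest), types))


udtf = DescribeArgs()
udtf.process(1.5)
udtf.process(2.0, "a", 3)
print(udtf.rows)  # [('1.5|0 extra|',), ('2.0|2 extra|str,int',)]
```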
This * rule in return values applies to UDTFs only. User-defined aggregate functions (UDAFs) always return exactly one value.
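The rule for * in return values can be sketched locally by simulating the runtime check. In this hypothetical stand-in, the base class is constructed with the call's aliases only so that it can raise on a mismatch; the real BaseUDTF is not constructed this way, and MaxCompute performs the check itself when the query runs.

```python
class LocalBaseUDTF:
    """Hypothetical stand-in for odps.udf.BaseUDTF.

    It simulates the documented rule for '*' in return values: every
    forward() call must supply exactly one value per alias.
    """

    def __init__(self, aliases):
        self.aliases = aliases
        self.rows = []

    def forward(self, *values):
        if len(values) != len(self.aliases):
            raise RuntimeError(
                "forward() got %d values for %d aliases"
                % (len(values), len(self.aliases)))
        self.rows.append(dict(zip(self.aliases, values)))


class Example(LocalBaseUDTF):
    """Sketch for 'bigint,string->double,*': a DOUBLE, then STRINGs."""

    def process(self, x, y):
        # Always emits one DOUBLE plus two STRINGs: fits exactly 3 aliases.
        self.forward(float(x), y, y.upper())


ok = Example(["a", "b", "c"])  # like UDTF(x, y) as (a, b, c)
ok.process(7, "s")
print(ok.rows)  # [{'a': 7.0, 'b': 's', 'c': 'S'}]

bad = Example(["a", "b"])  # like UDTF(x, y) as (a, b)
error = None
try:
    bad.process(7, "s")
except RuntimeError as exc:
    error = str(exc)  # detected only when the row is emitted
print(error)  # forward() got 3 values for 2 aliases
```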
Sample code
Read resources from MaxCompute
The following UDTF reads page-to-ad mappings from a JSON resource file (test_json.txt) and a resource table (table_resource1), then emits one row per ad ID for each input page ID.
```python
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table


@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """Read pageid and adid_list from a resource file and a resource table into a dict."""

    def __init__(self):
        import json
        # Load the JSON resource file: {pageid: [adid, ...]}.
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        # Merge the resource table rows: record[0] is pageid, record[1] is adid_list.
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = record[1]

    def process(self, pageid):
        """Take a pageid and emit one (pageid, adid) row per ad ID."""
        # .get() avoids a KeyError for page IDs found in neither resource.
        for adid in self.my_dict.get(pageid, []):
            self.forward(pageid, adid)
```
get_cache_file and get_cache_table (from odps.distcache) load resources by the name used when the resource was registered with add file or add table. Resources are loaded once in __init__ and reused across all process calls.
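To try the logic of UDTFExample without a MaxCompute project, you could swap in local stand-ins for the resource loaders. The sketch below is an assumption-laden local harness: `get_cache_file`, `get_cache_table`, and `BaseUDTF` here are hypothetical replacements, not odps.distcache or odps.udf, and the data mirrors test_json.txt and table_resource1 from the walkthrough in this topic. Unlike the sample, the local class calls super().__init__() because the stand-in base class keeps its output in self.rows.

```python
import io
import json

# Hypothetical stand-ins for odps.distcache and odps.udf so the logic of
# UDTFExample can run locally. The data mirrors the walkthrough resources.
_FILES = {"test_json.txt": '{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}'}
_TABLES = {"table_resource1": [("contact_page2", [2, 3, 4]),
                               ("contact_page3", [5, 6, 7])]}


def get_cache_file(name):
    return io.StringIO(_FILES[name])  # a file-like object, as json.load expects


def get_cache_table(name):
    return iter(_TABLES[name])  # records indexed by column position


class BaseUDTF:
    def __init__(self):
        self.rows = []

    def forward(self, *args):
        self.rows.append(args)


class UDTFExample(BaseUDTF):
    def __init__(self):
        super().__init__()
        cache_file = get_cache_file("test_json.txt")
        self.my_dict = json.load(cache_file)
        cache_file.close()
        for record in get_cache_table("table_resource1"):
            self.my_dict[record[0]] = record[1]

    def process(self, pageid):
        for adid in self.my_dict.get(pageid, []):
            self.forward(pageid, adid)


udtf = UDTFExample()  # resources are loaded once, here
for pageid in ["front_page", "contact_page1", "contact_page3"]:  # rows of tmp1
    udtf.process(pageid)
print(len(udtf.rows))  # 9
```

The nine rows match the expected output of the direct call later in this topic; contact_page2 produces nothing because tmp1 never contains it.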
Use dynamic parameters
The following UDTF parses a JSON string and extracts values by key. The number of return values equals the number of input parameters.
```python
from odps.udf import annotate
from odps.udf import BaseUDTF
import json


@annotate('string,*->string,*')
class JsonTuple(BaseUDTF):
    def process(self, *args):
        length = len(args)
        result = [None] * length
        try:
            obj = json.loads(args[0])
            for i in range(1, length):
                result[i] = str(obj.get(args[i]))
        except Exception as err:
            result[0] = str(err)
            for i in range(1, length):
                result[i] = None
        self.forward(*result)
```
The first argument (args[0]) is the JSON string, and the remaining arguments are the keys to extract. The first return value holds the error message if parsing fails and is NULL otherwise; the subsequent values hold the extracted content in key order.
Call this UDTF with a matching number of aliases:
```sql
-- Number of output aliases matches number of input parameters
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
-- The variable-length part can have no columns
SELECT my_json_tuple(json) as exceptions FROM jsons;
-- This causes a runtime error: alias count (4) does not match input parameter count (3)
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
```
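To check the JsonTuple behavior described above before registering it, the same process() body can be exercised locally. `LocalBaseUDTF` is a hypothetical stand-in for odps.udf.BaseUDTF that only records forwarded rows; the @annotate decorator is omitted because it has no effect in this local run.

```python
import json


class LocalBaseUDTF:
    """Hypothetical stand-in for odps.udf.BaseUDTF: records forward() rows."""

    def __init__(self):
        self.rows = []

    def forward(self, *args):
        self.rows.append(args)


class JsonTuple(LocalBaseUDTF):
    # Same process() body as the JsonTuple sample.
    def process(self, *args):
        length = len(args)
        result = [None] * length
        try:
            obj = json.loads(args[0])
            for i in range(1, length):
                result[i] = str(obj.get(args[i]))
        except Exception as err:
            result[0] = str(err)
            for i in range(1, length):
                result[i] = None
        self.forward(*result)


udtf = JsonTuple()
udtf.process('{"a": 1, "b": "x"}', "a", "b")  # -> (None, '1', 'x')
udtf.process('{"a": 1}', "a", "missing")      # -> (None, '1', 'None')
udtf.process("not json", "a")                 # -> (error message, None)
for row in udtf.rows:
    print(row)
```

Note the second call: because the sample wraps each value in str(), a missing key comes back as the string 'None' rather than NULL. Each call forwards as many values as it received arguments, which is why the alias count must equal the input parameter count.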
Register and call a UDTF
The following procedure uses UDTFExample to demonstrate the end-to-end workflow. Save the code as py_udtf_example.py in the bin folder of the MaxCompute client before starting.
Step 1: Create resource tables and prepare data
Log on to the MaxCompute client. For more information, see Start the MaxCompute client.
Create and populate the resource table table_resource1:
```sql
create table if not exists table_resource1 (pageid string, adid_list array<int>);
insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));
```
The adid_list column is of the ARRAY type. For the Python 3 UDTF to read ARRAY data, run set odps.sql.python.version=cp37; at the session level before you run queries that call the UDTF.
Create and populate the internal table tmp1:
```sql
create table if not exists tmp1 (pageid string);
insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
```
Place test_json.txt in the bin folder of the MaxCompute client. The file contains:
```json
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
```
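If you prefer to generate test_json.txt instead of typing it, a short script such as the following writes the same mappings and reads them back to confirm the file is valid JSON. This is a convenience sketch, not part of the MaxCompute workflow; run it from the bin folder of the MaxCompute client, where this walkthrough expects the file.

```python
import json

# Writes test_json.txt with the page-to-ad mappings used in this walkthrough.
mappings = {"front_page": [1, 2, 3], "contact_page1": [3, 4, 5]}
with open("test_json.txt", "w", encoding="utf-8") as f:
    json.dump(mappings, f)

# Read it back: the file must parse to the same dict that json.load in
# UDTFExample.__init__ will see.
with open("test_json.txt", encoding="utf-8") as f:
    assert json.load(f) == mappings
```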
Step 2: Register resources
Add the Python file, the JSON file, and the table as MaxCompute resources. For more information, see Add resources.
```
add py py_udtf_example.py;
add file test_json.txt;
add table table_resource1 as table_resource1;
```
Step 3: Create the UDTF
Register the UDTF, listing all resources it depends on. For more information, see Create a UDF.
```sql
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
```
Step 4: Call the UDTF
The UDTF supports three call patterns. The direct call and the LATERAL VIEW call return the same rows; the third pattern aggregates them.
Direct call:
```sql
select my_udtf(pageid) as (pageid, adid) from tmp1;
```
With LATERAL VIEW:
```sql
select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
```
With LATERAL VIEW and an aggregate function:
```sql
select adid, count(1) as cnt
from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
group by adid;
```
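The LATERAL VIEW query can be modeled in plain Python to show why it returns the same rows as the direct call: each row of tmp1 is paired with every row the UDTF emits for it, and columns of tmp1 can be selected next to the UDTF columns. This is an illustrative model only, not how MaxCompute executes the query; the merged `ads` dict is the one UDTFExample builds from test_json.txt and table_resource1, and `my_udtf` here is a hypothetical Python generator.

```python
# Illustrative model of the LATERAL VIEW query; not MaxCompute internals.
ads = {"front_page": [1, 2, 3], "contact_page1": [3, 4, 5],
       "contact_page2": [2, 3, 4], "contact_page3": [5, 6, 7]}
tmp1 = [{"pageid": "front_page"}, {"pageid": "contact_page1"},
        {"pageid": "contact_page3"}]


def my_udtf(pageid):
    """Yield (udtf_pageid, adid) rows, like UDTFExample.process."""
    for adid in ads.get(pageid, []):
        yield pageid, adid


# Pair each source row with every row the UDTF emits for it, then keep
# tmp1.pageid and the UDTF's adid, as in "select pageid, adid ...".
result = [(row["pageid"], adid)
          for row in tmp1
          for _udtf_pageid, adid in my_udtf(row["pageid"])]
print(len(result))  # 9
```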
The direct call and LATERAL VIEW queries return the same 9 rows:
```
+---------------+------------+
| pageid        | adid       |
+---------------+------------+
| front_page    | 1          |
| front_page    | 2          |
| front_page    | 3          |
| contact_page1 | 3          |
| contact_page1 | 4          |
| contact_page1 | 5          |
| contact_page3 | 5          |
| contact_page3 | 6          |
| contact_page3 | 7          |
+---------------+------------+
```
The aggregate query returns:
```
+------------+------------+
| adid       | cnt        |
+------------+------------+
| 1          | 1          |
| 2          | 1          |
| 3          | 2          |
| 4          | 1          |
| 5          | 2          |
| 6          | 1          |
| 7          | 1          |
+------------+------------+
```
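The cnt column follows directly from the nine rows of the direct call: group them by adid and count each group. The short Python check below reproduces that arithmetic with collections.Counter; ad IDs 3 and 5 are the only ones that appear on two pages, so they are the only ones with a count of 2. The rows are printed in sorted order for readability; without an ORDER BY clause, the SQL result order is not guaranteed.

```python
from collections import Counter

# The nine (pageid, adid) rows returned by the direct and LATERAL VIEW calls.
rows = [("front_page", 1), ("front_page", 2), ("front_page", 3),
        ("contact_page1", 3), ("contact_page1", 4), ("contact_page1", 5),
        ("contact_page3", 5), ("contact_page3", 6), ("contact_page3", 7)]

# Equivalent of GROUP BY adid with count(1): one count per distinct adid.
cnt = Counter(adid for _pageid, adid in rows)
for adid in sorted(cnt):
    print(adid, cnt[adid])
```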