This topic describes how to use a Python 3 user-defined table-valued function (UDTF) to read resources from MaxCompute on the MaxCompute client.

Prerequisites

The MaxCompute client is installed. For more information, see Install and configure the MaxCompute client.

Dynamic parameters of UDTFs

For more information about the format of the function signature of Python UDTFs, see Function signatures and data types.
  • You can use an asterisk (*) in the parameter list to indicate that the function accepts any number of additional input parameters of any type. For example, @annotate('double,*->string') declares a parameter list whose first parameter is of the DOUBLE data type, followed by any number of parameters of any type. In this case, your code must determine the number and types of the input parameters and handle them accordingly, similar to how the printf function in the C programming language handles its variable arguments.
    Note An asterisk (*) in the list of return values has a different meaning.
  • An asterisk (*) in the return values of a UDTF indicates that any number of values of the STRING data type can be returned. The number of return values is determined by the number of aliases that are specified when the function is called. For example, a UDTF annotated with @annotate("bigint,string->double,*") can be called as UDTF(x, y) as (a, b, c). In this example, the aliases a, b, and c are specified after as. MaxCompute infers that a is of the DOUBLE type, because the annotation specifies the data type of the first return column, and that b and c are of the STRING type. Three return values are declared in this call. Therefore, the forward method that the UDTF calls must forward exactly three values. Otherwise, an error is returned.
    Note This error is not returned during compilation. Therefore, the caller of the UDTF must specify the number of aliases in the SQL statement based on the rule that the UDTF defines. An aggregate function always returns exactly one value. Therefore, this rule does not apply to user-defined aggregate functions (UDAFs).
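The alias rule for an asterisk in the return values can be sketched in plain Python. This is only a model of the rule described above, not the way MaxCompute implements it. The helpers resolve_output_columns and check_forward are hypothetical names introduced for illustration and are not part of odps.udf.

```python
def resolve_output_columns(signature, aliases):
    """Pair each alias with the column type implied by the signature.

    Hypothetical helper that models the documented rule; it is not part
    of odps.udf.
    """
    # Everything after "->" describes the return values.
    _, outputs = signature.split("->")
    declared = [t.strip().upper() for t in outputs.split(",")]

    if declared[-1] == "*":
        fixed = declared[:-1]
        if len(aliases) < len(fixed):
            raise ValueError("too few aliases for the fixed return columns")
        # Each alias beyond the fixed columns is treated as STRING.
        types = fixed + ["STRING"] * (len(aliases) - len(fixed))
    else:
        if len(aliases) != len(declared):
            raise ValueError("alias count must match the declared return columns")
        types = declared
    return list(zip(aliases, types))


def check_forward(row, columns):
    """Raise if a forwarded row does not have one value per alias."""
    if len(row) != len(columns):
        raise ValueError(
            "forward() got %d values, expected %d" % (len(row), len(columns))
        )


columns = resolve_output_columns("bigint,string->double,*", ["a", "b", "c"])
print(columns)  # [('a', 'DOUBLE'), ('b', 'STRING'), ('c', 'STRING')]
check_forward((1.5, "x", "y"), columns)  # passes: three values for three aliases
```

In this model, forwarding two or four values for the three aliases raises an error, which corresponds to the mismatch that the note above describes.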

Sample code

  • Read resources from MaxCompute
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    from odps.distcache import get_cache_file
    from odps.distcache import get_cache_table
    @annotate('string -> string, bigint')
    class UDTFExample(BaseUDTF):
        """Read pageid and adid_list from the file resource test_json.txt and the table resource table_resource1 to build a dict.
        """
        def __init__(self):
            import json
            cache_file = get_cache_file('test_json.txt')
            self.my_dict = json.load(cache_file)
            cache_file.close()
            records = list(get_cache_table('table_resource1'))
            for record in records:
                self.my_dict[record[0]] = record[1]
        def process(self, pageid):
            """Take a pageid and forward one (pageid, adid) row for each adid of that page.
            """
            for adid in self.my_dict[pageid]:
                self.forward(pageid, adid)
  • Configure dynamic parameters
    from odps.udf import annotate
    from odps.udf import BaseUDTF
    import json
    @annotate('string,*->string,*')
    class JsonTuple(BaseUDTF):
        def process(self, *args):
            length = len(args)
            result = [None] * length
            try:
                obj = json.loads(args[0])
                for i in range(1, length):
                    result[i] = str(obj.get(args[i]))
            except Exception as err:
                result[0] = str(err)
                for i in range(1, length):
                    result[i] = None
            self.forward(*result)
    In this example, the number of return values depends on the number of input parameters. The first input parameter is a JSON string, and the remaining input parameters are the keys whose values are extracted from that string. The first return value holds the error message if the JSON string fails to be parsed. If no error occurs, the first return value is NULL and the remaining return values hold the extracted values in the order of the input keys. Sample statements:
    -- Configure the number of output aliases based on that of input parameters. 
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
    
    -- The variable-length part can have no columns. 
    SELECT my_json_tuple(json) as exceptions FROM jsons;
    
    -- An error occurs when the following SQL statement is executed because the number of aliases does not match the actual number. 
    -- This error is not returned during compilation. 
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
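The row that JsonTuple forwards can be checked locally without MaxCompute. The following sketch copies the body of process into a plain function that returns the row instead of calling self.forward. The function name json_tuple_row is a hypothetical stand-in introduced for illustration.

```python
import json


def json_tuple_row(*args):
    """Local stand-in for JsonTuple.process: returns the row it would forward."""
    length = len(args)
    result = [None] * length
    try:
        obj = json.loads(args[0])
        for i in range(1, length):
            # Each remaining argument is a key to look up in the parsed JSON.
            result[i] = str(obj.get(args[i]))
    except Exception as err:
        # On failure, only the first column carries the error message.
        result[0] = str(err)
        for i in range(1, length):
            result[i] = None
    return result


print(json_tuple_row('{"a": 1, "b": "x"}', 'a', 'b'))  # [None, '1', 'x']
print(json_tuple_row('{"a": 1}'))                      # [None]
print(json_tuple_row('{"a": 1}', 'b'))                 # [None, 'None']
print(json_tuple_row('not json', 'a', 'b')[1:])        # [None, None]
```

Because the value is passed through str, a key that is missing from the JSON string yields the text 'None' rather than NULL in this sample code.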

Procedure

  1. Save the sample code for reading resources from MaxCompute (the UDTFExample class) as a file named py_udtf_example.py and place the file in the bin folder of the MaxCompute client.
  2. Log on to the MaxCompute client, create a resource table named table_resource1 and an internal table named tmp1, and insert data into both tables. Prepare the resource file test_json.txt and place the file in the bin folder of the MaxCompute client. The tmp1 table is the table that you query with the UDTF in subsequent steps.
    For more information about how to log on to the MaxCompute client, see Start the MaxCompute client. The following sample code shows the operations that you can perform in this step:
    • Create a resource table named table_resource1 and insert data into the table.
      create table if not exists table_resource1 (pageid string, adid_list array<int>);
      insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));
      Note The adid_list field in table_resource1 is of the ARRAY type. Execute the set odps.sql.python.version=cp37; statement at the session level to enable Python 3 to read data of the ARRAY type.
    • Create an internal table named tmp1 and insert data into the table.
      create table if not exists tmp1 (pageid string);
      insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
    • View the content of the resource file test_json.txt.
      {"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
  3. Add the py_udtf_example.py and test_json.txt files and the table_resource1 table as MaxCompute resources on the MaxCompute client.
    For more information about how to add resources, see Add resources. Sample statements:
    add py py_udtf_example.py;
    add file test_json.txt;
    add table table_resource1 as table_resource1;
  4. Create a UDTF named my_udtf on the MaxCompute client.
    For more information about how to create a UDTF, see Create a UDF. Sample statement:
    create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
  5. Execute SQL statements on the MaxCompute client to call the created UDTF.
    Sample statements:
    • Example 1: Use only a UDTF to execute an SQL statement.
      select my_udtf(pageid) as (pageid, adid) from tmp1;
      The following result is returned:
      +---------------+------------+
      | pageid        | adid       |
      +---------------+------------+
      | front_page    | 1          |
      | front_page    | 2          |
      | front_page    | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +---------------+------------+
    • Example 2: Rewrite the statement in Example 1 and use the LATERAL VIEW clause and a UDTF to execute an SQL statement.
      select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
      The following result is returned:
      +---------------+------------+
      | pageid        | adid       |
      +---------------+------------+
      | front_page    | 1          |
      | front_page    | 2          |
      | front_page    | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +---------------+------------+
    • Example 3: Use an aggregate function, the LATERAL VIEW clause, and a UDTF to execute an SQL statement.
      select adid, count(1) as cnt
          from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
      group by adid;
      The following result is returned:
      +------------+------------+
      | adid       | cnt        |
      +------------+------------+
      | 1          | 1          |
      | 2          | 1          |
      | 3          | 2          |
      | 4          | 1          |
      | 5          | 2          |
      | 6          | 1          |
      | 7          | 1          |
      +------------+------------+
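
The results in this procedure can be reproduced locally with a plain Python sketch that follows the logic of UDTFExample. It assumes in-memory stand-ins for the resources: io.StringIO replaces the file object that get_cache_file returns, and a list of tuples replaces the records that get_cache_table returns. The functions build_lookup and run_udtf are hypothetical names introduced for illustration and do not exist in odps.

```python
import io
import json
from collections import Counter

# Stand-ins for test_json.txt, table_resource1, and tmp1 from the procedure.
TEST_JSON = '{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}'
TABLE_RESOURCE1 = [("contact_page2", [2, 3, 4]), ("contact_page3", [5, 6, 7])]
TMP1 = ["front_page", "contact_page1", "contact_page3"]


def build_lookup(cache_file, records):
    """Mirror UDTFExample.__init__: merge the file and table resources into one dict."""
    my_dict = json.load(cache_file)
    for record in records:
        my_dict[record[0]] = record[1]
    return my_dict


def run_udtf(lookup, pageids):
    """Mirror UDTFExample.process: one (pageid, adid) row per adid of each page."""
    rows = []
    for pageid in pageids:
        for adid in lookup[pageid]:
            rows.append((pageid, adid))
    return rows


lookup = build_lookup(io.StringIO(TEST_JSON), TABLE_RESOURCE1)
rows = run_udtf(lookup, TMP1)
for row in rows:
    print(row)  # nine rows, matching Example 1

# Equivalent of the GROUP BY in Example 3.
counts = sorted(Counter(adid for _, adid in rows).items())
print(counts)  # [(1, 1), (2, 1), (3, 2), (4, 1), (5, 2), (6, 1), (7, 1)]
```

A pageid that appears in neither resource would raise a KeyError in this sketch, because the lookup uses direct indexing, as the sample UDTF does.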