すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Python 3 UDTFを使用してMaxComputeからリソースを読み取る

最終更新日:Jan 06, 2025

このトピックでは、Python 3ユーザー定義テーブル値関数 (UDTF) を使用して、MaxComputeクライアントでMaxComputeからリソースを読み取る方法について説明します。

前提条件

MaxComputeクライアントがインストールされています。 MaxComputeクライアントのインストール方法の詳細については、「MaxComputeクライアントのインストールと設定」をご参照ください。

UDTFの動的パラメータ

Python UDTFの関数シグネチャの形式の詳細については、「関数シグネチャとデータ型」をご参照ください。

  • パラメーターリストでアスタリスク (*) を使用して、入力パラメーターの長さとタイプを指定できます。 たとえば、@ annotate('double,*->string ') は、最初のパラメーターがDOUBLEデータ型で、その後に任意の長さと型のパラメーターが続くパラメーターリストを示します。 この場合、コードをコンパイルして入力パラメーターの数と種類を計算し、Cプログラミング言語のprintf関数に基づいてそれらを管理する必要があります。

    説明

    戻り値のアスタリスク (*) は異なる意味を示します。

  • UDTFの戻り値でアスタリスク (*) を使用して、STRINGデータ型の任意の数の値を返すことができることを示すことができます。 戻り値の数は、関数が呼び出されたときに設定されるエイリアスの数に基づいています。 たとえば、@ annotate("bigint,string->double,*") の呼び出しメソッドは、UDTF(x, y) as (a, b, c) です。 この例では、エイリアスabcとして設定されます。 エディタは、aがDOUBLE型であり、bおよびcがSTRINGデータ型であることを識別する。 アノテーションで返される最初の列の値のデータ型が指定されます。 この例では、3つの戻り値が提供される。 したがって、UDTFによって呼び出されるforwardメソッドは、3つの要素の配列を転送する必要があります。 それ以外の場合は、エラーが返されます。

    説明

    ただし、コンパイル中にエラーは返されません。 したがって、UDTFの呼び出し元は、UDTFで定義されているルールに基づいてSQLのエイリアスの数を設定する必要があります。 集計関数の戻り値の数は1に固定されています。 したがって、このルールはユーザー定義の集計関数 (UDAF) には影響しません。

サンプルコード

  • MaxComputeからリソースを読み取る

    from odps.udf import annotate
    from odps.udf import BaseUDTF
    from odps.distcache import get_cache_file
    from odps.distcache import get_cache_table
    @annotate('string -> string, bigint')
    class UDTFExample(BaseUDTF):
        """Read pageid and adid_list from the file get_cache_file and the table get_cache_table to generate dict.
        """
        def __init__(self):
            import json
            cache_file = get_cache_file('test_json.txt')
            self.my_dict = json.load(cache_file)
            cache_file.close()
            records = list(get_cache_table('table_resource1'))
            for record in records:
                self.my_dict[record[0]] = record[1]
        """Enter pageid and generate pageid and all adid values.
        """
        def process(self, pageid):
            for adid in self.my_dict[pageid]:
                self.forward(pageid, adid)
  • 動的パラメーターの設定

    from odps.udf import annotate
    from odps.udf import BaseUDTF
    import json
    @annotate('string,*->string,*')
    class JsonTuple(BaseUDTF):
        def process(self, *args):
            length = len(args)
            result = [None] * length
            try:
                obj = json.loads(args[0])
                for i in range(1, length):
                    result[i] = str(obj.get(args[i]))
            except Exception as err:
                result[0] = str(err)
                for i in range(1, length):
                    result[i] = None
            self.forward(*result)

    この例では、戻り値の数は入力パラメーターの数に基づいています。 最初の出力パラメーターはJSONファイルを示し、その他の出力パラメーターはJSONファイルのキーに基づいて解析されます。 最初の戻り値は、JSONファイルの解析中のエラーメッセージです。 エラーが発生しない場合、JSONファイルの解析中に生成されるコンテンツは、入力キーのシーケンスに基づいて提供されます。 サンプル文:

    -- Configure the number of output aliases based on that of input parameters. 
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
    
    -- The variable-length part can have no columns. 
    SELECT my_json_tuple(json) as exceptions FROM jsons;
    
    -- An error occurs when the following SQL statement is executed because the number of aliases does not match the actual number. 
    -- This error is not returned during compilation. 
    SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;

手順

  1. サンプルコードをpy_udtf_example.pyファイルとして保存し、MaxComputeクライアントのbinフォルダーにファイルを配置します。

  2. MaxComputeクライアントにログインして、table_resource1という名前のリソーステーブルとtmp1という名前の内部テーブルを作成し、テーブルにデータを挿入します。 リソースファイルtest_json.txtを準備し、MaxComputeクライアントのbinフォルダーにファイルを配置します。 tmp1テーブルは、後続の操作でデータを書き込むためにDMLステートメントを実行するテーブルです。

    MaxComputeクライアントにログインする方法の詳細については、「MaxComputeクライアントのインストールと起動」をご参照ください。 次のサンプルコードは、この手順で実行できる操作を示しています。

    • table_resource1という名前のリソーステーブルを作成し、テーブルにデータを挿入します。

      create table if not exists table_resource1 (pageid string, adid_list array<int>);
      insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));
      説明

      table_resource1のadid_listフィールドはARRAY型です。 set odps.sql.python.version=cp37; ステートメントをセッションレベルで実行し、Python 3がARRAY型のデータを読み取ることができるようにします。

    • tmp1という名前の内部テーブルを作成し、テーブルにデータを挿入します。

      create table if not exists tmp1 (pageid string);
      insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
    • リソースファイルtest_json.txtの内容を表示します。

      {"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
  3. MaxComputeクライアントでmy_udtfという名前のUDTFを作成します。

    UDTFの作成方法の詳細については、「UDFの作成」をご参照ください。 例:

    create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
  4. MaxComputeクライアントでSQL文を実行し、作成されたUDTFを呼び出します。

    サンプル文:

    • 例1: UDTFのみを使用してSQL文を実行します。

      select my_udtf(pageid) as (pageid, adid) from tmp1;

      次の応答が返されます。

      +------------+------------+
      | pageid     | adid       |
      +------------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +------------+------------+
    • 例2: 例1のステートメントを書き換え、LATERAL VIEW句とUDTFを使用してSQLステートメントを実行します。

      select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;

      次の応答が返されます。

      +--------+------------+
      | pageid | adid       |
      +--------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      | contact_page3 | 6          |
      | contact_page3 | 7          |
      +--------+------------+
    • 例3: SQLステートメントを実行するには、集計関数、LATERAL VIEW句、およびUDTFを使用します。

      select adid, count(1) as cnt
          from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
      group by adid;

      次の応答が返されます。

      +------------+------------+
      | adid       | cnt        |
      +------------+------------+
      | 1          | 1          |
      | 2          | 1          |
      | 3          | 2          |
      | 4          | 1          |
      | 5          | 2          |
      | 6          | 1          |
      | 7          | 1          |
      +------------+------------+