Python 3 のユーザー定義テーブル関数 (UDTF) を使用して、MaxCompute クライアント上で MaxCompute のリソース (リソースファイルやリソーステーブルなど) を読み取ります。
前提条件
開始する前に、以下を確認してください。
MaxCompute クライアントがインストール済みであること。詳細については、「MaxCompute クライアントのインストールと設定」をご参照ください。
ハンドラークラスのメソッド
Python 3 UDTF は、BaseUDTF を拡張したクラスとして実装されます。クラスのメソッドは次のとおりです。
| メソッド | 必須 | 説明 |
|---|---|---|
__init__ | いいえ | 状態を初期化します。これを使用して、行が処理される前の起動時にリソースファイルとテーブルを一度ロードします。 |
process | はい | 各入力行を処理します。このメソッド内で forward() を呼び出して、出力行を生成します。 |
@annotate デコレーターは関数シグネチャを定義します。process 内で forward(*args) を呼び出して各出力行を生成します。1 回の forward 呼び出しで 1 つの出力行が生成されます。
動的パラメーター
関数シグネチャのフォーマットとデータ型の完全なリファレンスについては、「関数シグネチャとデータ型」をご参照ください。
@annotate シグネチャ内のアスタリスク (*) は、その位置によって意味が異なります。
パラメーターリスト内: * は、入力が任意の型の追加パラメーターを任意の数だけ受け入れることを意味します。たとえば、@annotate('double,*->string') は、DOUBLE 型の最初のパラメーターの後に任意の数のパラメーターが続くことを宣言します。入力パラメーターの数と型を計算し、C 言語の printf 関数に基づいてそれらを管理するコードをコンパイルする必要があります。process メソッドはこれらを *args として受け取り、明示的に処理する必要があります。
戻り値内: * は、任意の数の STRING 値が返されることを意味し、その数は呼び出し時に指定されたエイリアスの数によって決まります。たとえば、@annotate("bigint,string->double,*") を UDTF(x, y) as (a, b, c) として呼び出すと、3 つの値が返されます。a は DOUBLE 型、b と c は STRING 型です。process 内の forward 呼び出しは、エイリアスの数と完全に一致する長さの配列を出力する必要があります。不一致があると、コンパイル時エラーではなく実行時エラーが発生します。
この戻り値における * のルールは UDTF にのみ適用されます。ユーザー定義集計関数 (UDAF) は、常に正確に 1 つの値を返します。
コード例
MaxCompute からのリソースの読み取り
次の UDTF は、JSON リソースファイル (test_json.txt) とリソーステーブル (table_resource1) からページと広告のマッピングを読み取り、各入力ページ ID に対して広告 ID ごとに 1 行を出力します。
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
"""get_cache_file ファイルと get_cache_table テーブルから pageid と adid_list を読み取り、辞書を生成します。
"""
def __init__(self):
import json
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = record[1]
"""pageid を入力し、pageid とすべての adid 値を生成します。
"""
def process(self, pageid):
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)get_cache_file と get_cache_table (odps.distcache から) は、add file または add table でリソースが登録されたときに使用された名前でリソースをロードします。リソースは __init__ で一度ロードされ、すべての process 呼び出しで再利用されます。
動的パラメーターの使用
次の UDTF は、JSON 文字列を解析し、キーによって値を抽出します。戻り値の数は入力パラメーターの数と等しくなります。
from odps.udf import annotate
from odps.udf import BaseUDTF
import json
@annotate('string,*->string,*')
class JsonTuple(BaseUDTF):
def process(self, *args):
length = len(args)
result = [None] * length
try:
obj = json.loads(args[0])
for i in range(1, length):
result[i] = str(obj.get(args[i]))
except Exception as err:
result[0] = str(err)
for i in range(1, length):
result[i] = None
self.forward(*result)最初の引数 (args[0]) は JSON 文字列です。追加の引数は抽出するキーです。最初の戻り値には解析エラーが格納され、後続の値には抽出された内容がキーの順序で格納されます。
この UDTF は、一致する数のエイリアスで呼び出します。
-- 出力エイリアスの数は入力パラメーターの数と一致します
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons;
-- 可変長部分には列がない場合もあります
SELECT my_json_tuple(json) as exceptions FROM jsons;
-- これは実行時エラーを引き起こします:エイリアスの数 (4) が入力パラメーターの数 (3) と一致しません
SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;UDTF の登録と呼び出し
次のプロシージャでは、UDTFExample を使用して、エンドツーエンドのワークフローをデモンストレーションします。開始する前に、コードを py_udtf_example.py として MaxCompute クライアントの bin フォルダーに保存します。
ステップ 1:リソーステーブルの作成とデータの準備
MaxCompute クライアントにログインします。詳細については、「MaxCompute クライアントの起動」をご参照ください。
リソーステーブル table_resource1 を作成し、データを入力します。
create table if not exists table_resource1 (pageid string, adid_list array<int>);
insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));adid_list フィールドは ARRAY 型です。Python 3 で ARRAY 型のデータを読み取れるようにするには、クエリを実行する前にセッションレベルで set odps.sql.python.version=cp37; を実行します。
内部テーブル tmp1 を作成し、データを入力します。
create table if not exists tmp1 (pageid string);
insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");test_json.txt を MaxCompute クライアントの bin フォルダーに配置します。ファイルの内容は次のとおりです。
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}ステップ 2:リソースの登録
Python ファイル、JSON ファイル、およびテーブルを MaxCompute リソースとして追加します。詳細については、「リソースの追加」をご参照ください。
add py py_udtf_example.py;
add file test_json.txt;
add table table_resource1 as table_resource1;ステップ 3:UDTF の作成
UDTF が依存するすべてのリソースをリストアップし、UDTF を登録します。詳細については、「UDF の作成」をご参照ください。
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';ステップ 4:UDTF の呼び出し
3 つの呼び出しパターンがサポートされています。この例では、すべて同じ出力が生成されます。
直接呼び出し:
select my_udtf(pageid) as (pageid, adid) from tmp1;LATERAL VIEW を使用:
select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;LATERAL VIEW と集計関数を使用:
select adid, count(1) as cnt
from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
group by adid;直接呼び出しと LATERAL VIEW クエリは、同じ 9 行を返します。
+------------+------------+
| pageid | adid |
+------------+------------+
| front_page | 1 |
| front_page | 2 |
| front_page | 3 |
| contact_page1 | 3 |
| contact_page1 | 4 |
| contact_page1 | 5 |
| contact_page3 | 5 |
| contact_page3 | 6 |
| contact_page3 | 7 |
+------------+------------+集計クエリは次を返します。
+------------+------------+
| adid | cnt |
+------------+------------+
| 1 | 1 |
| 2 | 1 |
| 3 | 2 |
| 4 | 1 |
| 5 | 2 |
| 6 | 1 |
| 7 | 1 |
+------------+------------+