MaxCompute は、Python 2.7 を使用して Python 2 のユーザー定義テーブル値関数 (UDTF) を実行します。UDTF は、1 つの入力行を受け取り、0 個以上の出力行を返すため、データの分割や展開などの操作に役立ちます。
Python 2 UDTF を作成して使用するには:
BaseUDTFを拡張し、必要なメソッドを実装する Python クラスを記述します。MaxCompute でクラスを UDTF として登録し、MaxCompute SQL で呼び出します。
UDTF のコード構造
Python 2 UDTF は、最大 5 つのコンポーネントで構成されます。
| コンポーネント | 必須 | 説明 |
|---|---|---|
| エンコーディング宣言 | いいえ | ファイルのエンコーディングを宣言します。#coding:utf-8 または # -*- coding: utf-8 -*- を使用します。コードに日本語などのマルチバイト文字が含まれる場合は、これを追加してください。これがないと、MaxCompute はランタイムにエラーを返します。 |
| モジュールのインポート | はい | from odps.udf import annotate と from odps.udf import BaseUDTF を含める必要があります。UDTF がファイルリソースまたはテーブルリソースを参照する場合は、from odps.distcache import get_cache_file または from odps.distcache import get_cache_table を追加します。 |
| 関数シグネチャ | いいえ | @annotate(<signature>) で UDTF にアノテーションを付け、入力と出力のデータ型を宣言します。シグネチャがない場合、MaxCompute は任意の入力型を受け入れますが、すべての出力値を STRING として扱います。 |
| 派生クラス | はい | BaseUDTF を拡張する Python クラスです。このクラスには、すべての UDTF ロジックが含まれます。 |
| クラスメソッド | はい | 少なくとも process を実装します。以下のメソッドの表をご参照ください。 |
メソッド
| メソッド | 必須 | 説明 |
|---|---|---|
BaseUDTF.init() | いいえ | 最初のレコードが処理される前に状態を初期化します。init をオーバーライドする場合は、最初に super(BaseUDTF, self).init() を呼び出します。これを使用して、UDTF がレコード間で維持する必要がある状態をセットアップします。 |
BaseUDTF.process([args, ...]) | はい | 各入力行に対して 1 回呼び出されます。引数は、SQL で宣言された UDTF の入力パラメーターと一致します。 |
BaseUDTF.forward([args, ...]) | はい (process 内で呼び出されます) | 呼び出されるたびに 1 つの出力行を生成します。返したい行ごとに 1 回呼び出します。関数シグネチャが定義されていない場合は、forward を呼び出す前にすべての引数を STRING に変換します。 |
BaseUDTF.close() | いいえ | 最後のレコードが処理される前に 1 回呼び出されます。これを使用して、リソースを解放したり、出力をフラッシュしたりします。 |
例
次の UDTF は、カンマ区切りの文字列を分割し、各値を個別の行として出力します。
#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
def process(self, arg):
props = arg.split(',')
for p in props:
self.forward(p)関数シグネチャとデータ型
シグネチャのフォーマット
@annotate('arg_type_list -> type_list')arg_type_list:入力パラメーターの型のカンマ区切りリストです。*を使用して任意の数の引数を受け入れるか、空白のままにして引数を受け入れないようにします。type_list:戻り値の型のカンマ区切りリストです。UDTF は複数の列を返すことができます。
次の表に、有効なシグネチャの例を示します。
| シグネチャ | 入力型 | 戻り値の型 |
|---|---|---|
@annotate('bigint,boolean->string,datetime') | BIGINT, BOOLEAN | STRING, DATETIME |
@annotate('*->string,datetime') | 任意の数の引数 | STRING, DATETIME |
@annotate('->double,bigint,string') | なし | DOUBLE, BIGINT, STRING |
@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>") | ARRAY, STRUCT, STRING | MAP, STRUCT |
セマンティック解析中に、MaxCompute は実際の引数のデータ型がシグネチャと一致するかどうかをチェックします。不一致の場合はエラーが返されます。
利用可能なデータ型は、ご利用の MaxCompute プロジェクトのデータ型エディションによって異なります。詳細については、「データ型エディション」をご参照ください。
データ型のマッピング
MaxCompute SQL の型に対応する Python の型を使用して UDTF コードを記述します。
| MaxCompute SQL 型 | Python 2 型 |
|---|---|
| BIGINT | int |
| STRING | str |
| DOUBLE | float |
| BOOLEAN | bool |
| DATETIME | int (1970 年 1 月 1 日 00:00:00 協定世界時 (UTC) からのミリ秒) |
| FLOAT | float |
| CHAR | str |
| VARCHAR | str |
| BINARY | bytearray |
| DATE | int |
| DECIMAL | decimal.Decimal |
| ARRAY | list |
| MAP | dict |
| STRUCT | collections.namedtuple |
型処理に関する追加の注意点:
MaxCompute SQL の NULL は、Python の
Noneにマッピングされます。odps.udf.int(value, silent=True)は、値を int に変換できない場合にエラーを発生させる代わりにNoneを返します。
ファイルリソースとテーブルリソースの参照
odps.distcache モジュールを使用して、ファイルリソースまたはテーブルリソースを UDTF にロードします。
get_cache_file(resource_name):指定されたファイルリソースのファイルのようなオブジェクトを返します。完了したら、オブジェクトでclose()を呼び出します。UDTF を登録する際にファイルリソースを宣言してください。そうしないと、呼び出しはランタイムに失敗します。get_cache_table(resource_name):指定されたテーブルリソースに対するジェネレーターを返します。各反復では、レコードがリスト (ARRAY 型) として生成されます。
次の例では、JSON ファイルとテーブルリソースをロードし、それらを使用してページ ID で広告 ID を検索します。
# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
def __init__(self):
import json
# JSON ファイルリソースを dict にロードします
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
# テーブルリソースからレコードをマージします
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = [record[1]]
def process(self, pageid):
# ページに関連付けられた広告 ID ごとに 1 行を出力します
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)MaxCompute SQL での UDTF の呼び出し
開発プロセスを完了した後、MaxCompute SQL から UDTF を呼び出します:
プロジェクト内では: UDTF を、ビルトイン関数 を呼び出すのと同じ方法で呼び出します。
プロジェクト間: プロジェクト A でプロジェクト B の UDTF を使用するには、関数名の前にプロジェクト名をプレフィックスとして付けます:
SELECT B:udf_in_other_project(arg0, arg1) AS res FROM table_t;詳細については、「パッケージを利用したプロジェクト間のリソースアクセス」をご参照ください。
制限事項
MaxCompute は、サンドボックス環境で Python 2 UDTF コードを実行します。以下の操作は許可されていません:
ローカルファイルの読み取りまたは書き込み
サブプロセスの開始
スレッドの開始
ソケット接続の開始
他のシステムからの Python 2 UDF の呼び出し
Python 標準ライブラリを使用するコードのみをアップロードしてください。上記の制限された操作に依存するモジュールまたは C 拡張モジュールは利用できません。
利用可能な C 拡張モジュール
サンドボックスでは、次の C 拡張モジュールが利用可能です:
array、 audioop、 binascii、 bisect、 cmath、 _codecs_cn、 _codecs_hk、 _codecs_iso2022、 _codecs_jp、 _codecs_kr、 _codecs_tw、 _collections、 cStringIO、 datetime、 _functools、 future_builtins、 _heapq、 _hashlib、 itertools、 _json、 _locale、 _lsprof、 math、 _md5、 _multibytecodec、 operator、 _random、 _sha256、 _sha512、 _sha、 _struct、 strop、 time、 unicodedata、 _weakref、 cPickle
拡張モジュールに依存しない、純粋に Python で実装されたすべてのモジュールも利用可能です。
出力サイズの制限
sys.stdout または sys.stderr への書き込みは 20 KB に制限されています。この制限を超える文字は、警告なしに破棄されます。
サードパーティライブラリ
NumPy などのサードパーティライブラリは、MaxCompute Python 2 環境にプリインストールされています。サードパーティライブラリでは、ローカルデータアクセスとほとんどのネットワーク I/O API は無効になっており、限定的なネットワーク I/O のみ利用可能です。