MaxCompute は CPython 3.7.3 を使用した Python 3 をサポートしています。Python 2 はサポート終了(End of Life:EOL)に達しているため、新規に作成するユーザー定義のテーブル値関数(UDTF)はすべて Python 3 で記述してください。
Python 3 の有効化
デフォルトでは、MaxCompute プロジェクトの UDF は Python 2 を使用します。Python 3 を使用するには、SQL ステートメントの前に以下のセッションレベルのコマンドを追加し、両方をまとめて実行します:
set odps.sql.python.version=cp37;UDTF のコード構造
MaxCompute Studio を使用して Python 3 で UDTF コードを作成します。UDTF は以下の 4 つのコンポーネントで構成されます:
| コンポーネント | 必須 | 説明 |
|---|---|---|
| モジュールのインポート | 必須 | from odps.udf import annotate および from odps.udf import BaseUDTF のインポートが必須です。ファイルまたはテーブルを参照する場合は、from odps.distcache import get_cache_file または from odps.distcache import get_cache_table も追加でインポートしてください。 |
| 関数シグネチャ | 任意 | @annotate(<signature>) で宣言します。入力パラメーターおよび戻り値のデータの型を定義します。シグネチャを指定しない場合、任意の入力データの型を受け付け、すべての戻り値は STRING 型として扱われます。 |
| カスタム Python クラス | 必須 | BaseUDTF を継承した派生クラスです。ビジネスロジックに必要な変数およびメソッドを定義します。 |
| クラスメソッド | 必須 | 下記の表に示す必須メソッドを実装します。 |
クラスメソッド
| メソッド | 必須 | 呼び出しタイミング | 説明 |
|---|---|---|---|
BaseUDTF.init() | 任意 | 最初のレコード処理前(1 回のみ) | 初期化メソッドです。オーバーライドする場合は、先頭で super(BaseUDTF, self).init() を呼び出してください。レコード間で維持される内部状態のセットアップに使用します。 |
BaseUDTF.process([args, ...]) | 必須 | SQL の各レコードごと(1 回) | 各入力行を処理します。process 関数のパラメーターは、SQL ステートメントで指定された UDTF の入力パラメーターです。 |
BaseUDTF.forward([args, ...]) | 必須 | ユーザーのコードから呼び出し | 1 回の呼び出しで 1 行を出力します。forward メソッドのパラメーターは、SQL ステートメントで指定された UDTF の出力パラメーターです。関数シグネチャを指定しない場合、forward を呼び出す前にすべての値を STRING に変換してください。 |
BaseUDTF.close() | 任意 | 最後のレコード処理前(1 回のみ) | クリーンアップメソッドです。UDTF の終了時にリソースを解放するために使用します。 |
以下は、カンマ区切りの文字列を個別の行に分割する最小限の UDTF の例です:
# 関数シグネチャモジュールおよび基底クラスをインポートします。
from odps.udf import annotate
from odps.udf import BaseUDTF
# 関数シグネチャ:STRING を受け取り、STRING を返します。
@annotate('string -> string')
# BaseUDTF を継承したカスタム Python クラスです。
class Explode(BaseUDTF):
def process(self, arg):
props = arg.split(',')
for p in props:
self.forward(p)Python 2 UDTF と Python 3 UDTF は、それぞれ異なる基盤となる Python バージョンで実行されます。各 UDTF は、対象とする Python バージョンの構文および機能に基づいて記述してください。
制限事項
Python 3 は Python 2 と互換性がありません。1 つの SQL ステートメント内で Python 2 UDTF と Python 3 UDTF を混在させることはできません。
Python 2 UDTF の移行
Python 2 はサポート終了(EOL)に達しています。既存の Python 2 UDTF を、プロジェクトの状況に応じて移行してください:
新規プロジェクトまたは初回の Python UDTF 作成の場合:すべての Python UDTF を最初から Python 3 で記述してください。
多数の Python 2 UDTF を含む既存プロジェクトの場合:業務への影響を最小限に抑えるため、段階的に移行してください。以下のいずれかのアプローチを選択します:
新規の UDTF は Python 3 で記述し、その UDTF を使用するジョブに対してセッションレベルで Python 3 を有効化します。詳細については、「Python 3 の有効化」をご参照ください。
既存の Python 2 UDTF を Python 2 および Python 3 の両方と互換性を持つように書き直します。「Python 2 コードの Python 3 への移行」を参考にしてください。
UDTF が複数の MaxCompute プロジェクト間で共有されている場合、Python 2 をまだ使用しているプロジェクトで動作を保証するため、Python 2 および Python 3 の両方と互換性を持つように実装してください。
サードパーティライブラリ
NumPy は MaxCompute の Python 3 実行環境に含まれていません。UDTF で NumPy を使用するには、NumPy の WHL パッケージを手動でリソースとしてアップロードしてください。Python Package Index(PyPI)またはイメージから期待されるファイル名は以下のとおりです:
numpy-<Version>-cp37-cp37m-manylinux1_x86_64.whlパッケージのアップロード手順については、「リソース操作」または「Python UDF におけるサードパーティパッケージの参照」をご参照ください。
関数シグネチャおよびデータの型
関数シグネチャは、UDTF の入力パラメーターおよび戻り値のデータの型を宣言します。MaxCompute はセマンティクス解析時にこのシグネチャを検証し、実際の型が一致しない場合にはエラーを返します。
シグネチャの形式
@annotate('arg_type_list -> type_list')arg_type_list:カンマ区切りの入力パラメーターの型一覧です。*を指定すると任意の数のパラメーターを受け付け、空欄の場合はパラメーターを一切受け付けません。type_list:戻り値の型一覧です。UDTF は複数のカラムを返すことができます。
type_list でサポートされる型:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、および複合型(ARRAY、MAP、STRUCT)— ネストされた複合型も含みます。
arg_type_list でサポートされる型:上記のすべての型に加え、CHAR および VARCHAR がサポートされます。
使用するデータの型は、ご利用の MaxCompute プロジェクトのデータの型エディションに基づいて選択してください。
シグネチャの例
| シグネチャ | 説明 |
|---|---|
@annotate('bigint,boolean->string,datetime') | 2 つの入力パラメーター(BIGINT、BOOLEAN)および 2 つの戻り値(STRING、DATETIME)。 |
@annotate('*->string,datetime') | 任意の数の入力パラメーターおよび 2 つの戻り値(STRING、DATETIME)。 |
@annotate('->double,bigint,string') | 入力パラメーターなし、3 つの戻り値(DOUBLE、BIGINT、STRING)。 |
@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>") | 複合型の入力および出力。 |
データの型マッピング
MaxCompute SQL の型に対応する Python の型を使用して Python UDTF を記述してください:
| MaxCompute SQL の型 | Python 3 の型 |
|---|---|
| BIGINT | INT |
| STRING | UNICODE |
| DOUBLE | FLOAT |
| BOOLEAN | BOOL |
| DATETIME | DATETIME.DATETIME |
| FLOAT | FLOAT |
| CHAR | UNICODE |
| VARCHAR | UNICODE |
| BINARY | BYTES |
| DATE | DATETIME.DATE |
| DECIMAL | DECIMAL.DECIMAL |
| ARRAY | LIST |
| MAP | DICT |
| STRUCT | COLLECTIONS.NAMEDTUPLE |
リファレンスリソース
Python UDTF 内でファイルおよびテーブルを参照するには、odps.distcache モジュールを使用します。
odps.distcache.get_cache_file(resource_name)
ファイルリソースの内容を返します。
resource_name:ご利用の MaxCompute プロジェクト内に存在する既存のファイルリソースの名前です。無効な名前や存在しないファイルを指定した場合、エラーが返されます。ファイルライクオブジェクトを返します。使用後は
close()を呼び出してファイルハンドルを解放してください。UDTF の作成時にファイルリソースを宣言する必要があります。この宣言を省略した場合、UDTF の呼び出し時にエラーが返されます。
odps.distcache.get_cache_table(resource_name)
テーブルリソースの内容を返します。
resource_name:ご利用の MaxCompute プロジェクト内に存在する既存のテーブルリソースの名前です。無効な名前や存在しないテーブルを指定した場合、エラーが返されます。ジェネレーターを返します。このジェネレーターを反復処理すると、各行ごとに 1 つのレコード(各レコードは ARRAY)が得られます。
以下の例では、JSON ファイルおよびテーブルリソースからデータを読み込み、ルックアップに基づいて行を出力します:
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
def __init__(self):
import json
# JSON ファイルリソースを辞書に読み込みます。
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
# テーブルリソースのレコードを辞書に追加します。
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = record[1]
def process(self, pageid):
# 各入力 pageid について、関連付けられた adid の値をすべて転送します。
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)Python 3 UDTF の呼び出し
開発手順に従って Python 3 UDTF を開発した後、MaxCompute SQL から呼び出します。
プロジェクト内では:UDTF を ビルトイン関数 と同じ方法で呼び出します。
プロジェクト間での呼び出し:他のプロジェクトの UDTF を参照するには、プロジェクト名をプレフィックスとして指定します:
SELECT B:udf_in_other_project(arg0, arg1) AS res FROM table_t;プロジェクト間のリソース共有の設定については、「パッケージベースのプロジェクト間リソース共有」をご参照ください。
MaxCompute Studio で Python 3 UDTF を開発およびテストする方法については、「Python UDF の開発」をご参照ください。