このトピックでは、Java ユーザー定義関数 (UDF) および Python UDF で複合データ型を使用する方法について説明します。
説明
この例では、UDF_COMPLEX_DATA という名前の UDF を作成します。
この例では、UDF で ARRAY、MAP、および STRUCT 複合データ型を使用する方法を示します。 Java UDF の場合、オーバーロードメソッドに基づいて、前述のデータ型に同じ UDF 名を使用できます。 Python UDF の場合、前述のデータ型を使用するには、UDF_COMPLEX_DATA_ARRAY、UDF_COMPLEX_DATA_MAP、および UDF_COMPLEX_DATA_STRUCT という名前の UDF を作成する必要があります。
構文:
array<string> UDF_COMPLEX_DATA(array<bigint> <as>) map<string, string> UDF_COMPLEX_DATA(map<string,bigint> <ms>) struct<output_name:string,output_time:string> UDF_COMPLEX_DATA(<input_name:string,input_timestamp:bigint> <st>)
説明:
入力タイムスタンプを yyyy-MM-dd HH:mm:ss 形式の時間文字列に変換します。 入力パラメーターは、ARRAY、MAP、および STRUCT 複合データ型です。
パラメーター:
as: このパラメーターは ARRAY<BIGINT> データ型です。 タイムスタンプのリストが指定されています。 このパラメーターは必須です。
ms: このパラメーターは MAP<STRING, BIGINT> データ型です。 各マップ要素の値はタイムスタンプです。 このパラメーターは必須です。
st: このパラメーターは STRUCT データ型です。 input_timestamp フィールドの値はタイムスタンプです。
開発と使用方法
1. UDF を記述する
Java UDF のサンプルコード
package com.aliyun; // パッケージ名を指定します。
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.text.SimpleDateFormat;
import java.util.*;
@Resolve("struct<input_name:string, input_timestamp:bigint>->map<string,string>")
public class ComplexDataTypeExample extends UDF{
private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
/**
* タイムスタンプのリストを時間文字列のリストに変換します。
* @param timestamps タイムスタンプのリストを入力します。
* @return 時間文字列のリストを取得します。
*/
public List<String> evaluate(List<Long> timestamps) {
if (timestamps == null) {
return null;
}
List<String> result = new ArrayList<>();
SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
for (Long timestamp : timestamps) {
Date date = new Date(timestamp < 9999999999L ? timestamp * 1000 : timestamp);
String dateString = formatter.format(date);
result.add(dateString);
}
return result;
}
/**
* MAP データ型のタイムスタンプを MAP データ型の時間文字列に変換します。
* @param timestamps 値がタイムスタンプである MAP データ型を入力します。
* @return MAP データ型の時間文字列のリストを取得します。
*/
public Map<String, String> evaluate(Map<String, Long> timestamps) {
if (timestamps == null) {
return null;
}
Map<String, String> result = new HashMap<>(timestamps.size());
SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
for (String key : timestamps.keySet()) {
Long timestamp = timestamps.get(key);
Date date = new Date(timestamp < 9999999999L ? timestamp * 1000 : timestamp);
String dateString = formatter.format(date);
result.put(key, dateString);
}
return result;
}
/**
* タイムスタンプを時間文字列に変換します。
* @param input STRUCT データ型のタイムスタンプを入力します。
* @return STRUCT データ型の時間文字列を取得します。
*/
public Map<String, String> evaluate(Struct input) {
if (input == null) {
return null;
}
SimpleDateFormat formatter = new SimpleDateFormat(PATTERN);
String nameValue = (String) input.getFieldValue("input_name");
Long timestampValue = (Long) input.getFieldValue("input_timestamp");
Date date = new Date(timestampValue < 9999999999L ? timestampValue * 1000 : timestampValue);
String dateString = formatter.format(date);
Map<String, String> result = new HashMap<>(8);
result.put("output_name", nameValue);
result.put("output_time", dateString);
return result;
}
}次の依存関係を pom.xml に導入する必要があります。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>0.29.10-public</version>
</dependency>サンプルコードでは、UDF のオーバーロード用に 3 つの evaluate メソッドが定義されています。
メソッド 1: java.util.List クラスに対応するパラメーターとして ARRAY を使用します。
メソッド 2: java.util.Map クラスに対応するパラメーターとして MAP を使用します。
メソッド 3: com.aliyun.odps.data.Struct クラスに対応するパラメーターとして STRUCT を使用します。
説明com.aliyun.odps.data.Struct クラスの reflection 機能を使用して、フィールドの名前とデータ型を取得することはできません。 UDF に STRUCT データ型を使用する場合、
@Resolve アノテーションを com.aliyun.odps.data.Struct クラスに追加する必要があります。 このアノテーションは、入力パラメーターまたは戻り値に com.aliyun.odps.data.Struct クラスが含まれる UDF のオーバーロードにのみ影響します。
Java で UDF を記述する場合は、UDF クラスを継承する必要があります。 この例では、evaluate メソッドは LIST、MAP、および STRUCT 型の 3 つの入力パラメーターを定義し、それぞれ LIST、MAP、および MAP 型の値を返します。 入力パラメーターと戻り値のデータ型は、SQL 文で UDF のシグネチャとして使用されます。 その他のコード仕様と要件の詳細については、「Java UDF」をご参照ください。
Python 3 UDF のサンプルコード
次の例では、入力パラメーターは MAP<STRING, BIGINT> データ型です。 サンプルコードは、UDF_COMPLEX_DATA_MAP という名前の UDF を作成します。
from odps.udf import annotate import datetime @annotate('map<string,bigint>->map<string,datetime>') class MapExample: def evaluate(self, input_dict): output_dict = dict() for key in input_dict: value = input_dict[key] t = datetime.datetime.fromtimestamp(value) output_dict[key] = t return output_dict次の例では、入力パラメーターは ARRAY<BIGINT> データ型です。 サンプルコードは、UDF_COMPLEX_DATA_ARRAY という名前の UDF を作成します。
from odps.udf import annotate import datetime @annotate('array<bigint>->array<datetime>') class ArrayExample: def evaluate(self, input_list): output_list = list() for item in input_list: t = datetime.datetime.fromtimestamp(item) output_list.append(t) return output_list次の例では、入力パラメーターは STRUCT<input_name:string,input_timestamp:bigint> データ型です。 サンプルコードは、UDF_COMPLEX_DATA_STRUCT という名前の UDF を作成します。
from odps.udf import annotate import datetime, collections @annotate('struct<input_name:string,input_timestamp:bigint>->struct<output_name:string,output_time:datetime>') class StructExample: def evaluate(self, input_namedtuple): OutputNamedTuple = collections.namedtuple('output_namedtuple', ['output_name', 'output_time']) name_val = input_namedtuple.input_name time_val = datetime.datetime.fromtimestamp(input_namedtuple.input_timestamp) output_namedtuple = OutputNamedTuple(name_val, time_val) return output_namedtuple
デフォルトでは、MaxCompute プロジェクトで UDF を実行するために Python 2 が使用されます。 Python 3 で UDF を実行する場合は、セッションレベルで次のコマンドを実行します: set odps.sql.python.version=cp37。 Python 3 UDF 仕様の詳細については、「Python 3 UDF」をご参照ください。
2. リソースをアップロードし、UDF を作成する
UDF コードを開発およびデバッグした後、リソースを MaxCompute にアップロードし、UDF を作成します。 この例では、UDF_COMPLEX_DATA という名前の Java UDF を作成します。 Python UDF の場合、次の UDF を作成する必要があります: UDF_COMPLEX_DATA_ARRAY、UDF_COMPLEX_DATA_MAP、および UDF_COMPLEX_DATA_STRUCT。 リソースをアップロードして Java UDF を作成する方法の詳細については、「Java プログラムをパッケージ化し、パッケージをアップロードし、MaxCompute UDF を作成する」をご参照ください。 リソースをアップロードして Python UDF を作成する方法の詳細については、「Python プログラムをアップロードし、MaxCompute UDF を作成する」をご参照ください。
3. UDF を使用する
次のコマンドを実行して、ARRAY データ型のタイムスタンプを時間文字列に変換します。
-- Java UDF を呼び出します。 SELECT UDF_COMPLEX_DATA(array(1554047999, 1554047989)); -- Python UDF を呼び出します。 set odps.sql.python.version=cp37; -- Python3 UDF を使用するには、このコマンドを実行して Python3 を有効にします。 SELECT UDF_COMPLEX_DATA_ARRAY(array(1554047999, 1554047989));次の結果が返されます。
+---------------------------------------------+ | _c0 | +---------------------------------------------+ | [2019-03-31 23:59:59, 2019-03-31 23:59:49] | +---------------------------------------------+次のコマンドを実行して、MAP データ型のタイムスタンプを時間文字列に変換します。
-- Java UDF を呼び出します。 SELECT UDF_COMPLEX_DATA(map('date1', 1554047989, 'date2', 1554047999)); -- Python UDF を呼び出します。 set odps.sql.python.version=cp37; -- Python3 UDF を使用するには、このコマンドを実行して Python3 を有効にします。 SELECT UDF_COMPLEX_DATA_MAP(map('date1', 1554047989, 'date2', 1554047999));次の結果が返されます。
+----------------------------------------------------------------+ | _c0 | +----------------------------------------------------------------+ | {"date1":"2019-03-31 23:59:49","date2":"2019-03-31 23:59:59"} | +----------------------------------------------------------------+次のコマンドを実行して、STRUCT データ型のタイムスタンプを時間文字列に変換します。
-- Java UDF を呼び出します。 SELECT UDF_COMPLEX_DATA(struct('date', 1554047989)); -- Python UDF を呼び出します。 set odps.sql.python.version=cp37; -- Python3 UDF を使用するには、このコマンドを実行して Python3 を有効にします。 SELECT UDF_COMPLEX_DATA_STRUCT(struct('date', 1554047989));次の結果が返されます。
+-------------------------------------------------------------+ | _c0 | +-------------------------------------------------------------+ | {"output_name":"date","output_time":"2019-03-31 23:59:49"} | +-------------------------------------------------------------+