MaxCompute 的 UDF 包括:UDF,UDAF 和 UDTF 三種函數,本文將重點介紹如何通過 Java 實現這三種函數。
參數與傳回值類型
MaxCompute2.0 版本升級後,Java UDF 支援的資料類型從原來的 Bigint,String,Double,Boolean 擴充了更多基本的資料類型,同時還擴充支援了 ARRAY,MAP,STRUCT 等複雜類型。
- Java UDF 使用新基本類型的方法,如下所示:
- UDTF 通過 @Resolve 註解來擷取 signature,如:
@Resolve("smallint->varchar(10)")
。 - UDF 通過反射分析 evaluate 來擷取 signature,此時 MaxCompute 內建類型與 Java 類型符合一一映射關係。
- UDAF通過 @Resolve 註解來擷取 signature,MaxCompute2.0支援在註解中使用新類型,如:
@Resolve("smallint->varchar(10)")
。
- UDTF 通過 @Resolve 註解來擷取 signature,如:
- Java UDF 使用複雜類型的方法,如下所示:
- UDTF 通過 @Resolve annotation 來指定 sinature,如:
@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")
。 - UDF 通過 evaluate 方法的 signature 來映射 UDF 的輸入輸出類型,此時參考 MaxCompute 類型與 Java 類型的映射關係。其中 array 對應 java.util.List,map 對應 java.util.Map,struct 對應 com.aliyun.odps.data.Struct。
- UDAF通過 @Resolve 註解來擷取 signature,MaxCompute2.0支援在註解中使用新類型,如:
@Resolve("smallint->varchar(10)")
。说明- com.aliyun.odps.data.Struct 從反射看不出 field name 和 field type,所以需要用 @Resolve annotation 來輔助。即如果需要在 UDF 中使用 struct,要求在 UDF class 上也標註上 @Resolve 註解,這個註解只會影響參數或傳回值中包含 com.aliyun.odps.data.Struct 的重載。
- 目前 class 上只能提供一個 @Resolve annotation,因此一個 UDF 中帶有 struct 參數或傳回值的重載只能有一個。
- UDTF 通過 @Resolve annotation 來指定 sinature,如:
MaxCompute Type | Java Type |
---|---|
Tinyint | java.lang.Byte |
Smallint | java.lang.Short |
Int | java.lang.Integer |
Bigint | java.lang.Long |
Float | java.lang.Float |
Double | java.lang.Double |
Decimal | java.math.BigDecimal |
Boolean | java.lang.Boolean |
String | java.lang.String |
Varchar | com.aliyun.odps.data.Varchar |
Binary | com.aliyun.odps.data.Binary |
Datetime | java.util.Date |
Timestamp | java.sql.Timestamp |
Array | java.util.List |
Map | java.util.Map |
Struct | com.aliyun.odps.data.Struct |
- 在UDF中使用輸入或輸出參數的類型請務必使用Java Type,否則會報錯ODPS-0130071。
- Java 中對應的資料類型以及傳回值資料類型是對象,首字母請務必大寫。
- SQL 中的 NULL 值通過 Java 中的 NULL 參考資料表示,因此 Java primitive type 是不允許使用的,因為無法表示 SQL 中的 NULL 值。
- 此處 Array 類型對應的 Java 類型是 List,而不是數組。
UDF
實現 UDF 需要繼承 com.aliyun.odps.udf.UDF 類,並實現 evaluate 方法。evaluate 方法必須是非 static 的 public 方法 。Evaluate 方法的參數和傳回值類型將作為 SQL 中 UDF 的函數簽名。這意味著您可以在 UDF 中實現多個 evaluate 方法,在調用 UDF 時,架構會依據 UDF 調用的參數類型匹配正確的 evaluate 方法 。
特別注意:不同的jar包最好不要有類名相同但實現功能邏輯不一樣的類。如,UDF(UDAF/UDTF): udf1、 udf2分別對應資源udf1.jar、udf2.jar,如果兩個jar包裡都包含一個 com.aliyun.UserFunction.class 類,當同一個sql中同時使用到這兩個udf時,系統會隨機載入其中一個類,那麼就會導致udf執行行為不一致甚至編譯失敗。
package org.alidata.odps.udf.examples;
import com.aliyun.odps.udf.UDF;
public final class Lower extends UDF {
public String evaluate(String s) {
if (s == null) {
return null;
}
return s.toLowerCase();
}
}
可以通過實現void setup(ExecutionContext ctx)
和void close()
來分別實現 UDF 的初始化和結束代碼。
UDF 的使用方式與 MaxCompute SQL 中普通的內建函數相同,詳情請參見 內建函數。
新版的MaxCompute支援定義Java UDF時,使用Writable類型作為參數和傳回值。下面為MaxCompute類型和Java Writable類型的映射關係。
MaxCompute Type | Java Writable Type |
---|---|
tinyint | ByteWritable |
smallint | ShortWritable |
int | IntWritable |
bigint | LongWritable |
float | FloatWritable |
double | DoubleWritable |
decimal | BigDecimalWritable |
boolean | BooleanWritable |
string | Text |
varchar | VarcharWritable |
binary | BytesWritable |
datetime | DatetimeWritable |
timestamp | TimestampWritable |
interval_year_month | IntervalYearMonthWritable |
interval_day_time | IntervalDayTimeWritable |
array | 暫不支援 |
map | 暫不支援 |
struct | 暫不支援 |
其他 UDF 樣本
@Resolve
annotation,來指定 struct 的具體類型。
@Resolve("struct,string->string")
public class UdfArray extends UDF {
public String evaluate(List vals, Long len) {
return vals.get(len.intValue());
}
public String evaluate(Map map, String key) {
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from co
UDAF
public abstract class Aggregator implements ContextFunction {
@Override
public void setup(ExecutionContext ctx) throws UDFException {
}
@Override
public void close() throws UDFException {
}
/**
* 建立彙總Buffer
* @return Writable 彙總buffer
*/
abstract public Writable newBuffer();
/**
* @param buffer 彙總buffer
* @param args SQL中調用UDAF時指定的參數,不能為null,但是args裡面的元素可以為null,代表對應的輸入資料是null
* @throws UDFException
*/
abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
/**
* 產生最終結果
* @param buffer
* @return Object UDAF的最終結果
* @throws UDFException
*/
abstract public Writable terminate(Writable buffer) throws UDFException;
abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}
其中最重要的是 iterate,merge 和 terminate 三個介面,UDAF 的主要邏輯依賴於這三個介面的實現。此外,還需要您實現自訂的 Writable buffer。
在上圖中,輸入資料被按照一定的大小進行分區(有關分區的描述請參見 MapReduce),每片的大小適合一個 worker 在適當的時間內完成。這個分區大小的設定需要您手動設定完成。
-
第一階段:每個 worker 統計分區內資料的個數及匯總值,您可以將每個分區內的資料個數及匯總值視為一個中間結果。
-
第二階段:worker 匯總上一個階段中每個分區內的資訊。在最終輸出時,r.sum / r.count 即是所有輸入資料的平均值。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve("double->double")
public class AggrAvg extends Aggregator {
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}
UDTF
介面定義 | 描述 |
---|---|
public void setup(ExecutionContext ctx) throws UDFException | 初始化方法,在UDTF處理輸入資料前,調用使用者自訂的初始化行為。在每個Worker內setup會被先調用一次。 |
public void process(Object[] args) throws UDFException | 這個方法由架構調用,SQL中每一條記錄都會對應調用一次process,process的參數為SQL語句中指定的UDTF輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過forward函數輸出。使用者需要在process函數內自行調用forward,以決定輸出資料。 |
public void close() throws UDFException | UDTF的結束方法,此方法由架構調用,並且只會被調用一次,即在處理完最後一條記錄之後。 |
public void forward(Object …o) throws UDFException | 使用者調用forward方法輸出資料,每次forward代表輸出一條記錄。對應SQL語句UDTF的as子句指定的列。 |
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
@Resolve("string,bigint->string,bigint")
public class MyUDTF extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
Long b = (Long) args[1];
for (String t: a.split("\\s+")) {
forward(t, b);
}
}
}
select user_udtf(col0, col1) as (c0, c1) from my_table;
+------+------+
| col0 | col1 |
+------+------+
| A B | 1 |
| C D | 2 |
+------+------+
+----+----+
| c0 | c1 |
+----+----+
| A | 1 |
| B | 1 |
| C | 2 |
| D | 2 |
+----+----+
使用說明
select user_udtf(col0, col1, col2) as (c0, c1) from my_table;
select user_udtf(col0, col1, col2) as (c0, c1) from (select * from my_table distribute by key sort by key) t;
select reduce_udtf(col0, col1, col2) as (c0, c1) from (select col0, col1, col2 from (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1 distribute by col0 sort by col0, col1) t2;
- 同一個 SELECT 子句中不允許有其他運算式。
select value, user_udtf(key) as mycol ...
- UDTF 不能嵌套使用。
select user_udtf1(user_udtf2(key)) as mycol...
- 不支援在同一個 select 子句中與 group by / distribute by / sort by 聯用。
select user_udtf(key) as mycol ... group by mycol
其他 UDTF 樣本
- 編寫 UDTF 程式,編譯成功後匯出 jar 包(udtfexample1.jar)。
package com.aliyun.odps.examples.udf; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Iterator; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.UDTF; import com.aliyun.odps.udf.annotation.Resolve; /** * project: example_project * table: wc_in2 * partitions: p2=1,p1=2 * columns: colc,colb */ @Resolve("string,string->string,bigint,string") public class UDTFResource extends UDTF { ExecutionContext ctx; long fileResourceLineCount; long tableResource1RecordCount; long tableResource2RecordCount; @Override public void setup(ExecutionContext ctx) throws UDFException { this.ctx = ctx; try { InputStream in = ctx.readResourceFileAsStream("file_resource.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line; fileResourceLineCount = 0; while ((line = br.readLine()) != null) { fileResourceLineCount++; } br.close(); Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator(); tableResource1RecordCount = 0; while (iterator.hasNext()) { tableResource1RecordCount++; iterator.next(); } iterator = ctx.readResourceTable("table_resource2").iterator(); tableResource2RecordCount = 0; while (iterator.hasNext()) { tableResource2RecordCount++; iterator.next(); } } catch (IOException e) { throw new UDFException(e); } } @Override public void process(Object[] args) throws UDFException { String a = (String) args[0]; long b = args[1] == null ? 0 : ((String) args[1]).length(); forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount); } }
- 添加資源到 MaxCompute。
Add file file_resource.txt; Add jar udtfexample1.jar; Add table table_resource1 as table_resource1; Add table table_resource2 as table_resource2;
- 在 MaxCompute 中建立 UDTF 函數(my_udtf)。
create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
- 在 MaxCompute 建立資源表 table_resource1、table_resource2 和物理表 tmp1,並插入相應的資料。
- 運行該 UDTF。
select mp_udtf("10","20") as (a, b, fileResourceLineCount) from tmp1; 返回: +-------+------------+-------+ | a | b | fileResourceLineCount | +-------+------------+-------+ | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 | | 10 | 2 | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 | +-------+------------+-------+
複雜資料類型樣本
@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
public String evaluate(List<String> vals, Long len) {
return vals.get(len.intValue());
}
public String evaluate(Map<String,String> map, String key) {
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;
HIVE UDF相容樣本
MaxCompute 2.0支援了Hive風格的UDF,有部分的HIVE UDF、UDTF可以直接在MaxCompute上使用。
package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length == 0) {
throw new UDFArgumentException("Collect: input args should >= 1");
}
for (int i = 1; i < objectInspectors.length; i++) {
if (objectInspectors[i] != objectInspectors[0]) {
throw new UDFArgumentException("Collect: input oi should be the same for all args");
}
}
return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
List<Object> objectList = new ArrayList<>(deferredObjects.length);
for (DeferredObject deferredObject : deferredObjects) {
objectList.add(deferredObject.get());
}
return objectList;
}
@Override
public String getDisplayString(String[] strings) {
return "Collect";
}
}
--添加資源
Add jar test.jar;
--建立function
CREATE FUNCTION hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
--使用function
set odps.sql.hive.compatible=true;
select hive_collect(4y,5y,6y) from dual;
+------+
| _c0 |
+------+
| [4, 5, 6] |
+------+
- MaxCompute的add jar命令會永久地在project中建立一個resource,所以建立udf時需要指定jar包,無法自動將所有jar包加入classpath。
- 在使用相容的HIVE UDF的時候,需要在sql前加set語句
set odps.sql.hive.compatible=true;
語句,set語句和sql語句一起提交執行。 - 在使用相容的HIVE UDF時,還要注意MaxCompute的JAVA沙箱限制。