MaxCompute 的 UDF 包括:UDF,UDAF 和 UDTF 三種函數,本文將重點介紹如何通過 Java 實現這三種函數。

參數與傳回值類型

MaxCompute2.0 版本升級後,Java UDF 支援的資料類型從原來的 Bigint,String,Double,Boolean 擴充了更多基本的資料類型,同時還擴充支援了 ARRAY,MAP,STRUCT 等複雜類型。

  • Java UDF 使用新基本類型的方法,如下所示:
    • UDTF 通過 @Resolve 註解來擷取 signature,如:@Resolve("smallint->varchar(10)")
    • UDF 通過反射分析 evaluate 來擷取 signature,此時 MaxCompute 內建類型與 Java 類型符合一一映射關係。
    • UDAF通過 @Resolve 註解來擷取 signature,MaxCompute2.0支援在註解中使用新類型,如:@Resolve("smallint->varchar(10)")
  • Java UDF 使用複雜類型的方法,如下所示:
    • UDTF 通過 @Resolve annotation 來指定 sinature,如:@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")
    • UDF 通過 evaluate 方法的 signature 來映射 UDF 的輸入輸出類型,此時參考 MaxCompute 類型與 Java 類型的映射關係。其中 array 對應 java.util.List,map 對應 java.util.Map,struct 對應 com.aliyun.odps.data.Struct。
    • UDAF通過 @Resolve 註解來擷取 signature,MaxCompute2.0支援在註解中使用新類型,如: @Resolve("smallint->varchar(10)")
      说明
      • com.aliyun.odps.data.Struct 從反射看不出 field name 和 field type,所以需要用 @Resolve annotation 來輔助。即如果需要在 UDF 中使用 struct,要求在 UDF class 上也標註上 @Resolve 註解,這個註解只會影響參數或傳回值中包含 com.aliyun.odps.data.Struct 的重載。
      • 目前 class 上只能提供一個 @Resolve annotation,因此一個 UDF 中帶有 struct 參數或傳回值的重載只能有一個。
MaxCompute 資料類型與 Java 類型的對應關係,如下所示:
MaxCompute Type Java Type
Tinyint java.lang.Byte
Smallint java.lang.Short
Int java.lang.Integer
Bigint java.lang.Long
Float java.lang.Float
Double java.lang.Double
Decimal java.math.BigDecimal
Boolean java.lang.Boolean
String java.lang.String
Varchar com.aliyun.odps.data.Varchar
Binary com.aliyun.odps.data.Binary
Datetime java.util.Date
Timestamp java.sql.Timestamp
Array java.util.List
Map java.util.Map
Struct com.aliyun.odps.data.Struct
说明
  • 在UDF中使用輸入或輸出參數的類型請務必使用Java Type,否則會報錯ODPS-0130071。
  • Java 中對應的資料類型以及傳回值資料類型是對象,首字母請務必大寫。
  • SQL 中的 NULL 值通過 Java 中的 NULL 參考資料表示,因此 Java primitive type 是不允許使用的,因為無法表示 SQL 中的 NULL 值。
  • 此處 Array 類型對應的 Java 類型是 List,而不是數組。

UDF

實現 UDF 需要繼承 com.aliyun.odps.udf.UDF 類,並實現 evaluate 方法。evaluate 方法必須是非 static 的 public 方法 。Evaluate 方法的參數和傳回值類型將作為 SQL 中 UDF 的函數簽名。這意味著您可以在 UDF 中實現多個 evaluate 方法,在調用 UDF 時,架構會依據 UDF 調用的參數類型匹配正確的 evaluate 方法 。

特別注意:不同的jar包最好不要有類名相同但實現功能邏輯不一樣的類。如,UDF(UDAF/UDTF): udf1、 udf2分別對應資源udf1.jar、udf2.jar,如果兩個jar包裡都包含一個 com.aliyun.UserFunction.class 類,當同一個sql中同時使用到這兩個udf時,系統會隨機載入其中一個類,那麼就會導致udf執行行為不一致甚至編譯失敗。

UDF 的樣本如下:
package org.alidata.odps.udf.examples; 
  import com.aliyun.odps.udf.UDF; 

public final class Lower extends UDF { 
  public String evaluate(String s) { 
    if (s == null) { 
        return null; 
    } 
        return s.toLowerCase(); 
  } 
}

可以通過實現void setup(ExecutionContext ctx)void close()來分別實現 UDF 的初始化和結束代碼。

UDF 的使用方式與 MaxCompute SQL 中普通的內建函數相同,詳情請參見 內建函數

新版的MaxCompute支援定義Java UDF時,使用Writable類型作為參數和傳回值。下面為MaxCompute類型和Java Writable類型的映射關係。

MaxCompute Type Java Writable Type
tinyint ByteWritable
smallint ShortWritable
int IntWritable
bigint LongWritable
float FloatWritable
double DoubleWritable
decimal BigDecimalWritable
boolean BooleanWritable
string Text
varchar VarcharWritable
binary BytesWritable
datetime DatetimeWritable
timestamp TimestampWritable
interval_year_month IntervalYearMonthWritable
interval_day_time IntervalDayTimeWritable
array 暫不支援
map 暫不支援
struct 暫不支援

其他 UDF 樣本

如以下代碼,定義了一個有三個 overloads 的 UDF,其中第一個用了 array 作為參數,第二個用了 map 作為參數,第三個用了 struct。由於第三個 overloads 用了 struct 作為參數或者傳回值,因此要求必須要對 UDF class 打上 @Resolve annotation,來指定 struct 的具體類型。
@Resolve("struct,string->string") 
public class UdfArray extends UDF { 
  public String evaluate(List vals, Long len) { 
    return vals.get(len.intValue()); 
  } 
  public String evaluate(Map map, String key) { 
    return map.get(key); 
  } 
  public String evaluate(Struct struct, String key) { 
  return struct.getFieldValue("a") + key;
 } 
}
使用者可以直接將複雜類型傳入 UDF 中:
create function my_index as 'UdfArray' using 'myjar.jar'; 
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from co

UDAF

實現 Java UDAF 類需要繼承 com.aliyun.odps.udf.Aggregator,並實現如下幾個介面:
public abstract class Aggregator implements ContextFunction {
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {                                                                   
  }
  @Override
  public void close() throws UDFException {    
  }
  /**
   * 建立彙總Buffer
   * @return Writable 彙總buffer
   */
  abstract public Writable newBuffer();
  /**
   * @param buffer 彙總buffer
   * @param args SQL中調用UDAF時指定的參數,不能為null,但是args裡面的元素可以為null,代表對應的輸入資料是null
   * @throws UDFException
   */
  abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
  /**
   * 產生最終結果
   * @param buffer
   * @return Object UDAF的最終結果
   * @throws UDFException
   */
  abstract public Writable terminate(Writable buffer) throws UDFException;
  abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}

其中最重要的是 iterate,merge 和 terminate 三個介面,UDAF 的主要邏輯依賴於這三個介面的實現。此外,還需要您實現自訂的 Writable buffer。

以實現求平均值 avg 為例,下圖簡要說明了在 MaxCompute UDAF 中這一函數的實現邏輯及計算流程:

在上圖中,輸入資料被按照一定的大小進行分區(有關分區的描述請參見 MapReduce),每片的大小適合一個 worker 在適當的時間內完成。這個分區大小的設定需要您手動設定完成。
UDAF 的計算過程分為兩個階段:
  • 第一階段:每個 worker 統計分區內資料的個數及匯總值,您可以將每個分區內的資料個數及匯總值視為一個中間結果。

  • 第二階段:worker 匯總上一個階段中每個分區內的資訊。在最終輸出時,r.sum / r.count 即是所有輸入資料的平均值。

計算平均值的 UDAF 的程式碼範例,如下所示:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve("double->double")
public class AggrAvg extends Aggregator {
  private static class AvgBuffer implements Writable {
    private double sum = 0;
    private long count = 0;
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeDouble(sum);
      out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      sum = in.readDouble();
      count = in.readLong();
    }
  }
  private DoubleWritable ret = new DoubleWritable();
  @Override
  public Writable newBuffer() {
    return new AvgBuffer();
  }
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    DoubleWritable arg = (DoubleWritable) args[0];
    AvgBuffer buf = (AvgBuffer) buffer;
    if (arg != null) {
      buf.count += 1;
      buf.sum += arg.get();
    }
  }
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    if (buf.count == 0) {
      ret.set(0);
    } else {
      ret.set(buf.sum / buf.count);
    }
    return ret;
  }
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    AvgBuffer p = (AvgBuffer) partial;
    buf.sum += p.sum;
    buf.count += p.count;
  }
}
说明
  • UDAF 在 SQL 中的使用文法與普通的內建彙總函式相同,詳情請參見 彙總函式
  • 關於如何運行 UDTF 的方法與 UDF 類似,詳情請參見 運行 UDF
  • String對應的Writable類型為Text。

UDTF

Java UDTF 需要繼承 com.aliyun.odps.udf.UDTF 類。這個類需要實現 4 個介面,如下表所示:
介面定義 描述
public void setup(ExecutionContext ctx) throws UDFException 初始化方法,在UDTF處理輸入資料前,調用使用者自訂的初始化行為。在每個Worker內setup會被先調用一次。
public void process(Object[] args) throws UDFException 這個方法由架構調用,SQL中每一條記錄都會對應調用一次process,process的參數為SQL語句中指定的UDTF輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過forward函數輸出。使用者需要在process函數內自行調用forward,以決定輸出資料。
public void close() throws UDFException UDTF的結束方法,此方法由架構調用,並且只會被調用一次,即在處理完最後一條記錄之後。
public void forward(Object …o) throws UDFException 使用者調用forward方法輸出資料,每次forward代表輸出一條記錄。對應SQL語句UDTF的as子句指定的列。
UDTF 的程式樣本,如下所示:
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
   @Resolve("string,bigint->string,bigint")
   public class MyUDTF extends UDTF {
     @Override
     public void process(Object[] args) throws UDFException {
       String a = (String) args[0];
       Long b = (Long) args[1];
       for (String t: a.split("\\s+")) {
         forward(t, b);
       }
     }
   }
说明 以上只是程式樣本,關於如何在 MaxCompute 中運行 UDTF 的方法與 UDF 類似,詳情請參見:運行 UDF
在 SQL 中可以這樣使用這個 UDTF,假設在 MaxCompute 上建立 UDTF 時註冊函數名為 user_udtf:
select user_udtf(col0, col1) as (c0, c1) from my_table;
假設 my_table 的 col0,col1 的值如下所示:
+------+------+
| col0 | col1 |
+------+------+
| A B | 1 |
| C D | 2 |
+------+------+
則 select 出的結果,如下所示:
+----+----+
| c0 | c1 |
+----+----+
| A  | 1  |
| B  | 1  |
| C  | 2  |
| D  | 2  |
+----+----+

使用說明

UDTF 在 SQL 中的常用方式如下:
select user_udtf(col0, col1, col2) as (c0, c1) from my_table; 
select user_udtf(col0, col1, col2) as (c0, c1) from (select * from my_table distribute by key sort by key) t;
select reduce_udtf(col0, col1, col2) as (c0, c1) from (select col0, col1, col2 from (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1 distribute by col0 sort by col0, col1) t2;
但使用 UDTF 有如下使用限制:
  • 同一個 SELECT 子句中不允許有其他運算式。
    select value, user_udtf(key) as mycol ...
  • UDTF 不能嵌套使用。
    select user_udtf1(user_udtf2(key)) as mycol...
  • 不支援在同一個 select 子句中與 group by / distribute by / sort by 聯用。
    select user_udtf(key) as mycol ... group by mycol

其他 UDTF 樣本

在 UDTF 中,您可以讀取 MaxCompute 的 資源。利用 UDTF 讀取 MaxCompute 資源的樣本,如下所示:
  1. 編寫 UDTF 程式,編譯成功後匯出 jar 包(udtfexample1.jar)。
    package com.aliyun.odps.examples.udf;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.odps.udf.UDTF;
    import com.aliyun.odps.udf.annotation.Resolve;
    /**
     * project: example_project 
     * table: wc_in2 
     * partitions: p2=1,p1=2 
     * columns: colc,colb
     */
    @Resolve("string,string->string,bigint,string")
    public class UDTFResource extends UDTF {
      ExecutionContext ctx;
      long fileResourceLineCount;
      long tableResource1RecordCount;
      long tableResource2RecordCount;
      @Override
      public void setup(ExecutionContext ctx) throws UDFException {
      this.ctx = ctx;
      try {
       InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
       BufferedReader br = new BufferedReader(new InputStreamReader(in));
       String line;
       fileResourceLineCount = 0;
       while ((line = br.readLine()) != null) {
         fileResourceLineCount++;
       }
       br.close();
       Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
       tableResource1RecordCount = 0;
       while (iterator.hasNext()) {
         tableResource1RecordCount++;
         iterator.next();
       }
       iterator = ctx.readResourceTable("table_resource2").iterator();
       tableResource2RecordCount = 0;
       while (iterator.hasNext()) {
         tableResource2RecordCount++;
         iterator.next();
       }
     } catch (IOException e) {
       throw new UDFException(e);
     }
    }
       @Override
       public void process(Object[] args) throws UDFException {
         String a = (String) args[0];
         long b = args[1] == null ? 0 : ((String) args[1]).length();
         forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
         + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
        }
    }
  2. 添加資源到 MaxCompute。
    Add file file_resource.txt;
    Add jar udtfexample1.jar;
    Add table table_resource1 as table_resource1;
    Add table table_resource2 as table_resource2;
  3. 在 MaxCompute 中建立 UDTF 函數(my_udtf)。
    create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 
    'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
  4. 在 MaxCompute 建立資源表 table_resource1、table_resource2 和物理表 tmp1,並插入相應的資料。
  5. 運行該 UDTF。
    select mp_udtf("10","20") as (a, b, fileResourceLineCount) from tmp1;  
    返回:
    +-------+------------+-------+
    | a | b      | fileResourceLineCount |
    +-------+------------+-------+
    | 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
    | 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
    +-------+------------+-------+

複雜資料類型樣本

如以下代碼,定義了一個有三個 overloads 的 UDF,其中第一個用了 array 作為參數,第二個用了 map 作為參數,第三個用了 struct。由於第三個 overloads 用了 struct 作為參數或者傳回值,因此要求必須要對 UDF class 打上 @Resolve annotation,來指定 struct 的具體類型。
@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
  public String evaluate(List<String> vals, Long len) {
    return vals.get(len.intValue());
  }
  public String evaluate(Map<String,String> map, String key) {
    return map.get(key);
  }
  public String evaluate(Struct struct, String key) {
    return struct.getFieldValue("a") + key;
  }
}
您可以直接將複雜類型傳入 UDF 中,如下所示:
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

HIVE UDF相容樣本

MaxCompute 2.0支援了Hive風格的UDF,有部分的HIVE UDF、UDTF可以直接在MaxCompute上使用。

注意 目前支援相容的Hive版本為2.1.0; 對應Hadoop版本為2.7.2。UDF如果是在其他版本的Hive/Hadoop開發的,可能需要使用此Hive/Hadoop版本重新編譯。
樣本如下:
package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
  @Override
  public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
    if (objectInspectors.length == 0) {
      throw new UDFArgumentException("Collect: input args should >= 1");
    }
    for (int i = 1; i < objectInspectors.length; i++) {
      if (objectInspectors[i] != objectInspectors[0]) {
        throw new UDFArgumentException("Collect: input oi should be the same for all args");
      }
    }
    return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
  }
  @Override
  public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
    List<Object> objectList = new ArrayList<>(deferredObjects.length);
    for (DeferredObject deferredObject : deferredObjects) {
      objectList.add(deferredObject.get());
    }
    return objectList;
  }
  @Override
  public String getDisplayString(String[] strings) {
    return "Collect";
  }
}
該udf可以將任意類型、數量的參數打包成array輸出。假設輸出jar包名為 test.jar:
--添加資源
Add jar test.jar;
--建立function
CREATE FUNCTION hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
--使用function
set odps.sql.hive.compatible=true;
select hive_collect(4y,5y,6y) from dual;
+------+
| _c0  |
+------+
| [4, 5, 6] |
+------+
说明 該udf可以支援所有的類型,包括array,map,struct等複雜類型。
使用相容hive的udf需要注意:
  • MaxCompute的add jar命令會永久地在project中建立一個resource,所以建立udf時需要指定jar包,無法自動將所有jar包加入classpath。
  • 在使用相容的HIVE UDF的時候,需要在sql前加set語句set odps.sql.hive.compatible=true;語句,set語句和sql語句一起提交執行。
  • 在使用相容的HIVE UDF時,還要注意MaxCompute的JAVA沙箱限制。