MaxCompute supports user-defined functions (UDFs), user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). This topic describes how to implement the three types of functions by using Java.
For more information about how to create a Java UDF, see (Optional) Develop Java UDFs.
Data types
Java UDFs in MaxCompute V2.0 support more data types in addition to BIGINT, STRING, DOUBLE, and BOOLEAN. These UDFs also support complex data types, such as ARRAY, MAP, and STRUCT, as well as Writable types.
- To use a new data type, Java UDFs specify their function signatures in the following ways:
  - A UDTF uses the @Resolve annotation, for example, @Resolve("smallint->varchar(10)").
  - A UDF uses the evaluate method. In this case, MaxCompute built-in data types and Java data types are in a one-to-one mapping.
  - A UDAF uses the @Resolve annotation. MaxCompute V2.0 supports the use of new data types in annotations, for example, @Resolve("smallint->varchar(10)").
- To use a complex data type, Java UDFs specify their function signatures in the following ways:
  - A UDTF uses the @Resolve annotation, for example, @Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>").
  - A UDF uses the signature of the evaluate method to match input and output types. This involves the following mapping between MaxCompute and Java data types:
    - ARRAY in MaxCompute maps to java.util.List.
    - MAP in MaxCompute maps to java.util.Map.
    - STRUCT in MaxCompute maps to com.aliyun.odps.data.Struct.
  - A UDAF uses the @Resolve annotation. MaxCompute V2.0 allows you to use new data types in annotations, for example, @Resolve("smallint->varchar(10)").
    Note
    - You can use type,* to pass any number of parameters, for example, @Resolve("string,*->array<string>"). Note that you must add a subtype after ARRAY.
    - The field names and field types of com.aliyun.odps.data.Struct cannot be obtained through reflection. Therefore, you must use the @Resolve annotation to specify them. In other words, to use STRUCT in a UDF, you must add the @Resolve annotation to the UDF class. This annotation affects only the overloads whose parameters or return values contain com.aliyun.odps.data.Struct.
    - Only one @Resolve annotation can be added to a class. Therefore, a UDF can contain only one overload whose parameter or return value is of the STRUCT type.
- The following table describes the mapping between MaxCompute and Java data types.

MaxCompute Type | Java Type
---|---
TINYINT | java.lang.Byte
SMALLINT | java.lang.Short
INT | java.lang.Integer
BIGINT | java.lang.Long
FLOAT | java.lang.Float
DOUBLE | java.lang.Double
DECIMAL | java.math.BigDecimal
BOOLEAN | java.lang.Boolean
STRING | java.lang.String
VARCHAR | com.aliyun.odps.data.Varchar
BINARY | com.aliyun.odps.data.Binary
DATETIME | java.util.Date
TIMESTAMP | java.sql.Timestamp
ARRAY | java.util.List
MAP | java.util.Map
STRUCT | com.aliyun.odps.data.Struct

Note
- Make sure that the input and output parameters of a UDF use the Java types listed above. Otherwise, error ODPS-0130071 is returned.
- The Java data types of input parameters and return values are objects, and their names start with an uppercase letter.
- The NULL value in SQL statements is represented by a null reference in Java. Java primitive types cannot hold null values and therefore must not be used.
- The ARRAY type in the preceding table maps to java.util.List.
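As a minimal sketch of this mapping (the AddBigint class and its logic are illustrative and not part of the official examples), a UDF that adds two BIGINT values declares its evaluate method with java.lang.Long parameters and a java.lang.Long return value, and returns null when either input is NULL:

import com.aliyun.odps.udf.UDF;
// Illustrative sketch: BIGINT inputs and output map to the boxed type java.lang.Long.
public final class AddBigint extends UDF {
  public Long evaluate(Long a, Long b) {
    // A SQL NULL arrives as a Java null reference, so boxed types are required.
    if (a == null || b == null) {
      return null;
    }
    return a + b;
  }
}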
- MaxCompute V2.0 allows you to use Writable types as parameters and return values when
you define Java UDFs. The following table describes the mapping between MaxCompute
data types and Java Writable types.
MaxCompute Type | Java Writable Type
---|---
TINYINT | ByteWritable
SMALLINT | ShortWritable
INT | IntWritable
BIGINT | LongWritable
FLOAT | FloatWritable
DOUBLE | DoubleWritable
DECIMAL | BigDecimalWritable
BOOLEAN | BooleanWritable
STRING | Text
VARCHAR | VarcharWritable
BINARY | BytesWritable
DATETIME | DatetimeWritable
TIMESTAMP | TimestampWritable
INTERVAL_YEAR_MONTH | IntervalYearMonthWritable
INTERVAL_DAY_TIME | IntervalDayTimeWritable
ARRAY | N/A
MAP | N/A
STRUCT | N/A
UDF
A UDF must extend the com.aliyun.odps.udf.UDF class and implement the evaluate method. The evaluate method must be a non-static public method, and its parameters and return value are used as the UDF signature in SQL statements.
You can implement multiple evaluate methods in a UDF. When a UDF is called, the framework matches an evaluate method based on the types of the parameters that are passed to the UDF.
- We recommend that you do not use classes that have the same name but different logic in different JAR packages. For example, UDF udf1 is in the JAR package udf1.jar, UDF udf2 is in the JAR package udf2.jar, and both packages contain a class named com.aliyun.UserFunction.class. If you use the two UDFs in the same SQL statement, the system randomly loads the class from one of the two JAR packages. This may cause the UDFs to behave inconsistently or even cause a compilation failure.
- If an SQL statement uses two UDFs, the UDFs are not isolated from each other because they share the same classpath. If the resources referenced by the UDFs contain the same class, the class that the classloader loads is indeterminate. To avoid this issue, make sure that the resources referenced by the two UDFs do not contain the same class.
You can use void setup(ExecutionContext ctx)
to initialize a UDF and use void close()
to terminate a UDF.
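The following is a minimal sketch of both points (the AddPrefix class name, its prefix logic, and the choice of overloads are illustrative assumptions, not part of the official examples): it overloads evaluate for STRING and BIGINT inputs and overrides setup and close.

import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
// Illustrative sketch: multiple evaluate overloads plus setup/close hooks.
public final class AddPrefix extends UDF {
  private String prefix;
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {
    // Initialization that runs once per worker before evaluate is called.
    prefix = "v:";
  }
  // Overload matched when the UDF is called with a STRING argument.
  public String evaluate(String s) {
    return s == null ? null : prefix + s;
  }
  // Overload matched when the UDF is called with a BIGINT argument.
  public String evaluate(Long n) {
    return n == null ? null : prefix + n;
  }
  @Override
  public void close() throws UDFException {
    // Cleanup that runs once per worker after all input is processed.
  }
}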
The method of using UDFs is the same as that of using built-in functions of MaxCompute SQL. For more information, see Mathematical functions.
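The following example shows a UDF that converts a string to lowercase: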
package org.alidata.odps.udf.examples;
import com.aliyun.odps.udf.UDF;
public final class Lower extends UDF {
public String evaluate(String s) {
if (s == null) {
return null;
}
return s.toLowerCase();
}
}
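The following example shows a UDF that uses the Writable type Text to concatenate two strings: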
package com.aliyun.odps.udf.example;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.udf.UDF;
public class MyConcat extends UDF {
private Text ret = new Text();
public Text evaluate(Text a, Text b) {
if (a == null || b == null) {
return null;
}
ret.clear();
ret.append(a.getBytes(), 0, a.getLength());
ret.append(b.getBytes(), 0, b.getLength());
return ret;
}
}
The com.aliyun.odps.io
package contains all Writable types. To use Writable types, you can download the
odps-sdk-commons
package from the API document website.
Package | Description |
---|---|
odps-sdk-core | Provides basic features of MaxCompute, for example, the features that allow you to manage tables and projects. You can also use this package to access the Tunnel service. |
odps-sdk-commons | Provides Util packages. |
odps-sdk-udf | Provides main interfaces of UDFs. |
odps-sdk-mapred | Provides MapReduce features. |
odps-sdk-graph | Provides Graph SDK for Java. You can obtain the SDK by searching for odps-sdk-graph. |
UDAF
A UDAF must extend the com.aliyun.odps.udf.Aggregator class and implement the following methods:
import com.aliyun.odps.udf.ContextFunction;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.io.Writable;
public abstract class Aggregator implements ContextFunction {
@Override
public void setup(ExecutionContext ctx) throws UDFException {
}
@Override
public void close() throws UDFException {
}
/**
 * Create an aggregation buffer.
 * @return Writable aggregation buffer
 */
abstract public Writable newBuffer();
/**
 * @param buffer aggregation buffer
 * @param args parameters specified when the UDAF is called in SQL. args cannot be NULL, but the values in args can be NULL, which indicates that the input data is NULL
 * @throws UDFException
 */
abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
/**
 * Generate the final result.
 * @param buffer aggregation buffer
 * @return final result of the UDAF
 * @throws UDFException
 */
abstract public Writable terminate(Writable buffer) throws UDFException;
abstract public void merge(Writable buffer, Writable partial) throws UDFException;
}
The most important methods are iterate, merge, and terminate because the main logic of a UDAF relies on these methods. In addition, you must implement a user-defined Writable buffer.
The following example shows how to calculate an average value by using a MaxCompute UDAF. The computation logic is as follows:
- Step 1: Each worker counts the data quantity and total sum in a slice. You can consider the data quantity and total sum in each slice as an intermediate result.
- Step 2: Each worker aggregates the intermediate results (data quantity and total sum) of the slices generated in Step 1.
In the final output, sum/count of the final aggregation buffer is the average value of all input data.
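The sample code of the AggrAvg UDAF is as follows: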
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve("double->double")
public class AggrAvg extends Aggregator {
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}
- Writable[] writables: indicates a row of data. In the code, it represents the columns that are passed in. For example, writables[0] indicates the first column, and writables[1] indicates the second column.
- iterate(Writable writable, Writable[] writables) method: writable indicates the aggregated data in a phase. The method is executed once for each row of data (also regarded as a set) obtained by using group by in different Map tasks.
- merge() method: aggregates the results obtained from different Map tasks.
- terminate() method: returns data.
- newBuffer() method: creates the initial value to be returned.
- In the readFields method of Writable, the readFields method of the same object is called multiple times because the writable objects of partial are reused. This method resets the entire object each time it is called. If an object contains a Collection, the Collection is cleared.
- UDAFs are similar to aggregate functions in MaxCompute SQL. For more information, see Aggregate functions.
- The method for running UDAFs is similar to that for running UDFs. For more information, see (Optional) Develop Java UDFs.
- The Writable type that corresponds to the STRING type is Text.
UDTF
A UDTF must extend the com.aliyun.odps.udf.UDTF class and implement four methods. The following table describes the methods.
Interface | Description
---|---
public void setup(ExecutionContext ctx) throws UDFException | The initialization method, which calls the user-defined initialization behavior before a UDTF processes the input data. setup is called once for each worker before any data is processed.
public void process(Object[] args) throws UDFException | This method is called by the framework. process is called once for each SQL record. The parameters of process are the input parameters of the UDTF specified in the SQL statement. The input parameters are passed in as Object[], and the results are returned by calling the forward method. You must call forward in process to output data.
public void close() throws UDFException | The method for terminating a UDTF. The framework calls this method only once, after the last record is processed.
public void forward(Object... o) throws UDFException | You can call the forward method to output data. Each call to forward outputs one record. The record corresponds to the columns specified by the as clause in the SQL statement that uses the UDTF.
- The following sample code shows the implementation of a UDTF.
package org.alidata.odps.udtf.examples;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.UDFException;
// TODO define input and output types, e.g., "string,string->string,bigint".
@Resolve("string,bigint->string,bigint")
public class MyUDTF extends UDTF {
  @Override
  public void process(Object[] args) throws UDFException {
    String a = (String) args[0];
    Long b = (Long) args[1];
    for (String t : a.split("\\s+")) {
      forward(t, b);
    }
  }
}
Note: The method for running UDTFs in MaxCompute is similar to that for running UDFs. For more information, see (Optional) Develop Java UDFs.
- Assume that you create a UDTF in MaxCompute with the registered function name user_udtf. Execute the following statement to use the UDTF:
select user_udtf(col0, col1) as (c0, c1) from my_table;
The following example shows the values of col0 and col1 in my_table:
+------+------+
| col0 | col1 |
+------+------+
| A B  | 1    |
| C D  | 2    |
+------+------+
The following example shows the execution result of the SELECT statement:
+----+----+
| c0 | c1 |
+----+----+
| A  | 1  |
| B  | 1  |
| C  | 2  |
| D  | 2  |
+----+----+
Example of using a UDTF to read resources in MaxCompute
- Compile a UDTF program. After the compilation is successful, export the JAR package.
In this example, the JAR package is named udtfexample1.jar.
package com.aliyun.odps.examples.udf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
 * project: example_project
 * table: wc_in2
 * partitions: p2=1,p1=2
 * columns: colc,colb
 */
@Resolve("string,string->string,bigint,string")
public class UDTFResource extends UDTF {
  ExecutionContext ctx;
  long fileResourceLineCount;
  long tableResource1RecordCount;
  long tableResource2RecordCount;
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {
    this.ctx = ctx;
    try {
      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
      BufferedReader br = new BufferedReader(new InputStreamReader(in));
      String line;
      fileResourceLineCount = 0;
      while ((line = br.readLine()) != null) {
        fileResourceLineCount++;
      }
      br.close();
      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
      tableResource1RecordCount = 0;
      while (iterator.hasNext()) {
        tableResource1RecordCount++;
        iterator.next();
      }
      iterator = ctx.readResourceTable("table_resource2").iterator();
      tableResource2RecordCount = 0;
      while (iterator.hasNext()) {
        tableResource2RecordCount++;
        iterator.next();
      }
    } catch (IOException e) {
      throw new UDFException(e);
    }
  }
  @Override
  public void process(Object[] args) throws UDFException {
    String a = (String) args[0];
    long b = args[1] == null ? 0 : ((String) args[1]).length();
    forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
  }
}
- Add resources to MaxCompute.
Add file file_resource.txt;
Add jar udtfexample1.jar;
Add table table_resource1 as table_resource1;
Add table table_resource2 as table_resource2;
- Create the mp_udtf UDTF in MaxCompute.
create function mp_udtf as 'com.aliyun.odps.examples.udf.UDTFResource' using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';
- Run this UDTF.
select mp_udtf("10","20") as (a, b, fileResourceLineCount) from tmp1;
-- Return result:
+-------+------------+-------+
| a     | b          | fileResourceLineCount |
+-------+------------+-------+
| 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
| 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
+-------+------------+-------+
Example of using complex data types in a UDF
In the following example, the @Resolve annotation must be added to the UDF class because the STRUCT type is used.
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.data.Struct;
import java.util.List;
import java.util.Map;
@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
public String evaluate(List<String> vals, Long len) {
return vals.get(len.intValue());
}
public String evaluate(Map<String, String> map, String key) {
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}
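The following statements register the UDF and call it in an SQL statement: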
create function my_index as 'UdfArray' using 'myjar.jar';
select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from co
Example of compatibility with Hive UDFs
MaxCompute V2.0 supports Hive UDFs. Some Hive UDFs and UDTFs can be directly used in MaxCompute.
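The following example shows a Hive GenericUDF that collects its input arguments into a list and can be used in MaxCompute: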
package com.aliyun.odps.compiler.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class Collect extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length == 0) {
throw new UDFArgumentException("Collect: input args should >= 1");
}
for (int i = 1; i < objectInspectors.length; i++) {
if (objectInspectors[i] != objectInspectors[0]) {
throw new UDFArgumentException("Collect: input oi should be the same for all args");
}
}
return ObjectInspectorFactory.getStandardListObjectInspector(objectInspectors[0]);
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
List<Object> objectList = new ArrayList<>(deferredObjects.length);
for (DeferredObject deferredObject : deferredObjects) {
objectList.add(deferredObject.get());
}
return objectList;
}
@Override
public String getDisplayString(String[] strings) {
return "Collect";
}
}
- The Hive UDF in the example supports all data types, such as ARRAY, MAP, and STRUCT.
- For more information about how to use Hive UDFs, see the following example. In this example, the compiled UDF is packaged into test.jar.
-- Add a resource.
Add jar test.jar;
-- Create a Hive UDF.
CREATE FUNCTION hive_collect as 'com.aliyun.odps.compiler.hive.Collect' using 'test.jar';
-- Use the Hive UDF.
set odps.sql.hive.compatible=true;
select hive_collect(4y,5y,6y);
+------+
| _c0 |
+------+
| [4, 5, 6] |
+------+
- Specify the JAR package when you create a Hive UDF. MaxCompute cannot automatically add all JAR packages to the classpath.
  Note: The add jar command in MaxCompute creates a permanent resource in the project.
- Insert set odps.sql.hive.compatible=true; before the SQL statement, and commit and execute the SET statement together with the SQL statement.
- Pay attention to the limits imposed by the Java sandbox on MaxCompute.