MaxCompute introduces user-defined join (UDJ) to the user-defined function (UDF) framework based on the MaxCompute V2.0 compute engine. UDJ lets you implement custom logic that joins multiple tables flexibly. It also simplifies operations that would otherwise require MapReduce on the underlying distributed system.
Overview
MaxCompute provides multiple JOIN methods, including INNER JOIN, RIGHT JOIN, OUTER JOIN, LEFT JOIN, FULL JOIN, SEMI JOIN, and ANTI-SEMI JOIN. These built-in JOIN methods are sufficient in most scenarios. However, they cannot express custom logic that must operate on the records of multiple tables at the same time.
In most cases, you can implement custom logic by using UDFs. However, the UDF, user-defined table-valued function (UDTF), and user-defined aggregate function (UDAF) frameworks can handle only one table at a time. To perform user-defined operations on multiple tables, you must either combine built-in JOIN methods, UDFs, UDTFs, and complex SQL statements, or write a custom MapReduce program instead of SQL. Both approaches have drawbacks:
- Combining built-in JOIN methods, UDFs, UDTFs, and complex SQL statements: Mixing multiple JOIN operations with user code in SQL statements turns the logic into a black box for the optimizer. This makes it difficult to generate an optimal execution plan.
- Using a custom MapReduce program: Execution plans are hard to optimize. Most MapReduce code is written in Java, which runs less efficiently than the native runtime code that MaxCompute generates with the LLVM code generator and optimizes in depth.
UDJ performance

As shown in the preceding figure, UDJ can describe complex logic that handles multiple tables, and it significantly improves execution performance. Only the user code runs inside the UDJ. The rest of the logic, such as the mapper logic in this example, runs on the efficient native runtime of MaxCompute. UDJ also optimizes data exchange between the MaxCompute runtime engine and the Java interfaces. As a result, the JOIN logic of a UDJ is more efficient than the equivalent logic in a MapReduce reducer.
Examples
The following example describes how to use UDJ in MaxCompute.
- The payment table stores the payment records of users. Each payment record contains the user ID, payment time, and payment content. The following table lists sample data.

user_id | time | pay_info |
---|---|---|
2656199 | 2018-02-13 22:30:00 | gZhvdySOQb |
8881237 | 2018-02-13 08:30:00 | pYvotuLDIT |
8881237 | 2018-02-13 10:32:00 | KBuMzRpsko |

- The user_client_log table stores the client logs of users. Each log contains the user ID, log time, and log content. The following table lists sample data.

user_id | time | content |
---|---|---|
8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn |
8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL |
8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL |

For each record in the user_client_log table, find the record in the payment table that has the same user_id and the closest time. Then, merge the content of the two records and return the merged result. The following table lists the expected result for the sample data.

user_id | time | content |
---|---|---|
8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
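The expected result pairs each user_client_log record with the payment record of the same user_id that is closest in time. The following plain-Java sketch computes that result for user 8881237 by brute force. For each log record, it scans the user's payment records and keeps the one with the smallest absolute time difference. It does not use the MaxCompute SDK. The names Pay, Log, and nearestMerge are hypothetical and exist only for illustration. On a tie, the sketch keeps the payment that appears first in the list.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Brute-force sketch of the requirement for a single user_id (no MaxCompute SDK involved).
// Pay, Log, and nearestMerge are hypothetical names used only for illustration.
public class NearestPayExample {

    static final class Pay {
        final long time;      // payment time in epoch milliseconds
        final String payInfo;
        Pay(String time, String payInfo) { this.time = millis(time); this.payInfo = payInfo; }
    }

    static final class Log {
        final long time;      // log time in epoch milliseconds
        final String content;
        Log(String time, String content) { this.time = millis(time); this.content = content; }
    }

    // Parses "yyyy-MM-dd HH:mm:ss"; UTC is used because only time differences matter here.
    static long millis(String datetime) {
        return LocalDateTime.parse(datetime.replace(' ', 'T')).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // For each log record, merge it with the payment that minimizes |p.time - u.time|.
    static List<String> nearestMerge(List<Pay> pays, List<Log> logs) {
        List<String> result = new ArrayList<>();
        for (Log log : logs) {
            Pay nearest = null;
            long minDelta = Long.MAX_VALUE;
            for (Pay pay : pays) {
                long delta = Math.abs(log.time - pay.time);
                if (delta < minDelta) { // strict comparison: on a tie, the earlier list element wins
                    minDelta = delta;
                    nearest = pay;
                }
            }
            // If the user has no payments, the log content is returned unmerged.
            result.add(nearest == null ? log.content : log.content + ", pay " + nearest.payInfo);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Pay> pays = List.of(
            new Pay("2018-02-13 08:30:00", "pYvotuLDIT"),
            new Pay("2018-02-13 10:32:00", "KBuMzRpsko"));
        List<Log> logs = List.of(
            new Log("2018-02-13 00:30:00", "click MpkvilgWSmhUuPn"),
            new Log("2018-02-13 06:14:00", "click OkTYNUHMqZzlDyL"),
            new Log("2018-02-13 10:30:00", "click OkTYNUHMqZzlDyL"));
        nearestMerge(pays, logs).forEach(System.out::println);
    }
}
```

Running main prints the three merged contents shown in the expected result for user 8881237. For example, the 06:14:00 log record is 2 h 16 min away from the 08:30:00 payment and 4 h 18 min away from the 10:32:00 payment, so it is merged with pYvotuLDIT.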
- Use built-in JOIN methods. The SQL pseudocode is as follows:
SELECT u.user_id, u.time, merge(p.pay_info, u.content)
FROM payment p RIGHT OUTER JOIN user_client_log u
ON p.user_id = u.user_id AND abs(p.time - u.time) = min(abs(p.time - u.time))
To join two records, you must find the minimum difference between p.time and u.time for each log record among the records that have the same user_id. However, aggregate functions cannot be called in a JOIN condition. Therefore, standard JOIN methods cannot complete this task.
- Use UDJ.
- Register the UDJ function.
- Configure the new version of the SDK by adding the following Maven dependency:
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-udf</artifactId>
    <version>0.29.10-public</version>
    <scope>provided</scope>
</dependency>
- Write the UDJ code and package the code as odps-udj-example.jar.
package com.aliyun.odps.udf.example.udj;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Yieldable;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDJ;
import com.aliyun.odps.udf.annotation.Resolve;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * For each record in the table on the right, find the record in the table on the left
 * that has the closest time, and merge the two records.
 */
@Resolve("->string,bigint,string")
public class PayUserLogMergeJoin extends UDJ {

    private Record outputRecord;

    /**
     * Called once before data is processed. You can perform initialization in this method.
     */
    @Override
    public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
        // Create a reusable output record whose schema matches the @Resolve signature.
        outputRecord = new ArrayRecord(new Column[]{
            new Column("user_id", OdpsType.STRING),
            new Column("time", OdpsType.BIGINT),
            new Column("content", OdpsType.STRING)
        });
    }

    /**
     * Override this method to implement the join logic.
     * @param key the current join key.
     * @param left the group of records for the current key in the table on the left.
     * @param right the group of records for the current key in the table on the right.
     * @param output the collector for the output records of the UDJ.
     */
    @Override
    public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
        outputRecord.setString(0, key.getString(0));
        if (!right.hasNext()) {
            // The group on the right is empty. Do not generate any output for this key.
            return;
        } else if (!left.hasNext()) {
            // The group on the left is empty. Output all records in the group on the right without merging.
            while (right.hasNext()) {
                Record logRecord = right.next();
                outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
                outputRecord.setString(2, logRecord.getString(1));
                output.yield(outputRecord);
            }
            return;
        }
        ArrayList<Record> pays = new ArrayList<>();
        // The iterator over the group on the left can be traversed only once and cannot be reset.
        // Every record in the group on the right must be compared with all records in the group
        // on the left, so save a copy of each left record in an ArrayList.
        left.forEachRemaining(pay -> pays.add(pay.clone()));
        while (right.hasNext()) {
            Record log = right.next();
            long logTime = log.getDatetime(0).getTime();
            long minDelta = Long.MAX_VALUE;
            Record nearestPay = null;
            // Iterate over all records in the group on the left to find the minimum time delta.
            for (Record pay : pays) {
                long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
                if (delta < minDelta) {
                    minDelta = delta;
                    nearestPay = pay;
                }
            }
            // Merge the log record with the payment record that is closest in time, and output the result.
            outputRecord.setBigint(1, logTime);
            outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
            output.yield(outputRecord);
        }
    }

    String mergeLog(String payInfo, String logContent) {
        return logContent + ", pay " + payInfo;
    }

    @Override
    public void close() {

    }
}
- Add the JAR package to MaxCompute as a resource.
add jar odps-udj-example.jar;
- Register the UDJ function pay_user_log_merge_join in MaxCompute.
create function pay_user_log_merge_join as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin' using 'odps-udj-example.jar';
- Prepare sample data.
- This example creates the payment table and the user_client_log table.
create table payment(user_id string, time datetime, pay_info string);
create table user_client_log(user_id string, time datetime, content string);
- Insert data into the sample tables.
-- Insert data into the payment table.
INSERT OVERWRITE TABLE payment VALUES
('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
;
-- Insert data into the user_client_log table.
INSERT OVERWRITE TABLE user_client_log VALUES
('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
;
- Use a UDJ function in SQL.
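SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
    SELECT user_id, time as time, pay_info FROM payment
) p JOIN (
    SELECT user_id, time as time, content FROM user_client_log
) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content);
The parameters in the USING clause are described as follows:
- pay_user_log_merge_join: the name of the UDJ function in SQL.
- (p.time, p.pay_info, u.time, u.content): the columns from the left and right tables that are passed to the UDJ. In the join method, each left record contains p.time and p.pay_info, and each right record contains u.time and u.content, in that order.
- r: the alias of the result returned by the UDJ function. You can reference this alias elsewhere in the SQL statement, for example, r.user_id.
- (user_id, time, content): the columns returned by the UDJ function.
The UDJ returns the time column as a BIGINT value in milliseconds, so the query divides it by 1000 before it calls from_unixtime.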
The statement output is as follows:
+---------+---------------------+---------------------------------------+
| user_id | time                | content                               |
+---------+---------------------+---------------------------------------+
| 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                 |
| 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                 |
| 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
| 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
| 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
| 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
| 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
| 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
| 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
| 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
| 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                 |
| 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                 |
| 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
| 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
| 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
| 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
| 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
+---------+---------------------+---------------------------------------+
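The output includes rows for users such as 1000235 and 4356142. These users have log records but no payment records, and their rows come from the branch of join() that handles an empty left group. The following plain-Java sketch models this behavior as a per-key "cogroup". Both inputs are grouped by the join key, and a join callback runs once for each key with that key's left and right groups, either of which may be empty. This model is an assumption inferred from the example output and is not the MaxCompute engine's implementation. In particular, the sketch also passes keys that appear only in the left table, which matches the empty-right-group check in join(), but the sample data does not exercise that case. The names Row, JoinFn, and cogroup are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Assumed per-key "cogroup" model of how a UDJ receives its input; not the engine's actual code.
// Row, JoinFn, and cogroup are hypothetical names used only for illustration.
public class UdjCogroupSketch {

    // A row is modeled as a join key plus a payload.
    static final class Row<V> {
        final String key;
        final V value;
        Row(String key, V value) { this.key = key; this.value = value; }
    }

    // Same shape as UDJ.join(key, left, right, output), with lists instead of Record iterators.
    interface JoinFn<L, R> {
        void join(String key, List<L> left, List<R> right, List<String> output);
    }

    static <V> Map<String, List<V>> groupByKey(List<Row<V>> rows) {
        Map<String, List<V>> groups = new TreeMap<>();
        for (Row<V> row : rows) {
            groups.computeIfAbsent(row.key, k -> new ArrayList<>()).add(row.value);
        }
        return groups;
    }

    // Invokes fn once per distinct key from either input; a missing side becomes an empty list.
    static <L, R> List<String> cogroup(List<Row<L>> left, List<Row<R>> right, JoinFn<L, R> fn) {
        Map<String, List<L>> leftGroups = groupByKey(left);
        Map<String, List<R>> rightGroups = groupByKey(right);
        TreeSet<String> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        List<String> output = new ArrayList<>();
        for (String key : keys) {
            fn.join(key,
                    leftGroups.getOrDefault(key, Collections.emptyList()),
                    rightGroups.getOrDefault(key, Collections.emptyList()),
                    output);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Row<String>> payments = List.of(new Row<>("1335656", "PEqMSHyktn"));
        List<Row<String>> logs = List.of(
            new Row<>("1000235", "click FNOXAibRjkIaQPB"),
            new Row<>("1335656", "click MxONdLckpAFUHRS"));
        // Report the group sizes that join() would receive for each key.
        cogroup(payments, logs, (key, l, r, out) -> out.add(key + ": left=" + l.size() + ", right=" + r.size()))
            .forEach(System.out::println);
    }
}
```

In this model, key 1000235 reaches the callback with an empty left group. The UDJ handles this case by outputting the log records unmerged.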
Pre-sorting of UDJ
In the preceding example, the UDJ traverses all payment records of a user to find the one that has the closest time to a log record with the same user_id. The iterator over the group on the left cannot be reset, so the UDJ must first load all payment records with the same user_id into an ArrayList. This approach works when each user has a small number of payment records. If a user has a large number of payment records, the data may not fit in memory. To address this issue, you can use the SORT BY clause to sort the records in each group before they are passed to the UDJ.
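If all records in both groups are sorted by time, the UDJ no longer needs to keep all payment records in memory. It only needs to compare the current records of the two sorted groups and advance through them in a single pass. The following join method, which replaces the join method of PayUserLogMergeJoin, shows this approach: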
@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
    // Assumes that the records in both groups are sorted by time in ascending order.
    outputRecord.setString(0, key.getString(0));
    if (!right.hasNext()) {
        return;
    } else if (!left.hasNext()) {
        while (right.hasNext()) {
            Record logRecord = right.next();
            outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
            outputRecord.setString(2, logRecord.getString(1));
            output.yield(outputRecord);
        }
        return;
    }
    long prevDelta = Long.MAX_VALUE;
    Record logRecord = right.next();
    Record payRecord = left.next();
    Record lastPayRecord = payRecord.clone();
    while (true) {
        long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
        if (left.hasNext() && delta > 0) {
            // The log record is still later than the current payment record, so a later payment
            // record may be closer. Advance the group on the left to look for a smaller time delta.
            lastPayRecord = payRecord.clone();
            prevDelta = delta;
            payRecord = left.next();
        } else {
            // The minimum time delta is reached. Pick the closer of the current and the previous
            // payment records, output the merged result, and then move on to the next record
            // in the group on the right.
            Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
            outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
            String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
            outputRecord.setString(2, mergedString);
            output.yield(outputRecord);
            if (right.hasNext()) {
                logRecord = right.next();
                // Recompute the delta between the new log record and the previous payment record.
                prevDelta = Math.abs(
                    logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
                );
            } else {
                break;
            }
        }
    }
}
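The single-pass idea behind the pre-sorted join method above can be checked outside MaxCompute with the following plain-Java sketch. It is a simplified two-pointer variant, not a line-by-line port of that method. It assumes that both inputs are sorted by time in ascending order and keeps only the previous and the current payment record. It advances the payment iterator while the current payment is earlier than the log record, and then picks the closer of the two. The names Pay, Log, and mergeSorted are hypothetical. In a real UDJ, a record that you keep across calls to next() should be copied first, as the method above does with clone().

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Single-pass nearest-time merge over two time-sorted inputs, holding at most two payments in memory.
// Pay, Log, and mergeSorted are hypothetical names; this is a sketch, not MaxCompute SDK code.
public class SortedMergeSketch {

    static final class Pay {
        final long time;      // epoch milliseconds
        final String payInfo;
        Pay(String time, String payInfo) { this.time = millis(time); this.payInfo = payInfo; }
    }

    static final class Log {
        final long time;      // epoch milliseconds
        final String content;
        Log(String time, String content) { this.time = millis(time); this.content = content; }
    }

    // Parses "yyyy-MM-dd HH:mm:ss"; UTC is used because only time differences matter here.
    static long millis(String datetime) {
        return LocalDateTime.parse(datetime.replace(' ', 'T')).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Both iterators must be sorted by time in ascending order.
    static List<String> mergeSorted(Iterator<Pay> left, Iterator<Log> right) {
        List<String> out = new ArrayList<>();
        Pay prev = null;                                 // the payment just before cur, if any
        Pay cur = left.hasNext() ? left.next() : null;   // earliest payment not yet passed
        while (right.hasNext()) {
            Log log = right.next();
            // Advance while the current payment is earlier than the log and a later payment exists.
            while (cur != null && cur.time < log.time && left.hasNext()) {
                prev = cur;
                cur = left.next();
            }
            if (cur == null) {                           // no payments at all for this key
                out.add(log.content);
                continue;
            }
            // The nearest payment is either prev or cur; on a tie, prefer the earlier one.
            boolean usePrev = prev != null
                && Math.abs(log.time - prev.time) <= Math.abs(cur.time - log.time);
            Pay nearest = usePrev ? prev : cur;
            out.add(log.content + ", pay " + nearest.payInfo);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Pay> pays = List.of(
            new Pay("2018-02-13 16:01:00", "gZhvdySOQb"),
            new Pay("2018-02-13 16:26:00", "MxONdLckwa"));
        List<Log> logs = List.of(
            new Log("2018-02-13 16:01:00", "click vOTQfBFjcgXisYU"),
            new Log("2018-02-13 16:20:00", "click WxaLgOCcVEvhiFJ"));
        mergeSorted(pays.iterator(), logs.iterator()).forEach(System.out::println);
    }
}
```

Run on the sample data for users 2656199 and 9890100, this sketch produces the same merged contents as the statement output above. Only the two most recent payment records are kept in memory, regardless of how many payment records a user has.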