Based on the MaxCompute 2.0 computing engine, MaxCompute introduces a new interface to the user defined function (UDF) framework: user defined join (UDJ). This interface allows you to process multiple tables more flexibly with user-defined logic, while hiding the MapReduce-style operations of the underlying distributed system. It is a major improvement to MaxCompute's NewSQL-based big data processing.

Overview

MaxCompute provides multiple native join methods, including INNER JOIN, LEFT OUTER JOIN, RIGHT OUTER JOIN, FULL OUTER JOIN, SEMI JOIN, and ANTI SEMI JOIN. These native methods cover most scenarios. However, they do not support applying custom logic when merging multiple tables.

In most cases, you can build custom logic with UDFs. However, the current UDF, UDTF, and UDAF frameworks can only process one table at a time. To perform user-defined operations across multiple tables, you have to combine native join methods with UDFs and UDTFs in complex SQL statements. In certain cases, you even have to abandon SQL and fall back on the MapReduce framework to complete the required computing task.

Either way, these workarounds demand considerable expertise and cause the following issues:

  • For the computing platform, chaining multiple join methods and UDFs in SQL statements creates a "black box" that the optimizer cannot see into, which makes it hard to execute the query with minimal overhead.
  • Using MapReduce makes optimal execution even harder. Most MapReduce code is written in Java, which runs less efficiently than MaxCompute code generated by the LLVM code generator for the optimized native runtime.

Examples

The following example describes how to use UDJ in MaxCompute. This example uses the payment table and the user_client_log table.

  • The payment (user_id string,time datetime,pay_info string) table stores the payment information of a user. Each payment record includes the user ID, payment time, and the payment details.
  • The user_client_log(user_id string,time datetime,content string) table stores the client log records for a user. Each record contains the user ID, operation time, and operation.

Requirement: For each record in the user_client_log table, locate the payment record that has the time closest to the operation time, and join and output the content of both records.

The sample data is as follows:

payment

user_id time pay_info
2656199 2018-02-13 22:30:00 gZhvdySOQb
8881237 2018-02-13 08:30:00 pYvotuLDIT
8881237 2018-02-13 10:32:00 KBuMzRpsko

user_client_log

user_id time content
8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn
8881237 2018-02-13 06:14:00 click OkTYNUHMqZzlDyL
8881237 2018-02-13 10:30:00 click OkTYNUHMqZzlDyL

The following is a record in the user_client_log table.

user_id time content
8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn

The following payment record has the time closest to the operation time in the preceding client log record.

user_id time pay_info
8881237 2018-02-13 08:30:00 pYvotuLDIT

These two records are merged as follows:

8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn, pay pYvotuLDIT

The merging result of the two tables is as follows:

user_id time content
8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn, pay pYvotuLDIT
8881237 2018-02-13 06:14:00 click OkTYNUHMqZzlDyL, pay pYvotuLDIT
8881237 2018-02-13 10:30:00 click OkTYNUHMqZzlDyL, pay KBuMzRpsko
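The matching rule can be sketched in plain Java, independent of MaxCompute (the class and method names below are illustrative only, not part of the UDJ API): for one client-log timestamp, scan all payments of the same user and keep the one with the smallest absolute time difference.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

public class NearestPayment {
    // Return the index of the payment whose time is closest to logTime,
    // or -1 if there are no payments (brute-force scan).
    public static int nearestIndex(LocalDateTime logTime, List<LocalDateTime> payTimes) {
        int best = -1;
        long minDelta = Long.MAX_VALUE;
        for (int i = 0; i < payTimes.size(); i++) {
            long delta = Math.abs(Duration.between(logTime, payTimes.get(i)).toMillis());
            if (delta < minDelta) {
                minDelta = delta;
                best = i;
            }
        }
        return best;
    }
}
```

For user 8881237, the log record at 10:30 is matched to the payment at 10:32 rather than the one at 08:30, which is exactly the KBuMzRpsko row in the merged table above.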

Call native join operations

If you use standard join methods, you have to join these two tables based on the common user_id field. You must locate the payment record that has the closest time to the operation time in each client log record. The SQL statement may be written as follows:

SELECT
  p.user_id,
  p.time,
  merge(p.pay_info, u.content)
FROM
  payment p RIGHT OUTER JOIN user_client_log u
ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))

When joining two rows, you must calculate the minimum difference between p.time and u.time for the same user_id. However, an aggregate function cannot be called in a join condition, so this task cannot be completed with a standard join method.

In a distributed system, the join method merges rows retrieved from two or more tables based on a field that is shared by these tables. If you use the join method in standard SQL, you only have a few options to handle the merged data. Therefore, a generic interface, such as UDJ, is required to handle the merged data in a customized manner and output the result. This interface may be integrated as a plug-in.

Use Java to write UDJ code

The following describes how to use UDJ to achieve a function that cannot be implemented by calling a native join method.

Prerequisites

Since UDJ is a new feature, a new version of the SDK is required. Add the following Maven dependency:

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-udf</artifactId>
  <version>0.29.10-public</version>
  <scope>provided</scope>
</dependency>

The SDK contains a new abstract class, UDJ. By extending this class, you can implement the functionality of a UDJ.

package com.aliyun.odps.udf.example.udj;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Yieldable;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDJ;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.Iterator;
/** For each record of the right table, find the nearest record of the left table and
 * merge two records.
 */
@Resolve("->string,bigint,string")
public class PayUserLogMergeJoin extends UDJ {
  private Record outputRecord;
  /** Called prior to the data processing phase. Users can implement
   * this method to do initialization work.
   */
  @Override
  public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
    // Initialize the schema of the output record.
    outputRecord = new ArrayRecord(new Column[]{
      new Column("user_id", OdpsType.STRING),
      new Column("time", OdpsType.BIGINT),
      new Column("content", OdpsType.STRING)
    });
  }
  /** Override this method to implement join logic.
   * @param key Current join key
   * @param left Group of records of the left table corresponding to the current key
   * @param right Group of records of the right table corresponding to the current key
   * @param output Used to output the result of UDJ
   */
  @Override
  public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
    outputRecord.setString(0, key.getString(0));
    if (!right.hasNext()) {
      // The right group is empty; there is nothing to output.
      return;
    } else if (!left.hasNext()) {
      // The left group is empty. Output all records of the right group without merging.
      while (right.hasNext()) {
        Record logRecord = right.next();
        outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
        outputRecord.setString(2, logRecord.getString(1));
        output.yield(outputRecord);
      }
      return;
    }
    ArrayList<Record> pays = new ArrayList<>();
    // The left group of records will be iterated from the start to the end
    // for each record of the right group, but the iterator cannot be reset.
    // So we save every record of the left to an ArrayList.
    left.forEachRemaining(pay -> pays.add(pay.clone()));
    while (right.hasNext()) {
      Record log = right.next();
      long logTime = log.getDatetime(0).getTime();
      long minDelta = Long.MAX_VALUE;
      Record nearestPay = null;
      // Iterate through all records of the left, and find the pay record that has
      // the minimal difference in terms of time.
      for (Record pay: pays) {
        long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
        if (delta < minDelta) {
          minDelta = delta;
          nearestPay = pay;
        }
      }
      // Merge the log record with the nearest pay record and output the result.
      outputRecord.setBigint(1, log.getDatetime(0).getTime());
      outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
      output.yield(outputRecord);
    }
  }
  String mergeLog(String payInfo, String logContent) {
    return logContent + ", pay " + payInfo;
  }
  @Override
  public void close() {
  }
}
Note: In this example, NULL values are not handled. To keep the data processing procedure simple, this example assumes that the tables contain no NULL values.

Each time the join method of the UDJ is called, it receives the records of both tables that match the same key. The UDJ scans all records of the payment group to locate the payment with the time closest to each record in the user_client_log table.

Assume that each user has only a few payment records, so that the payment data a user generates in one day fits in memory. What if this assumption does not hold? How can that be resolved? This issue is discussed in the Pre-sorting section below.

Create a UDJ in MaxCompute

After you have written the UDJ code in Java, upload it to MaxCompute and register it as a function so that it can be used as a plug-in in MaxCompute SQL. Assume that the code is packed into the JAR package odps-udj-example.jar.

Use the ADD JAR command to upload the JAR package to MaxCompute:

add jar odps-udj-example.jar;

Use the CREATE FUNCTION statement to create the UDJ function pay_user_log_merge_join, referencing the Java class com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin in the odps-udj-example.jar package:

create function pay_user_log_merge_join
  as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
  using 'odps-udj-example.jar';

Use UDJ in MaxCompute SQL

After you have registered the UDJ function, it can be used in MaxCompute SQL.

  1. Create the sample source tables:
    create table payment (user_id string,time datetime,pay_info string);
    create table user_client_log(user_id string,time datetime,content string);
  2. Create sample data:
    --Create data in the payment table
    INSERT OVERWRITE TABLE payment VALUES
    ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
    ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
    ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
    ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
    ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
    ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
    ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
    ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
    ;
    --Create data in the user_client_log table
    INSERT OVERWRITE TABLE user_client_log VALUES
    ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
    ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
    ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
    ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
    ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
    ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
    ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
    ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
    ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
    ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
    ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
    ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
    ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
    ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
    ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
    ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
    ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
    ;
  3. In MaxCompute SQL, use the UDJ function you have created:
    SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
    SELECT user_id, time as time, pay_info FROM payment
    ) p JOIN (
    SELECT user_id, time as time, content FROM user_client_log
    ) u
    ON p.user_id = u.user_id
    USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
    r
    AS (user_id, time, content)
    ;

    The syntax of UDJ is similar to the standard join syntax. The only difference is that the USING clause is added to UDJ.

    • The name of the UDJ function in SQL is pay_user_log_merge_join.
    • (p.time, p.pay_info, u.time, u.content) are the columns from the two tables that are passed to the UDJ function.
    • r is the alias of the result returned by the UDJ function. You can reference this alias in other SQL statements.
    • (user_id, time, content) are the columns returned by the UDJ function.

    Execute this SQL statement, and the result is as follows:

    +---------+---------------------+---------------------------------------+
    | user_id | time                | content                               |
    +---------+---------------------+---------------------------------------+
    | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                 |
    | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                 |
    | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
    | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
    | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
    | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
    | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
    | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
    | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
    | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
    | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                 |
    | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                 |
    | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
    | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
    | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
    | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
    | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
    +---------+---------------------+---------------------------------------+

    As the preceding result shows, UDJ completes the task that could not be performed with native join methods.

Pre-sorting

To locate the matching payment record, an iterator is used to scan all records in the payment group, which requires loading all payment records with the same user_id into an ArrayList. This works when the number of payment records is small. If a large number of payment records have been generated, they may not fit in memory, and you need another approach. This section describes how to address this issue using the SORT BY clause.

When the payment data is too large to fit in memory, the problem becomes much easier if the data in each group is already sorted by time: you then only need to compare records at the heads of the two sorted streams.
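The idea can be sketched outside the UDJ framework (our own illustration; timestamps are plain epoch values in arrays rather than Record objects): because both inputs are sorted, the index of the nearest payment never decreases as the log time increases, so a single forward cursor over the payment list suffices.

```java
import java.util.ArrayList;
import java.util.List;

public class SortedNearestJoin {
    // For each timestamp in sortedLogs, return the index of the nearest
    // timestamp in sortedPays. Both arrays must be sorted ascending.
    // The cursor i only moves forward, so the whole pass is O(m + n).
    public static List<Integer> match(long[] sortedPays, long[] sortedLogs) {
        List<Integer> result = new ArrayList<>();
        int i = 0;
        for (long logTime : sortedLogs) {
            // Advance while the next payment is at least as close as the current one.
            while (i + 1 < sortedPays.length
                    && Math.abs(sortedPays[i + 1] - logTime) <= Math.abs(sortedPays[i] - logTime)) {
                i++;
            }
            result.add(i);
        }
        return result;
    }
}
```

With the sample data for user 8881237 (payments at 08:30 and 10:32, logs at 00:30, 06:14, and 10:30), the cursor stays on the first payment for the first two logs and moves to the second payment only for the last one, matching the merged table shown earlier.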

UDJ in Java:

@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
  if (!right.hasNext()) {
    // The right group is empty; there is nothing to output.
    return;
  } else if (!left.hasNext()) {
    // The left group is empty. Output all records of the right group without merging.
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }
  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();
  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // The delta of time between two records is decreasing, we can still
      // explore the left group to try to gain a smaller delta.
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
      // Hit to the point of minimal delta. Check with the last pay record,
      // output the merge result and prepare to process the next record of
      // right group.
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}

In SQL, you only need a small modification: add a SORT BY clause to the end of the UDJ clause so that the data in both groups is sorted by time. Note: If you have modified the UDJ code, you must update the corresponding JAR package first.

SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
  SELECT user_id, time as time, pay_info FROM payment
) p JOIN (
  SELECT user_id, time as time, content FROM user_client_log
) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content)
SORT BY p.time, u.time
;

The execution result is the same as the result before the code is modified.

Because the SORT BY clause pre-sorts the data, at most three records need to be cached at any time to achieve the same result.
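As a quick sanity check (ours, not part of the original example), the single-pass matcher over sorted inputs can be compared against a brute-force scan on random data; the two must always agree on the minimal time difference.

```java
import java.util.Random;

public class NearestJoinCheck {
    // Brute force: scan every payment for one log timestamp.
    static int bruteForce(long[] pays, long logTime) {
        int best = 0;
        for (int i = 1; i < pays.length; i++) {
            if (Math.abs(pays[i] - logTime) < Math.abs(pays[best] - logTime)) {
                best = i;
            }
        }
        return best;
    }

    // Single pass: one forward cursor over sorted payments for sorted logs.
    static int[] singlePass(long[] sortedPays, long[] sortedLogs) {
        int[] out = new int[sortedLogs.length];
        int i = 0;
        for (int k = 0; k < sortedLogs.length; k++) {
            while (i + 1 < sortedPays.length
                    && Math.abs(sortedPays[i + 1] - sortedLogs[k]) <= Math.abs(sortedPays[i] - sortedLogs[k])) {
                i++;
            }
            out[k] = i;
        }
        return out;
    }

    // Both strategies must find a payment at the same minimal distance
    // (indices may differ when two payments are equally close).
    public static boolean agree(long seed, int nPays, int nLogs) {
        Random r = new Random(seed);
        long[] pays = r.longs(nPays, 0, 100_000).sorted().toArray();
        long[] logs = r.longs(nLogs, 0, 100_000).sorted().toArray();
        int[] fast = singlePass(pays, logs);
        for (int k = 0; k < logs.length; k++) {
            long fastDelta = Math.abs(pays[fast[k]] - logs[k]);
            long bruteDelta = Math.abs(pays[bruteForce(pays, logs[k])] - logs[k]);
            if (fastDelta != bruteDelta) {
                return false;
            }
        }
        return true;
    }
}
```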

UDJ performance

Without UDJ, you would have to use MapReduce to handle complex cross-table computing tasks in a distributed system, for example in complex business scenarios such as advertising and information search.

The following example uses a production MapReduce job to test UDJ performance. This MapReduce job joins two tables with a complex algorithm. The example rewrites the MapReduce job as SQL statements with UDJ and verifies that the execution results match. At the same level of concurrency, the performance comparison is as follows:



As shown in the figure, UDJ makes the complex multi-table logic easier to express and greatly improves query performance. User code runs only inside the UDJ function, while the rest of the pipeline, such as the logic of the map stage in this example, is executed by the high-performance MaxCompute native runtime. In addition, the data exchange between the runtime engine and the UDJ interface is optimized, so the join logic in a UDJ runs more efficiently than the same logic in the reduce stage of MapReduce.