MaxCompute introduces the user-defined join (UDJ) to the user-defined function (UDF) framework based on the MaxCompute V2.0 compute engine. UDJ lets you join tables in flexible, customized ways and simplifies the MapReduce operations that run on the underlying distributed system.

Overview

MaxCompute provides multiple JOIN methods, including INNER JOIN, RIGHT JOIN, OUTER JOIN, LEFT JOIN, FULL JOIN, SEMI JOIN, and ANTI-SEMI JOIN. These built-in JOIN methods meet the requirements of most scenarios. However, they are insufficient if you want to perform custom operations across multiple tables.

In most cases, you can implement custom logic by using UDFs. However, the current UDF, user-defined table-valued function (UDTF), and user-defined aggregate function (UDAF) frameworks can process only one table at a time. To perform user-defined operations on multiple tables, you must combine built-in JOIN methods, UDFs, UDTFs, and complex SQL statements. In some scenarios, you even have to use a custom MapReduce framework instead of SQL to complete the required computing tasks.

Either approach requires technical expertise and may cause the following issues:
  • If you combine built-in JOIN methods, UDFs, UDTFs, and complex SQL statements: Mixing multiple JOIN methods and custom code in SQL statements creates a logical black box, which makes it difficult to generate an optimal execution plan.
  • If you use a custom MapReduce framework: Execution plans are hard to optimize. Most MapReduce code is written in Java, which runs less efficiently than the deeply optimized native runtime code that MaxCompute generates by using the LLVM code generator.

UDJ performance

The following example tests UDJ performance by using a real online MapReduce job that joins two tables with a complex algorithm. The job is rewritten by using UDJ, and the execution results are verified. The following figure compares the performance of MapReduce and UDJ at the same level of data concurrency.

As shown in the preceding figure, UDJ can describe the complex logic of processing multiple tables and greatly improves execution performance. User code is called only inside UDJ. The rest of the logic, such as the mapper logic in this example, is executed on the highly efficient native runtime of MaxCompute. UDJ also optimizes the data exchange between the MaxCompute runtime engine and Java interfaces, and the JOIN logic of UDJ is more efficient than that of the reducer.

Examples

The following example describes how to use UDJ in MaxCompute.

Assume that two tables named payment and user_client_log exist.
  • The payment table stores the payment records of users. Each payment record contains the user ID, payment time, and payment content. The following table lists the sample data.
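    +---------+---------------------+------------+
    | user_id | time                | pay_info   |
    +---------+---------------------+------------+
    | 2656199 | 2018-02-13 22:30:00 | gZhvdySOQb |
    | 8881237 | 2018-02-13 08:30:00 | pYvotuLDIT |
    | 8881237 | 2018-02-13 10:32:00 | KBuMzRpsko |
    +---------+---------------------+------------+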
  • The user_client_log table stores the client logs of users. Each log contains the user ID, log time, and log content. The following table lists the sample data.
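    +---------+---------------------+-----------------------+
    | user_id | time                | content               |
    +---------+---------------------+-----------------------+
    | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn |
    | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL |
    | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL |
    +---------+---------------------+-----------------------+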
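Requirement: For each log record in the user_client_log table, find the payment record in the payment table that has the same user_id and the time closest to the time of the log record. Then, merge the two records and generate the result. The following table lists the output result.
+---------+---------------------+---------------------------------------+
| user_id | time                | content                               |
+---------+---------------------+---------------------------------------+
| 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
| 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
| 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
+---------+---------------------+---------------------------------------+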
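
To make the requirement concrete, the following plain Java sketch computes the expected result in memory for the sample rows above. It is only a hypothetical reference for checking results: the NearestPaymentReference class, its Row type, and the match and sample methods are illustrative names, not part of the MaxCompute SDK, and the sketch assumes that all rows fit in memory on one machine (Java 8 or later).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory reference for the requirement.
// Not part of the MaxCompute SDK; it ignores distribution entirely.
class NearestPaymentReference {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // One row of either sample table: user_id, time, and pay_info or content.
    static final class Row {
        final String userId;
        final LocalDateTime time;
        final String text;

        Row(String userId, String time, String text) {
            this.userId = userId;
            this.time = LocalDateTime.parse(time, FMT);
            this.text = text;
        }
    }

    // For each log row, pick the payment row of the same user with the smallest
    // absolute time difference and merge them as "content, pay pay_info".
    // A log row whose user has no payment rows is returned without a merge.
    static List<String> match(List<Row> payments, List<Row> logs) {
        List<String> result = new ArrayList<>();
        for (Row log : logs) {
            Row nearest = null;
            long minDelta = Long.MAX_VALUE;
            for (Row pay : payments) {
                if (!pay.userId.equals(log.userId)) {
                    continue; // only payments of the same user are candidates
                }
                long delta = Math.abs(Duration.between(pay.time, log.time).getSeconds());
                if (delta < minDelta) {
                    minDelta = delta;
                    nearest = pay;
                }
            }
            String content = nearest == null ? log.text : log.text + ", pay " + nearest.text;
            result.add(log.userId + " " + FMT.format(log.time) + " " + content);
        }
        return result;
    }

    // The sample rows of the payment and user_client_log tables shown above.
    static List<String> sample() {
        List<Row> payments = Arrays.asList(
            new Row("2656199", "2018-02-13 22:30:00", "gZhvdySOQb"),
            new Row("8881237", "2018-02-13 08:30:00", "pYvotuLDIT"),
            new Row("8881237", "2018-02-13 10:32:00", "KBuMzRpsko"));
        List<Row> logs = Arrays.asList(
            new Row("8881237", "2018-02-13 00:30:00", "click MpkvilgWSmhUuPn"),
            new Row("8881237", "2018-02-13 06:14:00", "click OkTYNUHMqZzlDyL"),
            new Row("8881237", "2018-02-13 10:30:00", "click OkTYNUHMqZzlDyL"));
        return match(payments, logs);
    }

    public static void main(String[] args) {
        sample().forEach(System.out::println);
    }
}
```

Running main prints the three rows of the output table above. The nested loop compares every log record with every payment record of the same user, which is the computation that the built-in JOIN methods cannot express in a JOIN condition.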
You can use one of the following methods to meet the preceding requirement:
  • Use built-in JOIN methods. The SQL pseudocode is as follows:
    SELECT
      u.user_id,
      u.time,
      merge(p.pay_info, u.content)
    FROM
      payment p RIGHT OUTER JOIN user_client_log u
    ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))
    To join the two records, you must calculate the minimum difference between p.time and u.time for the same user_id. However, you cannot call aggregate functions in a JOIN condition. Therefore, you cannot use standard JOIN methods to complete this task.
  • Use UDJ.
    1. Register the UDJ function.
      1. Add the dependency on the new version of the SDK to the pom.xml file of your project.
        <dependency>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>odps-sdk-udf</artifactId>
          <version>0.29.10-public</version>
          <scope>provided</scope>
        </dependency>
      2. Write the UDJ code and package the code as odps-udj-example.jar.
        package com.aliyun.odps.udf.example.udj;
        import com.aliyun.odps.Column;
        import com.aliyun.odps.OdpsType;
        import com.aliyun.odps.Yieldable;
        import com.aliyun.odps.data.ArrayRecord;
        import com.aliyun.odps.data.Record;
        import com.aliyun.odps.udf.DataAttributes;
        import com.aliyun.odps.udf.ExecutionContext;
        import com.aliyun.odps.udf.UDJ;
        import com.aliyun.odps.udf.annotation.Resolve;
        import java.util.ArrayList;
        import java.util.Iterator;
        /** For each record in the right table (user_client_log), find the record in the left table (payment)
         * that has the closest time, and merge the two records.
         */
        @Resolve("->string,bigint,string")
        public class PayUserLogMergeJoin extends UDJ {
          private Record outputRecord;
          /** Called once before any data is processed. Override this method to perform initialization,
           * such as creating the output record.
           */
          @Override
          public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
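            // Create the output record. Its schema matches @Resolve: user_id (string), time (bigint), content (string).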
            outputRecord = new ArrayRecord(new Column[]{
              new Column("user_id", OdpsType.STRING),
              new Column("time", OdpsType.BIGINT),
              new Column("content", OdpsType.STRING)
            });
          }
          /** Override this method to implement the join logic. It is called for each join key.
           * @param key the current join key.
           * @param left the group of records for the current key in the left table.
           * @param right the group of records for the current key in the right table.
           * @param output the collector that receives the output records of UDJ.
           */
          @Override
          public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
            outputRecord.setString(0, key.getString(0));
            if (! right.hasNext()) {
              // The group on the right is empty, so there is no log record to output. Skip this key.
              return;
            } else if (! left.hasNext()) {
              // The group on the left is empty. Output all records of the group on the right without merging payment records.
              while (right.hasNext()) {
                Record logRecord = right.next();
                outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
                outputRecord.setString(2, logRecord.getString(1));
                output.yield(outputRecord);
              }
              return;
            }
            ArrayList<Record> pays = new ArrayList<>();
            // The group on the left must be scanned once for each record in the group on the right,
            // but the iterator cannot be reset.
            // Therefore, save a copy of each record of the group on the left to an ArrayList.
            left.forEachRemaining(pay -> pays.add(pay.clone()));
            while (right.hasNext()) {
              Record log = right.next();
              long logTime = log.getDatetime(0).getTime();
              long minDelta = Long.MAX_VALUE;
              Record nearestPay = null;
              // Iterate over all records of the group on the left to find the one with the minimum time delta.
              for (Record pay: pays) {
                long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
                if (delta < minDelta) {
                  minDelta = delta;
                  nearestPay = pay;
                }
              }
              // Merge the log record with the payment record that is closest in time, and generate the result.
              outputRecord.setBigint(1, logTime);
              outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
              output.yield(outputRecord);
            }
          }
          String mergeLog(String payInfo, String logContent) {
            return logContent + ", pay " + payInfo;
          }
          @Override
          public void close() {
          }
        }
      3. Add the JAR package to MaxCompute as a resource.
        add jar odps-udj-example.jar;
      4. Register the UDJ function pay_user_log_merge_join in MaxCompute.
        create function pay_user_log_merge_join
          as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
          using 'odps-udj-example.jar';
    2. Prepare sample data.
      1. Create the payment table and the user_client_log table.
        create table payment(user_id string,time datetime,pay_info string);
        create table user_client_log(user_id string,time datetime,content string);
      2. Insert data into the sample tables.
        -- Insert data into the payment table.
        INSERT OVERWRITE TABLE payment VALUES
        ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
        ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
        ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
        ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
        ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
        ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
        ;
        -- Insert data into the user_client_log table.
        INSERT OVERWRITE TABLE user_client_log VALUES
        ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
        ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
        ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
        ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
        ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
        ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
        ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
        ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
        ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
        ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
        ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
        ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
        ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
        ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
        ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
        ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
        ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
        ;
    3. Call the UDJ function in SQL.
      SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
      SELECT user_id, time as time, pay_info FROM payment
      ) p JOIN (
      SELECT user_id, time as time, content FROM user_client_log
      ) u
      ON p.user_id = u.user_id
      USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
      r
      AS (user_id, time, content);
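      The parameters in the USING clause are described as follows:
      • pay_user_log_merge_join: the name of the UDJ function registered in MaxCompute.
      • (p.time, p.pay_info, u.time, u.content): the columns passed to the UDJ function. In the join method, each record of the left group contains p.time and p.pay_info, each record of the right group contains u.time and u.content, and the key contains the join key user_id.
      • r: the alias of the result returned by the UDJ function. You can reference this alias elsewhere in the SQL statement, for example, r.user_id.
      • (user_id, time, content): the columns returned by the UDJ function. The time column is a BIGINT value in milliseconds, so the query converts it by using from_unixtime(time/1000).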
      The statement output is as follows:
      +---------+---------------------+---------------------------------------+
      | user_id | time                | content                               |
      +---------+---------------------+---------------------------------------+
      | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                 |
      | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                 |
      | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
      | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
      | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
      | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                 |
      | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                 |
      | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
      | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
      | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
      +---------+---------------------+---------------------------------------+
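
The comments in PayUserLogMergeJoin describe the calling contract: join receives one key together with the groups of records for that key from the left table and the right table, and the output above includes log records of users who have no payment records (1000235 and 4356142). The following plain Java sketch imitates that contract in memory to show how a key that exists on only one side reaches join with an empty group. The CoGroupSimulation class, the JoinFunction interface, and the String[] rows are hypothetical stand-ins rather than the MaxCompute UDJ API; the real framework shuffles and distributes the data instead of grouping it in a TreeMap. The key 9999999 is a made-up user with a payment record but no log record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical in-memory imitation of how UDJ hands grouped records to join().
// Each row is a String[] whose element 0 is the join key. Not the MaxCompute API.
class CoGroupSimulation {
    // Stand-in for UDJ.join(key, left, right, output).
    interface JoinFunction {
        void join(String key, Iterator<String[]> left, Iterator<String[]> right,
                  Consumer<String> output);
    }

    // Groups rows by their join key (element 0), keeping input order within a group.
    static Map<String, List<String[]>> groupByKey(List<String[]> rows) {
        Map<String, List<String[]>> groups = new TreeMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    // Calls fn once for every key that appears in either input. A key that is
    // missing on one side gets an empty iterator for that side.
    static List<String> coGroup(List<String[]> leftRows, List<String[]> rightRows,
                                JoinFunction fn) {
        Map<String, List<String[]>> left = groupByKey(leftRows);
        Map<String, List<String[]>> right = groupByKey(rightRows);
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<String> output = new ArrayList<>();
        List<String[]> empty = Collections.emptyList();
        for (String key : keys) {
            fn.join(key,
                    left.getOrDefault(key, empty).iterator(),
                    right.getOrDefault(key, empty).iterator(),
                    output::add);
        }
        return output;
    }

    // Counts the remaining records of a group.
    static int count(Iterator<String[]> group) {
        int n = 0;
        while (group.hasNext()) {
            group.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        List<String[]> payments = Arrays.asList(
                new String[] {"1335656", "PEqMSHyktn"},
                new String[] {"9999999", "hypothetical payment without logs"});
        List<String[]> logs = Arrays.asList(
                new String[] {"1000235", "click FNOXAibRjkIaQPB"},
                new String[] {"1335656", "click MxONdLckpAFUHRS"},
                new String[] {"1335656", "click mKRPGOciFDyzTgM"});
        List<String> lines = coGroup(payments, logs, (key, left, right, out) ->
                out.accept(key + " left=" + count(left) + " right=" + count(right)));
        lines.forEach(System.out::println);
    }
}
```

Running main prints one line per key: 1000235 with an empty left group, 1335656 with both groups, and 9999999 with an empty right group. In PayUserLogMergeJoin, the empty-left case outputs the log records without merging, and the empty-right check skips the key.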

Pre-sorting of UDJ

In the preceding example, to find the payment record that is closest in time to each log record with the same user_id, the UDJ code traverses all payment records of that user. Because the iterator cannot be reset, all payment records with the same user_id are first loaded into an ArrayList. This approach works when the number of payment records is small. If a user has a large number of payment records, the records may not fit in memory. To address this issue, you can use the SORT BY clause.

If the payment records and the log records of each user are both sorted by time, you do not need to cache all payment records. You only need to compare the records at the heads of the two sorted lists and advance through them, similar to the merge step of merge sort.

This method uses the SORT BY clause to pre-sort the data. As a result, at most three records need to be held at a time: the current log record, the current payment record, and a copy of the previous payment record. The join method of the UDJ is rewritten in Java as follows:
@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
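  if (! right.hasNext()) {
    // The group on the right is empty. Skip this key.
    return;
  } else if (! left.hasNext()) {
    // The group on the left is empty. Output the records of the group on the right without merging.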
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }
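  // Both groups are sorted by time. Keep the current log record, the current payment record,
  // and a copy of the previous payment record, which stays valid after left.next() is called.
  long prevDelta = Long.MAX_VALUE;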
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();
  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // The payment record is still earlier than the log record, so the time delta may decrease further.
      // Advance in the group on the left to look for a smaller time delta.
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
      // The minimum time delta is reached. Compare the current payment record with the previous one.
      // Generate the merged result and move on to the next record in the group on the right.
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        // Recompute the delta between the new log record and the previous payment record.
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}
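
The merge loop above is easier to check outside MaxCompute. The following plain Java sketch applies the same steps to two sorted arrays of timestamps: the indexes pay and lastPay and the variable prevDelta play the roles of payRecord, lastPayRecord, and prevDelta, and the result is compared with a brute-force scan similar to the ArrayList version in the earlier example. The SortedNearestMatch class and the use of minutes after midnight instead of Record objects are assumptions made for illustration; this is not MaxCompute code.

```java
import java.util.Arrays;

// Hypothetical stand-alone version of the pre-sorted join loop, for testing the
// logic outside MaxCompute. Times are plain longs (minutes after midnight below);
// both arrays must be sorted in ascending order, which SORT BY provides in UDJ.
class SortedNearestMatch {
    // Returns, for each log time, the index of the nearest payment time.
    // pays must be non-empty, just as the UDJ code checks left.hasNext() first.
    static int[] nearest(long[] pays, long[] logs) {
        int[] result = new int[logs.length];
        if (logs.length == 0) {
            return result;
        }
        int pay = 0;                     // plays the role of payRecord
        int lastPay = 0;                 // plays the role of lastPayRecord
        long prevDelta = Long.MAX_VALUE; // delta from the current log to lastPay
        int log = 0;                     // plays the role of logRecord
        while (true) {
            long delta = logs[log] - pays[pay];
            if (pay + 1 < pays.length && delta > 0) {
                // The payment is still earlier than the log: advance in pays.
                lastPay = pay;
                prevDelta = delta;
                pay++;
            } else {
                // Minimum reached: choose between the current and previous payment.
                result[log] = Math.abs(delta) < prevDelta ? pay : lastPay;
                log++;
                if (log == logs.length) {
                    break;
                }
                prevDelta = Math.abs(logs[log] - pays[lastPay]);
            }
        }
        return result;
    }

    // Quadratic reference: scan every payment for every log.
    static int[] bruteForce(long[] pays, long[] logs) {
        int[] result = new int[logs.length];
        for (int i = 0; i < logs.length; i++) {
            long minDelta = Long.MAX_VALUE;
            for (int j = 0; j < pays.length; j++) {
                long delta = Math.abs(logs[i] - pays[j]);
                if (delta < minDelta) {
                    minDelta = delta;
                    result[i] = j;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Payments and logs of user 2656199 from the sample data, as minutes after midnight.
        long[] pays = {741, 1250, 1350};                  // 12:21, 20:50, 22:30
        long[] logs = {510, 554, 1265, 1268, 1349, 1350}; // 08:30 ... 22:30
        System.out.println(Arrays.toString(nearest(pays, logs)));
        System.out.println(Arrays.toString(bruteForce(pays, logs)));
    }
}
```

For user 2656199, both methods print [0, 0, 1, 1, 2, 2], that is, pYvotuLDIT, pYvotuLDIT, PEqMSHyktn, PEqMSHyktn, gZhvdySOQb, and gZhvdySOQb, which matches the output table of the earlier example. Each loop iteration advances either the payment index or the log index, so the sorted version performs at most one iteration per payment plus one per log and keeps only a few values in memory.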