MaxCompute introduces user-defined join (UDJ) to the user-defined function (UDF) framework based on the MaxCompute V2.0 compute engine. UDJ lets you join tables with custom logic, which makes more flexible user-defined operations possible and simplifies operations that would otherwise require MapReduce in the underlying distributed system.

Background information

MaxCompute provides multiple built-in JOIN methods, including INNER JOIN, RIGHT JOIN, OUTER JOIN, LEFT JOIN, FULL JOIN, SEMI JOIN, and ANTI-SEMI JOIN. You can use these built-in JOIN methods in most scenarios. However, these methods are insufficient for cross-table operations that require custom join logic.

In most cases, you can use UDFs to describe your custom code logic. However, the current UDF, user-defined table-valued function (UDTF), and user-defined aggregate function (UDAF) frameworks can handle only one table at a time. To perform user-defined operations on multiple tables, you must combine built-in JOIN methods, UDFs, UDTFs, and complex SQL statements. In some scenarios, you must even use a custom MapReduce framework instead of SQL to complete the required computing tasks.

Regardless of the scenario, these operations require technological expertise and may cause the following issues:
  • In scenarios where you use built-in JOIN methods, UDFs, UDTFs, and complex SQL statements: The use of multiple JOIN methods and code in SQL statements results in a logical black box, which causes difficulties in generating an optimal execution plan.
  • In scenarios where you use a custom MapReduce framework: Execution plans are hard to optimize. Most MapReduce code is written in Java, and it runs less efficiently than the deeply optimized native runtime code that MaxCompute generates by using the LLVM code generator.

UDJ performance

A real online MapReduce job is used as an example to verify the performance of UDJ. The job runs based on a complex algorithm. In this example, two tables are joined, UDJ is used to rewrite the MapReduce job, and the correctness of the UDJ results is checked. The following figure shows the MapReduce and UDJ performance under the same data concurrency.

As shown in the figure, UDJ conveniently describes the complex logic of how to handle multiple tables and greatly improves performance. The user code is called only within UDJ. The entire Mapper logic in this example is executed by the native runtime engine of MaxCompute. The data exchange between the MaxCompute UDJ runtime engine and the Java interfaces is optimized, and the JOIN logic of UDJ is more efficient than that of a reducer.

Cross join operation by using UDJ

The following example describes how to use UDJ in MaxCompute.

Assume that two log tables named payment and user_client_log exist.
  • The payment table stores the payment records of users. Each payment record contains the user ID, payment time, and payment content. The following table lists the sample data.
    user_id time pay_info
    2656199 2018-02-13 22:30:00 gZhvdySOQb
    8881237 2018-02-13 08:30:00 pYvotuLDIT
    8881237 2018-02-13 10:32:00 KBuMzRpsko
  • The user_client_log table stores the client logs of users. Each log contains the user ID, logging time, and log content. The following table lists the sample data.
    user_id time content
    8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn
    8881237 2018-02-13 06:14:00 click OkTYNUHMqZzlDyL
    8881237 2018-02-13 10:30:00 click OkTYNUHMqZzlDyL
Requirement: For each record in the user_client_log table, find the payment record that has the closest time to this record in the payment table. Then, join the two records and generate results. The following table lists the results.
user_id time content
8881237 2018-02-13 00:30:00 click MpkvilgWSmhUuPn, pay pYvotuLDIT
8881237 2018-02-13 06:14:00 click OkTYNUHMqZzlDyL, pay pYvotuLDIT
8881237 2018-02-13 10:30:00 click OkTYNUHMqZzlDyL, pay KBuMzRpsko
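The matching rule in this requirement can be illustrated outside MaxCompute with a minimal plain-Java sketch. It is not UDJ code: the class NearestPaymentSketch, the Pay type, and the nearest and merge helpers are hypothetical names introduced only for illustration, and the data is the sample data for user 8881237.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

class NearestPaymentSketch {
  /** Hypothetical in-memory stand-in for one row of the payment table. */
  static final class Pay {
    final LocalDateTime time;
    final String payInfo;

    Pay(LocalDateTime time, String payInfo) {
      this.time = time;
      this.payInfo = payInfo;
    }
  }

  /** Returns the payment closest in time to logTime, or null if there are no payments. */
  static Pay nearest(LocalDateTime logTime, List<Pay> pays) {
    Pay best = null;
    long bestDelta = Long.MAX_VALUE;
    for (Pay pay : pays) {
      long delta = Math.abs(Duration.between(pay.time, logTime).toMillis());
      if (delta < bestDelta) { // strict comparison: the first payment wins a tie
        bestDelta = delta;
        best = pay;
      }
    }
    return best;
  }

  /** Builds the merged content in the format of the results table. */
  static String merge(String logContent, Pay pay) {
    return pay == null ? logContent : logContent + ", pay " + pay.payInfo;
  }

  public static void main(String[] args) {
    List<Pay> pays = Arrays.asList(
        new Pay(LocalDateTime.parse("2018-02-13T08:30:00"), "pYvotuLDIT"),
        new Pay(LocalDateTime.parse("2018-02-13T10:32:00"), "KBuMzRpsko"));
    LocalDateTime logTime = LocalDateTime.parse("2018-02-13T10:30:00");
    // Prints: click OkTYNUHMqZzlDyL, pay KBuMzRpsko
    System.out.println(merge("click OkTYNUHMqZzlDyL", nearest(logTime, pays)));
  }
}
```

The 10:30:00 log record is 2 minutes away from the 10:32:00 payment and 2 hours away from the 08:30:00 payment, so it is merged with KBuMzRpsko, which matches the last row of the results table.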
To meet this requirement, use one of the following methods:
  • Use built-in JOIN methods. The SQL pseudocode is as follows:
    SELECT
      u.user_id,
      u.time,
      merge(p.pay_info, u.content)
    FROM
      payment p RIGHT OUTER JOIN user_client_log u
    ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))
    When you join two records in the tables, you must calculate the minimum difference between p.time and u.time that correspond to the same user_id. However, you cannot call aggregate functions in a JOIN condition. Therefore, you cannot use standard JOIN methods to complete this task.
  • Use the UDJ method.
    1. Create a UDJ function.
      1. Configure the SDK of the new version.
        <dependency>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>odps-sdk-udf</artifactId>
          <version>0.29.10-public</version>
          <scope>provided</scope>
        </dependency>
      2. Write UDJ code and package the code as odps-udj-example.jar.
        package com.aliyun.odps.udf.example.udj;
        import com.aliyun.odps.Column;
        import com.aliyun.odps.OdpsType;
        import com.aliyun.odps.Yieldable;
        import com.aliyun.odps.data.ArrayRecord;
        import com.aliyun.odps.data.Record;
        import com.aliyun.odps.udf.DataAttributes;
        import com.aliyun.odps.udf.ExecutionContext;
        import com.aliyun.odps.udf.UDJ;
        import com.aliyun.odps.udf.annotation.Resolve;
        import java.util.ArrayList;
        import java.util.Iterator;
        /** For each record in the table on the right, find the record in the table on the left that has the closest time to this record.
         * Join the two records. 
         */
        @Resolve("->string,bigint,string")
        public class PayUserLogMergeJoin extends UDJ {
          private Record outputRecord;
          /** The setup method is called before data is processed. You can override it to perform initialization. 
           */
          @Override
          public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
            // Define the schema of the output record: user_id, time, and content.
            outputRecord = new ArrayRecord(new Column[]{
              new Column("user_id", OdpsType.STRING),
              new Column("time", OdpsType.BIGINT),
              new Column("content", OdpsType.STRING)
            });
          }
          /** Override the join method to implement the join logic. 
           * @param key: the current key for joining. 
           * @param left: the record group of the current key in the table on the left. 
           * @param right: the record group of the current key in the table on the right. 
           * @param output: generates the result of UDJ. 
           */
          @Override
          public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
            outputRecord.setString(0, key.getString(0));
            if (!right.hasNext()) {
              // The group on the right is empty. Do not perform operations on this group. 
              return;
            } else if (!left.hasNext()) {
              // The group on the left is empty.  Generate all records from the group on the right but do not join the records of the two tables. 
              while (right.hasNext()) {
                Record logRecord = right.next();
                outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
                outputRecord.setString(2, logRecord.getString(1));
                output.yield(outputRecord);
              }
              return;
            }
            ArrayList<Record> pays = new ArrayList<>();
            // The group on the left must be scanned from beginning to end once for each record in the group on the right. 
            // However, the iterator cannot be reset after it is consumed. 
            // Therefore, save a copy of each record of the group on the left to an ArrayList. 
            left.forEachRemaining(pay -> pays.add(pay.clone()));
            while (right.hasNext()) {
              Record log = right.next();
              long logTime = log.getDatetime(0).getTime();
              long minDelta = Long.MAX_VALUE;
              Record nearestPay = null;
              // Iterate all the cached records of the group on the left to find the payment record with the minimum time difference. 
              for (Record pay: pays) {
                long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
                if (delta < minDelta) {
                  minDelta = delta;
                  nearestPay = pay;
                }
              }
              // Merge the log records and payment records that are closest in time and then generate results. 
              outputRecord.setBigint(1, log.getDatetime(0).getTime());
              outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
              output.yield(outputRecord);
            }
          }
          String mergeLog(String payInfo, String logContent) {
            return logContent + ", pay " + payInfo;
          }
          @Override
          public void close() {
          }
        }
      3. Add JAR package resources to MaxCompute.
        add jar odps-udj-example.jar;
      4. Register the UDJ function pay_user_log_merge_join in MaxCompute.
        create function pay_user_log_merge_join
          as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
          using 'odps-udj-example.jar';
    2. Prepare sample data.
      1. Create the payment table and the user_client_log table.
        create table payment(user_id string,time datetime,pay_info string);
        create table user_client_log(user_id string,time datetime,content string);
      2. Insert data into the sample tables.
        -- Insert data into the payment table. 
        INSERT OVERWRITE TABLE payment VALUES
        ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
        ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
        ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
        ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
        ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
        ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
        ;
        -- Insert data into the user_client_log table. 
        INSERT OVERWRITE TABLE user_client_log VALUES
        ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
        ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
        ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
        ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
        ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
        ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
        ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
        ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
        ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
        ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
        ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
        ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
        ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
        ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
        ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
        ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
        ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
        ;
    3. Use a UDJ function in SQL.
      SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
      SELECT user_id, time as time, pay_info FROM payment
      ) p JOIN (
      SELECT user_id, time as time, content FROM user_client_log
      ) u
      ON p.user_id = u.user_id
      USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
      r
      AS (user_id, time, content);
      Parameters in the USING clause:
      • pay_user_log_merge_join: the name of the UDJ function in SQL.
      • (p.time, p.pay_info, u.time, u.content): the columns used in UDJ.
      • r: the alias of the result returned by the UDJ function. You can reference this alias in other SQL statements.
      • (user_id, time, content): the columns returned by the UDJ function.
      In this example, the following information appears on the Run Log tab:
      +---------+---------------------+---------------------------------------+
      | user_id | time                | content                               |
      +---------+---------------------+---------------------------------------+
      | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                 |
      | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                 |
      | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
      | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
      | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
      | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                 |
      | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                 |
      | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
      | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
      | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
      +---------+---------------------+---------------------------------------+
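How the join method is driven can be pictured with a simplified plain-Java model. This is only a sketch under assumptions, not the MaxCompute runtime: it assumes, based on the empty-group handling in the example and on the output rows for users without payments, that the function is called once per key that appears in either table. The names UdjGroupingSketch, JoinFn, run, groupByKey, and demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

class UdjGroupingSketch {
  /** Hypothetical stand-in for the join callback; not the MaxCompute UDJ interface. */
  interface JoinFn {
    void join(String key, Iterator<String> left, Iterator<String> right, Consumer<String> output);
  }

  /** Groups {key, value} rows by key and calls fn once per key seen on either side. */
  static List<String> run(List<String[]> leftRows, List<String[]> rightRows, JoinFn fn) {
    Map<String, List<String>> left = groupByKey(leftRows);
    Map<String, List<String>> right = groupByKey(rightRows);
    Set<String> keys = new TreeSet<>(left.keySet());
    keys.addAll(right.keySet());
    List<String> out = new ArrayList<>();
    for (String key : keys) {
      fn.join(key,
          left.getOrDefault(key, Collections.<String>emptyList()).iterator(),
          right.getOrDefault(key, Collections.<String>emptyList()).iterator(),
          out::add);
    }
    return out;
  }

  static Map<String, List<String>> groupByKey(List<String[]> rows) {
    Map<String, List<String>> groups = new TreeMap<>();
    for (String[] row : rows) {
      groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
    }
    return groups;
  }

  /** Runs a tiny subset of the sample data through a pass-through join function. */
  static List<String> demo() {
    List<String[]> pays = Arrays.asList(
        new String[] {"1335656", "PEqMSHyktn"},
        new String[] {"9890100", "gZhvdySOQb"});
    List<String[]> logs = Arrays.asList(
        new String[] {"1000235", "click FNOXAibRjkIaQPB"},
        new String[] {"1335656", "click MxONdLckpAFUHRS"});
    // Same empty-group handling as the example: keys without logs produce nothing,
    // and logs without payments are emitted unchanged.
    JoinFn fn = (key, left, right, output) -> {
      String pay = left.hasNext() ? left.next() : null;
      while (right.hasNext()) {
        String log = right.next();
        output.accept(key + " " + (pay == null ? log : log + ", pay " + pay));
      }
    };
    return run(pays, logs, fn);
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

In this model, user 1000235 has logs but no payments and is passed through unmerged, user 1335656 is merged, and user 9890100 has no logs in the subset and produces no output.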

Pre-sorting of UDJ

In the preceding example, the UDJ code iterates all payment records of a user to find the payment record that has the closest time to a specific log record in the user_client_log table. To perform this task, the code must load all payment records with the same user_id into an ArrayList. This method works when the number of payment records is small. If a user has a large number of payment records, the records may not fit in memory, and you must find another method to process the data. This section describes how to address this issue by using the SORT BY clause.

If the number of payment records is too large to be stored in memory but the data in both tables has been sorted by time, you need to compare only the records at the current head of the two groups.

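This method uses the SORT BY clause to pre-sort the UDJ data, for example by appending SORT BY p.time, u.time to the end of the UDJ statement. With pre-sorted data, you need to cache a maximum of three records to achieve the same result as the previous method. The join method of the UDJ can then be written in Java as follows: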
@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
  if (!right.hasNext()) {
    return;
  } else if (!left.hasNext()) {
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }
  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();
  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // The payment record is still earlier than the log record, so a later payment record may be closer. 
      // Advance in the group on the left to look for a smaller time delta. 
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
      // The minimum time delta point is reached. Compare the current payment record with the previous one. 
      // Generate the merged result and prepare to process the next record 
      // in the group on the right. 
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}
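The idea behind the pre-sorted version can be shown in isolation with a minimal two-pointer sketch in plain Java. It is not the UDJ code above: it works on arrays of timestamps instead of Record iterators, the names SortedNearestSketch and nearestIndices are hypothetical, and on a tie it picks the later payment, whereas the UDJ code keeps the earlier one. It assumes that both arrays are sorted in ascending order.

```java
import java.util.Arrays;

class SortedNearestSketch {
  /**
   * For each log time, returns the index of the nearest payment time.
   * Both arrays must be sorted in ascending order. Returns -1 entries if pays is empty.
   */
  static int[] nearestIndices(long[] pays, long[] logs) {
    int[] result = new int[logs.length];
    if (pays.length == 0) {
      Arrays.fill(result, -1);
      return result;
    }
    int i = 0; // never moves backward: a later log cannot match an earlier payment
    for (int j = 0; j < logs.length; j++) {
      // Advance while the next payment is at least as close as the current one.
      while (i + 1 < pays.length
          && Math.abs(pays[i + 1] - logs[j]) <= Math.abs(pays[i] - logs[j])) {
        i++;
      }
      result[j] = i;
    }
    return result;
  }

  public static void main(String[] args) {
    // Minutes since midnight for user 8881237 in the sample data.
    long[] pays = {510, 632};     // 08:30, 10:32
    long[] logs = {30, 374, 630}; // 00:30, 06:14, 10:30
    // Prints: [0, 0, 1]
    System.out.println(Arrays.toString(nearestIndices(pays, logs)));
  }
}
```

Because the payment pointer only moves forward, each group is read once and only the current and next payment are inspected at any time. This is why the pre-sorted UDJ needs to cache only a few records instead of the whole group.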