All Products
Search
Document Center

MaxCompute:UDJ

Last Updated:Oct 12, 2023

MaxCompute introduces user defined join (UDJ) to the user-defined function (UDF) framework based on the MaxCompute V2.0 compute engine. UDJ allows more user-defined operations by flexibly joining tables and simplifies MapReduce-based operations in the underlying distributed system.

Background information

MaxCompute provides multiple built-in JOIN methods, including INNER JOIN, RIGHT JOIN, OUTER JOIN, LEFT JOIN, FULL JOIN, SEMI JOIN, and ANTI-SEMI JOIN. You can use the built-in JOIN methods in most scenarios. However, these methods are insufficient if you want to perform cross join operations.

In most cases, you can use UDFs to describe your code framework. However, the current UDF, user-defined table-valued function (UDTF), and user-defined aggregate function (UDAF) frameworks can only handle one table at a time. To perform user-defined operations for multiple tables, you must use built-in JOIN methods, UDFs, UDTFs, and complex SQL statements. In such scenarios, you must use a custom MapReduce framework instead of SQL to complete the required computing tasks.

Regardless of the scenario, the operations require technological expertise and may cause the following issues:

  • In scenarios where you use built-in JOIN methods, UDFs, UDTFs, and complex SQL statements: The use of multiple JOIN methods and code in SQL statements results in a logical black box, which causes difficulties in generating an optimal execution plan.

  • In scenarios where you use a custom MapReduce framework: Execution plans are hard to optimize. Most of the MapReduce code is written in Java. During the deep optimization of native runtime code, the execution of the MapReduce code is less efficient than the execution of the MaxCompute code that is generated by the Low Level Virtual Machine (LLVM) code generator.

Limits

You cannot use UDFs, UDAFs, or UDTFs to read data from the following types of tables:

  • Table on which schema evolution is performed

  • Table that contains complex data types

  • Table that contains JSON data types

  • Transactional table

UDJ performance

A real online MapReduce job is used as an example to verify the performance of UDJ. The job runs based on a complex algorithm. In this example, two tables are joined, UDJ is used to rewrite the MapReduce job, and the correctness of the UDJ results is checked. The following figure shows the MapReduce and UDJ performance under the same data concurrency.

UDJ conveniently describes the complex logic for handling multiple tables and greatly improves performance, as shown in the figure. The code is called only within UDJ. The logic of the entire mapper in this example is executed by the native runtime of MaxCompute. The data exchange logic between the MaxCompute UDJ runtime engine and Java interfaces is optimized in Java code. The JOIN logic of UDJ is more efficient than the JOIN logic of a reducer.

Cross join operations by using UDJ

The following example describes how to use UDJ in MaxCompute.

For example, two log tables named payment and user_client_log exist.

  • The payment table stores the payment records of users. Each payment record contains the user ID, payment time, and payment content. The following table describes the sample data.

    user_id

    time

    pay_info

    2656199

    2018-02-13 22:30:00

    gZhvdySOQb

    8881237

    2018-02-13 08:30:00

    pYvotuLDIT

    8881237

    2018-02-13 10:32:00

    KBuMzRpsko

  • The user_client_log table stores the client logs of users. Each log contains the user ID, logging time, and log content. The following table describes the sample data.

    user_id

    time

    content

    8881237

    2018-02-13 00:30:00

    click MpkvilgWSmhUuPn

    8881237

    2018-02-13 06:14:00

    click OkTYNUHMqZzlDyL

    8881237

    2018-02-13 10:30:00

    click OkTYNUHMqZzlDyL

Requirement: For each record in the user_client_log table, find the payment record that has the closest time to this record in the payment table. Then, join the two records and generate results. The following table describes the results.

user_id

time

content

8881237

2018-02-13 00:30:00

click MpkvilgWSmhUuPn, pay pYvotuLDIT

8881237

2018-02-13 06:14:00

click OkTYNUHMqZzlDyL, pay pYvotuLDIT

8881237

2018-02-13 10:30:00

click OkTYNUHMqZzlDyL, pay KBuMzRpsko

To meet this requirement, use one of the following methods:

  • Use built-in JOIN methods. Sample SQL pseudocode:

    SELECT
      p.user_id,
      p.time,
      merge(p.pay_info, u.content)
    FROM
      payment p RIGHT OUTER JOIN user_client_log u
    ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))

    When you join two records in the tables, you must calculate the minimum difference between p.time and u.time that correspond to the same user_id. However, you cannot call aggregate functions in a JOIN condition. Therefore, you cannot use standard JOIN methods to complete this task.

  • Use the UDJ method.

    1. Create a UDJ function.

      1. Configure the SDK of the new version.

        <dependency>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>odps-sdk-udf</artifactId>
          <version>0.29.10-public</version>
          <scope>provided</scope>
        </dependency>
      2. Write UDJ code and package the code as odps-udj-example.jar.

        package com.aliyun.odps.udf.example.udj;
        import com.aliyun.odps.Column;
        import com.aliyun.odps.OdpsType;
        import com.aliyun.odps.Yieldable;
        import com.aliyun.odps.data.ArrayRecord;
        import com.aliyun.odps.data.Record;
        import com.aliyun.odps.udf.DataAttributes;
        import com.aliyun.odps.udf.ExecutionContext;
        import com.aliyun.odps.udf.UDJ;
        import com.aliyun.odps.udf.annotation.Resolve;
        import java.util.ArrayList;
        import java.util.Iterator;
        /** For each record in the table on the right, find the record in the table on the left that has the closest time to the record.
         * Join the two records. 
         */
        @Resolve("->string,bigint,string")
        public class PayUserLogMergeJoin extends UDJ {
          private Record outputRecord;
          /** Call the preceding code before data is processed.  You can use the PayUserLogMergeJoin method for initialization. 
           */
          @Override
          public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
            //
            outputRecord = new ArrayRecord(new Column[]{
              new Column("user_id", OdpsType.STRING),
              new Column("time", OdpsType.BIGINT),
              new Column("content", OdpsType.STRING)
            });
          }
          /** Rewrite the PayUserLogMergeJoin method to implement the connection logic. 
           * @param key: the current key for joining. 
           * @param left: the record group of the current key in the table on the left. 
           * @param right: the record group of the current key in the table on the right. 
           * @param output: generates the result of UDJ. 
           */
          @Override
          public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
            outputRecord.setString(0, key.getString(0));
            if (!right.hasNext()) {
              // The group on the right is empty. Do not perform operations on this group. 
              return;
            } else if (!left.hasNext()) {
              // The group on the left is empty.  Generate all records from the group on the right but do not join the records of the two tables. 
              while (right.hasNext()) {
                Record logRecord = right.next();
                outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
                outputRecord.setString(2, logRecord.getString(1));
                output.yield(outputRecord);
              }
              return;
            }
            ArrayList<Record> pays = new ArrayList<>();
            // The records in the group on the left are iterated from the beginning to the end. 
            // The iterator cannot reset the records in the group on the right. 
            // Save each record of the group on the left to an ArrayList. 
            left.forEachRemaining(pay -> pays.add(pay.clone()));
            while (right.hasNext()) {
              Record log = right.next();
              long logTime = log.getDatetime(0).getTime();
              long minDelta = Long.MAX_VALUE;
              Record nearestPay = null;
              // Iterate all records of the group on the left. Then, find the minimum time difference between the records. 
              for (Record pay: pays) {
                long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
                if (delta < minDelta) {
                  minDelta = delta;
                  nearestPay = pay;
                }
              }
              // Merge the log records and payment records that are closest in time and then generate results. 
              outputRecord.setBigint(1, log.getDatetime(0).getTime());
              outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
              output.yield(outputRecord);
            }
          }
          String mergeLog(String payInfo, String logContent) {
            return logContent + ", pay " + payInfo;
          }
          @Override
          public void close() {
          }
        }
      3. Add JAR package resources to MaxCompute.

        add jar odps-udj-example.jar;
      4. Register the UDJ function pay_user_log_merge_join in MaxCompute.

        create function pay_user_log_merge_join
          as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
          using 'odps-udj-example.jar';
    2. Prepare sample data.

      1. Create the payment table and the user_client_log table.

        create table payment(user_id string,time datetime,pay_info string);
        create table user_client_log(user_id string,time datetime,content string);
      2. Insert data into the tables.

        -- Insert data into the payment table. 
        INSERT OVERWRITE TABLE payment VALUES
        ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
        ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
        ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
        ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
        ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
        ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
        ;
        -- Insert data into the user_client_log table. 
        INSERT OVERWRITE TABLE user_client_log VALUES
        ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
        ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
        ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
        ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
        ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
        ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
        ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
        ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
        ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
        ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
        ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
        ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
        ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
        ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
        ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
        ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
        ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
        ;
    3. Use a UDJ function in SQL.

      SELECT r.user_id, from_unixtime(time/1000) as time, content FROM (
      SELECT user_id, time as time, pay_info FROM payment
      ) p JOIN (
      SELECT user_id, time as time, content FROM user_client_log
      ) u
      ON p.user_id = u.user_id
      USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
      r
      AS (user_id, time, content);

      Parameters in the USING clause:

      • pay_user_log_merge_join: the name of the UDJ function in SQL.

      • (p.time, p.pay_info, u.time, u.content): the columns used in UDJ.

      • r: the alias of the result that is returned by the UDJ function. You can reference this alias in other SQL statements.

      • (user_id, time, content): the columns that are returned by the UDJ function.

      In this example, the following result is returned:

      +---------+------------+---------+
      | user_id | time       | content |
      +---------+------------+---------+
      | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB |
      | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ |
      | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
      | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
      | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
      | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier |
      | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier |
      | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
      | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
      | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
      +---------+------------+---------+

Pre-sorting of UDJ

An iterator is used to iterate all records in the payment table and find the payment record that has the closest time to a specific log record in the user_client_log table. To perform this operation, you must load all payment records that have the same user_id to an ArrayList. You can use this method if the number of payment records for a user is small. If a large number of payment records are generated, you must use another method to load the data due to the limits of the memory size. This topic describes how to use the SORT BY clause to address this issue.

If the number of payment records for a user is excessively large and cannot be stored in the memory and all data in the table is sorted by time, you need to only compare the first element in the ArrayList.

This method uses the SORT BY clause to pre-sort the UDJ data. To achieve the same effect by using the SORT BY clause, you need to only cache a maximum of three data records during pre-sorting. UDJ in Java:

@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
  if (!right.hasNext()) {
    return;
  } else if (!left.hasNext()) {
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }
  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();
  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // The time delta between the two records decreases and the operation can continue. 
      // Explore the group on the left to try to obtain a smaller time delta. 
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
     // The minimum time delta point is reached.  Check the final record in the payment table. 
     // Generate the merged result and prepare to process the next record. 
     // The group on the right. 
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}