All Products
Search
Document Center

MaxCompute:User-defined join (UDJ)

Last Updated:Mar 27, 2026

User-defined join (UDJ) memperluas kerangka kerja user-defined function (UDF) MaxCompute dengan kemampuan menerapkan logika join kustom antara dua tabel. Dibangun di atas mesin komputasi MaxCompute V2.0, UDJ memungkinkan Anda mengekspresikan operasi lintas tabel yang tidak dapat ditangani oleh tipe JOIN standar maupun kerangka kerja UDF/UDTF/UDAF, tanpa harus menggunakan implementasi MapReduce kustom.

Kapan menggunakan UDJ

MaxCompute menyediakan enam tipe join bawaan: INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL JOIN, SEMI JOIN, dan ANTI-SEMI JOIN. Kerangka kerja UDF, user-defined table-valued function (UDTF), dan user-defined aggregate function (UDAF) yang ada masing-masing hanya beroperasi pada satu tabel dalam satu waktu.

Untuk melakukan join beberapa tabel dengan logika kustom, saat ini Anda memiliki dua opsi, yang keduanya memiliki kelemahan signifikan:

  • Built-in JOIN dengan SQL kompleks: Menggabungkan berbagai tipe JOIN dengan UDF dalam satu pernyataan SQL menciptakan kotak hitam logis yang menyulitkan pembuatan rencana eksekusi optimal.

  • MapReduce kustom: Rencana eksekusi sulit dioptimalkan. Sebagian besar kode MapReduce ditulis dalam Java, dan eksekusinya kurang efisien dibandingkan kode native MaxCompute yang dihasilkan oleh generator kode Low Level Virtual Machine (LLVM).

UDJ mengatasi kedua keterbatasan tersebut. Logika join berjalan di dalam runtime native MaxCompute, dan pertukaran data antara mesin runtime UDJ MaxCompute dan kode Java Anda telah dioptimalkan — sehingga logika join UDJ lebih efisien daripada reducer MapReduce yang setara.

Dengan UDJ, Anda dapat:

  • Menerapkan logika penggabungan kustom pada catatan yang dikelompokkan bersama dari dua tabel

  • Menangani kasus asimetris (kelompok kiri kosong, kelompok kanan kosong) secara eksplisit

  • Menggunakan pre-sorting SORT BY untuk memproses kelompok besar secara efisien tanpa memuat semua catatan ke dalam memori

  • Menggantikan logika reducer MapReduce yang kompleks dengan kode Java yang dapat dipanggil melalui SQL

Keterbatasan

Anda tidak dapat menggunakan UDF, UDAF, atau UDTF untuk membaca data dari jenis tabel berikut:

  • Tabel tempat penerapan evolusi skema

  • Tabel yang berisi tipe data kompleks

  • Tabel yang berisi tipe data JSON

  • Tabel transaksional

Kinerja

Untuk memvalidasi kinerja UDJ, sebuah pekerjaan MapReduce dunia nyata yang menjalankan algoritma kompleks ditulis ulang menggunakan UDJ. Kedua pendekatan tersebut dijalankan terhadap set data yang sama dengan tingkat konkurensi yang identik. Gambar di bawah ini menunjukkan hasilnya.

UDJ jauh lebih unggul dibandingkan versi MapReduce. Seluruh logika mapper berjalan di runtime native MaxCompute. Logika pertukaran data antara mesin runtime UDJ MaxCompute dan antarmuka Java telah dioptimalkan dalam kode Java. Logika join-nya lebih efisien daripada reducer yang setara.

Implementasi UDJ: contoh join lintas tabel

Bagian ini memandu Anda melalui contoh lengkap: untuk setiap catatan dalam log aktivitas klien, temukan catatan pembayaran dengan timestamp terdekat lalu gabungkan keduanya.

Tabel sampel

payment — menyimpan catatan pembayaran pengguna

user_idtimepay_info
26561992018-02-13 22:30:00gZhvdySOQb
88812372018-02-13 08:30:00pYvotuLDIT
88812372018-02-13 10:32:00KBuMzRpsko

user_client_log — menyimpan log aktivitas klien

user_idtimecontent
88812372018-02-13 00:30:00click MpkvilgWSmhUuPn
88812372018-02-13 06:14:00click OkTYNUHMqZzlDyL
88812372018-02-13 10:30:00click OkTYNUHMqZzlDyL

Tujuan: Untuk setiap catatan user_client_log, temukan catatan payment dengan nilai time terdekat untuk user_id yang sama, lalu gabungkan kedua catatan tersebut.

JOIN standar tidak dapat mencapai hal ini — kondisi ABS(p.time - u.time) = MIN(ABS(p.time - u.time)) memerlukan fungsi agregat di dalam predikat JOIN, yang tidak diizinkan oleh SQL.

Langkah 1: Konfigurasikan SDK

Tambahkan SDK UDF ke proyek Maven Anda:

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-udf</artifactId>
  <version>0.29.10-public</version>
  <scope>provided</scope>
</dependency>

Langkah 2: Tulis kelas UDJ

Kelas UDJ mengimplementasikan tiga metode siklus hidup:

  • setup() — dipanggil sekali sebelum pemrosesan dimulai; inisialisasi skema output dan status bersama di sini

  • join() — dipanggil sekali per kunci join; menerima iterator atas kelompok catatan kiri dan kanan

  • close() — dipanggil setelah semua kelompok diproses; bebaskan sumber daya di sini

Contoh di bawah ini mengimplementasikan pencocokan waktu terdekat. Semua catatan pembayaran untuk user_id tertentu dimuat ke dalam ArrayList sehingga iterator sisi kanan dapat membandingkan setiap catatan log terhadap semuanya.

package com.aliyun.odps.udf.example.udj;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Yieldable;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDJ;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.Iterator;

// Output schema: (user_id STRING, time BIGINT, content STRING)
@Resolve("->string,bigint,string")
public class PayUserLogMergeJoin extends UDJ {

  private Record outputRecord;

  // Initialize the output record schema before processing starts.
  @Override
  public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
    outputRecord = new ArrayRecord(new Column[]{
      new Column("user_id", OdpsType.STRING),
      new Column("time", OdpsType.BIGINT),
      new Column("content", OdpsType.STRING)
    });
  }

  // Called once per join key (user_id).
  // left  = payment records for this user_id
  // right = log records for this user_id
  @Override
  public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
    outputRecord.setString(0, key.getString(0));

    if (!right.hasNext()) {
      // No log records for this user — nothing to output.
      return;
    } else if (!left.hasNext()) {
      // No payment records — output log records unmerged.
      while (right.hasNext()) {
        Record logRecord = right.next();
        outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
        outputRecord.setString(2, logRecord.getString(1));
        output.yield(outputRecord);
      }
      return;
    }

    // Load all payment records into memory so each log record
    // can be compared against the full set.
    ArrayList<Record> pays = new ArrayList<>();
    left.forEachRemaining(pay -> pays.add(pay.clone()));

    while (right.hasNext()) {
      Record log = right.next();
      long logTime = log.getDatetime(0).getTime();
      long minDelta = Long.MAX_VALUE;
      Record nearestPay = null;

      // Find the payment record with the smallest time difference.
      for (Record pay : pays) {
        long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
        if (delta < minDelta) {
          minDelta = delta;
          nearestPay = pay;
        }
      }

      // Merge the log record with its nearest payment record.
      outputRecord.setBigint(1, log.getDatetime(0).getTime());
      outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
      output.yield(outputRecord);
    }
  }

  String mergeLog(String payInfo, String logContent) {
    return logContent + ", pay " + payInfo;
  }

  @Override
  public void close() {}
}

Paketkan kelas ini sebagai odps-udj-example.jar.

Langkah 3: Daftarkan fungsi UDJ

Unggah file JAR dan daftarkan fungsinya:

ADD jar odps-udj-example.jar;

CREATE FUNCTION pay_user_log_merge_join
  AS 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
  USING 'odps-udj-example.jar';

Langkah 4: Siapkan data sampel

CREATE TABLE payment(user_id STRING, time DATETIME, pay_info STRING);
CREATE TABLE user_client_log(user_id STRING, time DATETIME, content STRING);

-- Insert payment records
INSERT OVERWRITE TABLE payment VALUES
('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa');

-- Insert log records
INSERT OVERWRITE TABLE user_client_log VALUES
('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ');

Langkah 5: Jalankan UDJ dalam SQL

Klausa USING mengidentifikasi fungsi UDJ dan memetakan kolom dari masing-masing tabel:

SELECT r.user_id, FROM_UNIXTIME(time/1000) AS time, content
FROM (
  SELECT user_id, time AS time, pay_info FROM payment
) p
JOIN (
  SELECT user_id, time AS time, content FROM user_client_log
) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content);

Parameter klausa USING:

ParameterDeskripsi
pay_user_log_merge_joinNama fungsi UDJ yang telah didaftarkan
(p.time, p.pay_info, u.time, u.content)Kolom dari tabel kiri dan kanan yang dilewatkan ke UDJ
rAlias untuk set hasil UDJ, dapat dirujuk dalam query luar
(user_id, time, content)Nama kolom untuk output UDJ

Output yang diharapkan:

+---------+---------------------+-----------------------------------------------+
| user_id | time                | content                                       |
+---------+---------------------+-----------------------------------------------+
| 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                         |
| 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                         |
| 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn         |
| 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn         |
| 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT         |
| 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT         |
| 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn         |
| 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn         |
| 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb         |
| 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb         |
| 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                         |
| 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                         |
| 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT         |
| 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT         |
| 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko         |
| 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb         |
| 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa         |
+---------+---------------------+-----------------------------------------------+

Optimalkan penggunaan memori dengan pre-sorting SORT BY

Implementasi di atas memuat semua catatan pembayaran untuk setiap user_id ke dalam ArrayList. Pendekatan ini berfungsi ketika pengguna memiliki jumlah catatan pembayaran yang sedikit, tetapi gagal ketika kelompok terlalu besar untuk muat dalam memori.

Jika data diurutkan berdasarkan waktu, Anda hanya perlu melacak beberapa catatan sekaligus alih-alih seluruh kelompok.

Perubahan SQL

Tambahkan klausa SORT BY untuk mengurutkan kedua tabel dalam setiap kelompok join:

SELECT r.user_id, from_unixtime(time/1000) AS time, content
FROM (
  SELECT user_id, time AS time, pay_info FROM payment
) p
JOIN (
  SELECT user_id, time AS time, content FROM user_client_log
) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content)
SORT BY p.time, u.time;

Metode join() yang diperbarui

Dengan kedua sisi diurutkan berdasarkan waktu, Anda dapat menemukan catatan pembayaran terdekat menggunakan satu pemindaian linear — membandingkan jendela geser maksimal tiga catatan alih-alih mengiterasi seluruh kelompok kiri untuk setiap catatan kanan. Perbarui metode join() untuk mengimplementasikan logika ini:

@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));

  if (!right.hasNext()) {
    return;
  } else if (!left.hasNext()) {
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }

  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();

  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();

    if (left.hasNext() && delta > 0) {
      // The time gap is still shrinking — advance the left iterator.
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
      // Minimum delta reached. Output the merged record and advance the right iterator.
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, mergeLog(nearestPay.getString(1), logRecord.getString(1)));
      output.yield(outputRecord);

      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}

Versi ini hanya menyimpan maksimal tiga catatan sekaligus dan menghasilkan output yang sama dengan pendekatan ArrayList.

Catatan

Setelah memodifikasi kelas Java UDJ, bangun ulang file JAR dan tambahkan kembali ke MaxCompute agar perubahan diterapkan.