全部产品
Search
文档中心

MaxCompute:UDJ

更新时间:Nov 10, 2025

MaxCompute memperkenalkan user defined join (UDJ) ke dalam kerangka kerja user-defined function (UDF) berdasarkan mesin komputasi MaxCompute V2.0. UDJ memungkinkan operasi yang lebih fleksibel yang ditentukan pengguna dengan melakukan penggabungan tabel secara dinamis dan menyederhanakan operasi berbasis MapReduce dalam sistem terdistribusi yang mendasarinya.

Informasi latar belakang

MaxCompute menyediakan beberapa operasi Join bawaan, seperti INNER JOIN, RIGHT JOIN, LEFT JOIN, FULL JOIN, SEMI JOIN, dan ANTI-SEMI JOIN. Meskipun operasi Join bawaan ini sangat andal, implementasi standarnya tidak dapat memenuhi kebutuhan banyak skenario yang melibatkan operasi lintas tabel.

Dalam sebagian besar kasus, Anda dapat menggunakan UDF untuk menggambarkan kerangka kode Anda. Namun, kerangka UDF, user-defined table-valued function (UDTF), dan user-defined aggregate function (UDAF) saat ini hanya dapat menangani satu tabel dalam satu waktu. Untuk melakukan operasi yang ditentukan pengguna pada beberapa tabel, Anda harus mengombinasikan JOIN methods, UDFs, UDTFs, dan pernyataan SQL yang kompleks. Dalam skenario seperti itu, Anda harus menggunakan kerangka MapReduce kustom alih-alih SQL untuk menyelesaikan tugas komputasi yang diperlukan.

Terlepas dari skenarionya, pendekatan tersebut memerlukan technological expertise dan dapat menimbulkan masalah berikut:

  • Dalam skenario di mana Anda menggunakan built-in JOIN methods, UDFs, UDTFs, and complex SQL statements: Penggunaan berbagai metode JOIN dan kode dalam pernyataan SQL menghasilkan kotak hitam logis, yang menyulitkan pembuatan rencana eksekusi optimal.

  • Dalam skenario di mana Anda menggunakan kerangka MapReduce kustom: Rencana eksekusi sulit dioptimalkan. Sebagian besar kode MapReduce ditulis dalam Java. Selama optimasi mendalam terhadap kode runtime native, eksekusi kode MapReduce kurang efisien dibandingkan eksekusi kode MaxCompute yang dihasilkan oleh generator kode Low Level Virtual Machine (LLVM).

Batasan

Anda tidak dapat menggunakan UDF, UDAF, atau UDTF untuk membaca data dari jenis tabel berikut:

  • Tabel yang menjalani evolusi skema

  • Tabel yang berisi tipe data kompleks

  • Tabel yang berisi tipe data JSON

  • Tabel transaksional

Kinerja UDJ

Pekerjaan MapReduce online nyata digunakan sebagai contoh untuk memverifikasi kinerja UDJ. Pekerjaan tersebut berjalan berdasarkan algoritma yang kompleks. Dalam contoh ini, dua tabel digabungkan, UDJ digunakan untuk menulis ulang pekerjaan MapReduce, dan kebenaran hasil UDJ diperiksa. Gambar berikut menunjukkan kinerja MapReduce dan UDJ di bawah konkurensi data yang sama.

UDJ dengan mudah menggambarkan logika kompleks untuk menangani beberapa tabel dan secara signifikan meningkatkan kinerja, seperti yang ditunjukkan dalam gambar. Kode hanya dipanggil di dalam UDJ. Logika seluruh mapper dalam contoh ini dieksekusi oleh runtime native MaxCompute. Logika pertukaran data antara mesin runtime UDJ MaxCompute dan antarmuka Java dioptimalkan dalam kode Java. Logika JOIN UDJ lebih efisien daripada logika JOIN reducer.

Operasi cross join menggunakan UDJ

Contoh berikut menjelaskan cara menggunakan UDJ di MaxCompute.

Misalnya, terdapat dua tabel log bernama payment dan user_client_log.

  • Tabel payment menyimpan catatan pembayaran pengguna. Setiap catatan pembayaran berisi ID pengguna, waktu pembayaran, dan konten pembayaran. Tabel berikut menggambarkan data sampel.

    user_id

    time

    pay_info

    2656199

    2018-02-13 22:30:00

    gZhvdySOQb

    8881237

    2018-02-13 08:30:00

    pYvotuLDIT

    8881237

    2018-02-13 10:32:00

    KBuMzRpsko

  • Tabel user_client_log menyimpan log klien pengguna. Setiap log berisi ID pengguna, waktu pencatatan, dan konten log. Tabel berikut menggambarkan data sampel.

    user_id

    time

    content

    8881237

    2018-02-13 00:30:00

    click MpkvilgWSmhUuPn

    8881237

    2018-02-13 06:14:00

    click OkTYNUHMqZzlDyL

    8881237

    2018-02-13 10:30:00

    click OkTYNUHMqZzlDyL

Persyaratan: Untuk setiap catatan dalam tabel user_client_log, temukan catatan pembayaran yang memiliki waktu paling dekat dengan catatan tersebut dalam tabel payment. Kemudian, gabungkan kedua catatan tersebut dan hasilkan output. Tabel berikut menggambarkan hasilnya.

user_id

time

content

8881237

2018-02-13 00:30:00

click MpkvilgWSmhUuPn, pay pYvotuLDIT

8881237

2018-02-13 06:14:00

click OkTYNUHMqZzlDyL, pay pYvotuLDIT

8881237

2018-02-13 10:30:00

click OkTYNUHMqZzlDyL, pay KBuMzRpsko

Untuk memenuhi persyaratan ini, gunakan salah satu metode berikut:

  • Gunakan metode JOIN bawaan. Contoh pseudokode SQL:

    SELECT
      p.user_id,
      p.time,
      MERGE(p.pay_info, u.content)
    FROM
      payment p RIGHT OUTER JOIN user_client_log u
    ON p.user_id = u.user_id AND ABS(p.time - u.time) = MIN(ABS(p.time - u.time))

    Saat menggabungkan dua catatan dalam tabel, Anda harus menghitung selisih minimum antara p.time dan u.time yang sesuai dengan user_id yang sama. Namun, Anda tidak dapat memanggil fungsi agregat dalam kondisi JOIN. Oleh karena itu, Anda tidak dapat menggunakan metode JOIN standar untuk menyelesaikan tugas ini.

  • Gunakan metode UDJ.

    1. Buat fungsi UDJ.

      1. Konfigurasikan SDK versi baru.

        <dependency>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>odps-sdk-udf</artifactId>
          <version>0.29.10-public</version>
          <scope>provided</scope>
        </dependency>
      2. Tulis kode UDJ dan paketkan kode tersebut sebagai odps-udj-example.jar.

        package com.aliyun.odps.udf.example.udj;
        import com.aliyun.odps.Column;
        import com.aliyun.odps.OdpsType;
        import com.aliyun.odps.Yieldable;
        import com.aliyun.odps.data.ArrayRecord;
        import com.aliyun.odps.data.Record;
        import com.aliyun.odps.udf.DataAttributes;
        import com.aliyun.odps.udf.ExecutionContext;
        import com.aliyun.odps.udf.UDJ;
        import com.aliyun.odps.udf.annotation.Resolve;
        import java.util.ArrayList;
        import java.util.Iterator;
        /** Untuk setiap catatan di tabel kanan, temukan catatan terdekat di tabel kiri
         * dan gabungkan kedua catatan tersebut.
         */
        @Resolve("->string,bigint,string")
        public class PayUserLogMergeJoin extends UDJ {
          private Record outputRecord;
          /** Metode ini dipanggil sebelum fase pemrosesan data. Anda dapat mengimplementasikan metode ini untuk melakukan inisialisasi.
           */
          @Override
          public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
            //
            outputRecord = new ArrayRecord(new Column[]{
              new Column("user_id", OdpsType.STRING),
              new Column("time", OdpsType.BIGINT),
              new Column("content", OdpsType.STRING)
            });
          }
         /** Timpa metode ini untuk mengimplementasikan logika join.
           * @param key Kunci gabungan saat ini
           * @param left Kelompok catatan dari tabel kiri yang sesuai dengan kunci saat ini
           * @param right Kelompok catatan dari tabel kanan yang sesuai dengan kunci saat ini
           * @param output Digunakan untuk mengeluarkan hasil UDJ
           */
          @Override
          public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
            outputRecord.setString(0, key.getString(0));
            if (!right.hasNext()) {
              // Kelompok kanan kosong. Tidak melakukan apa-apa.
              return;
            } else if (!left.hasNext()) {
              // Kelompok kiri kosong. Keluarkan semua catatan dari kelompok kanan tanpa penggabungan.
              while (right.hasNext()) {
                Record logRecord = right.next();
                outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
                outputRecord.setString(2, logRecord.getString(1));
                output.yield(outputRecord);
              }
              return;
            }
            ArrayList<Record> pays = new ArrayList<>();
            // Kelompok catatan di sisi kiri akan diiterasi dari awal hingga akhir.
            // Iterator tidak dapat diatur ulang untuk setiap catatan di kelompok kanan.
            // Oleh karena itu, kami menyimpan setiap catatan dari tabel kiri ke dalam ArrayList.
            left.forEachRemaining(pay -> pays.add(pay.clone()));
            while (right.hasNext()) {
              Record log = right.next();
              long logTime = log.getDatetime(0).getTime();
              long minDelta = Long.MAX_VALUE;
              Record nearestPay = null;
              // Iterasi melalui semua catatan di sisi kiri untuk menemukan catatan dengan selisih waktu terkecil.
              for (Record pay: pays) {
                long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
                if (delta < minDelta) {
                  minDelta = delta;
                  nearestPay = pay;
                }
              }
              // Gabungkan catatan log dengan catatan pembayaran terdekat dan keluarkan hasilnya.
              outputRecord.setBigint(1, log.getDatetime(0).getTime());
              outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
              output.yield(outputRecord);
            }
          }
          String mergeLog(String payInfo, String logContent) {
            return logContent + ", pay " + payInfo;
          }
          @Override
          public void close() {
          }
        }
      3. Tambahkan sumber daya paket JAR ke MaxCompute.

        ADD jar odps-udj-example.jar;
      4. Daftarkan fungsi UDJ pay_user_log_merge_join di MaxCompute.

        CREATE FUNCTION pay_user_log_merge_join
          AS 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
          USING 'odps-udj-example.jar';
    2. Persiapkan data sampel.

      1. Buat tabel payment dan tabel user_client_log.

        CREATE TABLE payment(user_id STRING,time DATETIME,pay_info STRING);
        CREATE TABLE user_client_log(user_id STRING,time DATETIME,content STRING);
      2. Masukkan data ke dalam tabel.

        --Masukkan data ke dalam tabel payment.
        INSERT OVERWRITE TABLE payment VALUES
        ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
        ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
        ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
        ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
        ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
        ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
        ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa')
        ;
        --Masukkan data ke dalam tabel user_client_log.
        INSERT OVERWRITE TABLE user_client_log VALUES
        ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
        ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
        ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
        ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
        ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
        ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
        ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
        ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
        ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
        ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
        ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
        ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
        ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
        ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
        ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
        ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
        ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ')
        ;
    3. Gunakan UDJ dalam SQL.

      SELECT r.user_id, FROM_UNIXTIME(time/1000) AS time, content FROM (
      SELECT user_id, time AS time, pay_info FROM payment
      ) p JOIN (
      SELECT user_id, time AS time, content FROM user_client_log
      ) u
      ON p.user_id = u.user_id
      USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
      r
      AS (user_id, time, content);

      Parameter dalam klausa USING:

      • pay_user_log_merge_join adalah nama fungsi UDJ yang telah didaftarkan.

      • (p.time, p.pay_info, u.time, u.content) menentukan kolom dari tabel kiri dan kanan yang digunakan dalam UDJ.

      • r adalah alias untuk hasil UDJ. Anda dapat mereferensikan alias ini di tempat lain.

      • (user_id, time, content) menentukan nama kolom untuk hasil yang dihasilkan oleh UDJ.

      Dalam contoh ini, hasil berikut dikembalikan:

      +---------+------------+---------+
      | user_id | time       | content |
      +---------+------------+---------+
      | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB |
      | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ |
      | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
      | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
      | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
      | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
      | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
      | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier |
      | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier |
      | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
      | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
      | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
      | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
      +---------+------------+---------+

Pra-pengurutan UDJ

Untuk menemukan catatan dengan selisih waktu terkecil, Anda harus berulang kali melintasi data dalam tabel `payment` menggunakan iterator. Oleh karena itu, semua catatan pembayaran yang memiliki `user_id` yang sama dimuat ke dalam `ArrayList` terlebih dahulu. Metode ini berlaku untuk skenario di mana pengguna memiliki jumlah perilaku pembayaran yang kecil dalam sehari. Dalam skenario lain, jumlah data dalam kelompok yang sama mungkin terlalu besar untuk disimpan dalam memori. Dalam kasus ini, Anda dapat menggunakan pra-pengurutan `SORT BY` untuk mengatasi masalah tersebut.

Jika jumlah catatan pembayaran untuk pengguna terlalu besar sehingga tidak dapat disimpan dalam memori dan semua data dalam tabel diurutkan berdasarkan waktu, Anda hanya perlu membandingkan elemen pertama dalam ArrayList.

Contoh berikut menunjukkan kode Java UDJ.

@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
  if (!right.hasNext()) {
    return;
  } else if (!left.hasNext()) {
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }
  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();
  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // Selisih waktu antara dua catatan semakin mengecil, jadi kita masih bisa melanjutkan.
      // Jelajahi kelompok kiri untuk mencoba mendapatkan delta yang lebih kecil.
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
     // Titik delta minimum tercapai. Periksa catatan terakhir,
     // keluarkan hasil gabungan, dan siapkan untuk memproses catatan berikutnya dari
     // kelompok kanan.
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
          logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
        );
      } else {
        break;
      }
    }
  }
}
Catatan

Setelah Anda memodifikasi kode Java UDJ, Anda harus memperbarui paket JAR yang sesuai agar logika baru berlaku.

Tambahkan klausa SORT BY di akhir pernyataan UDJ untuk mengurutkan tabel kiri dan kanan dalam grup UDJ berdasarkan bidang waktu masing-masing. Berikut adalah kode SQL-nya.

SELECT r.user_id, from_unixtime(time/1000) AS time, content FROM (
  SELECT user_id, time AS time, pay_info FROM payment
) p JOIN (
  SELECT user_id, time AS time, content FROM user_client_log
) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content)
SORT BY p.time, u.time;

Hasilnya konsisten dengan hasil contoh pada bagian Operasi cross join menggunakan UDJ. Metode ini menggunakan klausa `SORT BY` untuk melakukan pra-pengurutan data UDJ. Dalam proses ini, Anda hanya perlu menyimpan maksimal tiga catatan dalam cache sekaligus untuk mengimplementasikan fitur yang sama seperti algoritma sebelumnya.