All Products
Search
Document Center

MaxCompute:Java UDAFs

Last Updated:Mar 26, 2026

Fungsi agregat yang didefinisikan pengguna (UDAF) mengurangi beberapa baris input menjadi satu nilai output—mirip dengan fungsi bawaan seperti SUM atau AVG, tetapi menggunakan logika kustom yang Anda definisikan dalam Java.

Cara kerja

MaxCompute memproses UDAF dalam pipeline terdistribusi melalui tiga fase:

FaseMetode yang dipanggilApa yang terjadi
Map (iterate)iterate()Setiap worker membaca partisi data masukannya dan memanggil iterate() sekali per baris, mengakumulasi hasil ke dalam buffer lokal.
Combine/shuffle (merge)merge()Worker saling bertukar buffer parsial. merge() menggabungkan buffer parsial dari satu worker ke buffer utama, mengkonsolidasikan hasil dari berbagai tugas map.
Reduce (terminate)terminate()Setelah semua buffer digabung, terminate() mengekstraksi nilai output akhir dari buffer.

Karena data berpindah antar worker terdistribusi, buffer agregasi harus dapat diserialisasi. Implementasikan antarmuka Writable untuk mengonversi objek dalam memori menjadi urutan byte guna transmisi jaringan dan penyimpanan disk.

Struktur kode UDAF

UDAF Java terdiri dari komponen-komponen berikut:

KomponenWajibDeskripsi
Paket JavaTidakMengemas kelas Anda ke dalam file JAR
Kelas dasarYacom.aliyun.odps.udf.Aggregator dan com.aliyun.odps.udf.annotation.Resolve
com.aliyun.odps.udf.UDFExceptionTidakDigunakan dalam metode inisialisasi dan terminasi
@Resolve annotationYaMendeklarasikan tipe data input dan output
Kelas Java kustomYaMemperluas Aggregator; mendefinisikan buffer dan logika Anda

Metode wajib

Perluas com.aliyun.odps.udf.Aggregator dan implementasikan metode-metode berikut:

import com.aliyun.odps.udf.ContextFunction;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;

public abstract class Aggregator implements ContextFunction {
    // Dipanggil sekali sebelum pemrosesan dimulai. Gunakan untuk inisialisasi.
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {}

    // Dipanggil sekali setelah pemrosesan selesai. Gunakan untuk pembersihan.
    @Override
    public void close() throws UDFException {}

    // Membuat buffer agregasi baru yang kosong untuk setiap kelompok.
    abstract public Writable newBuffer();

    // Fase Map: dipanggil sekali per baris input. Akumulasikan data ke dalam buffer.
    // args tidak boleh null, tetapi nilai individual dalam args bisa null (menunjukkan input SQL NULL).
    abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;

    // Fase Combine/reduce: menggabungkan buffer parsial (partial) ke dalam buffer utama.
    abstract public void merge(Writable buffer, Writable partial) throws UDFException;

    // Fase Reduce: mengekstraksi nilai output akhir dari buffer yang telah digabung sepenuhnya.
    abstract public Writable terminate(Writable buffer) throws UDFException;
}

Metode wajib vs opsional:

MetodeWajibKapan dijalankan
newBuffer()SelaluSebelum memproses setiap kelompok
iterate()SelaluFase Map — sekali per baris input
merge()SelaluFase Combine/shuffle — menggabungkan buffer parsial dari worker berbeda
terminate()SelaluFase Reduce — menghasilkan output akhir
setup()Hanya jika Anda memerlukan inisialisasiSebelum tugas dimulai
close()Hanya jika Anda memerlukan pembersihanSetelah tugas selesai
iterate() dan merge() berbagi instance buffer yang sama dalam satu kelompok. Rancang buffer Anda agar dapat mengakumulasi data dari keduanya tanpa konflik.

Buffer agregasi

Buffer agregasi menyimpan hasil antara saat data mengalir melalui pipeline terdistribusi. Implementasikan antarmuka Writable agar dapat diserialisasi:

import com.aliyun.odps.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

private static class MyBuffer implements Writable {
    // Menyerialisasi field buffer ke aliran byte untuk transmisi jaringan.
    @Override
    public void write(DataOutput out) throws IOException {
        // Tulis setiap field dalam urutan tetap.
    }

    // Mendeserialisasi field kembali dari aliran byte.
    @Override
    public void readFields(DataInput in) throws IOException {
        // Baca setiap field dalam urutan yang sama seperti write().
    }
}

Baca dan tulis field dalam urutan yang sama. Ketidaksesuaian antara write() dan readFields() menyebabkan korupsi data diam-diam yang sulit didebug.

@Resolve annotation

Deklarasikan tipe input dan tipe kembalian menggunakan anotasi @Resolve. MaxCompute memeriksa konsistensi tipe selama penguraian semantik—ketidaksesuaian akan mengembalikan error sebelum pekerjaan dijalankan.

Format:

@Resolve('<arg_type_list> -> <return_type>')
BagianDeskripsi
arg_type_listDaftar tipe input yang dipisahkan koma. Atur ke * untuk menerima jumlah argumen apa pun, atau biarkan kosong untuk tidak menerima argumen.
return_typeSatu tipe kembalian. UDAF selalu mengembalikan satu kolom.

Tipe yang didukung meliputi: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(presisi,skala), CHAR, VARCHAR, ARRAY, MAP, STRUCT, dan tipe kompleks bersarang.

Pilih tipe data berdasarkan edisi tipe data proyek MaxCompute Anda. Untuk detailnya, lihat Edisi tipe data.

Contoh:

AnotasiTipe inputTipe kembalian
@Resolve('bigint,double->string')BIGINT atau DOUBLESTRING
@Resolve('*->string')Apa sajaSTRING
@Resolve('->double')Tidak adaDOUBLE
@Resolve('array<bigint>->struct<x:string, y:int>')ARRAY\<BIGINT\>STRUCT\<x:STRING, y:INT\>

Tipe data

Tulis UDAF menggunakan tipe objek Java atau tipe Writable Java. Huruf pertama nama tipe Java harus kapital (misalnya, String, bukan string). Jangan gunakan tipe primitif—tipe tersebut tidak dapat merepresentasikan nilai SQL NULL, yang dipetakan MaxCompute ke null Java.

Tipe Writable Java sebagai tipe input atau kembalian memerlukan edisi tipe data MaxCompute V2.0 atau lebih baru.
Tipe MaxComputeTipe JavaTipe Writable Java
TINYINTjava.lang.ByteByteWritable
SMALLINTjava.lang.ShortShortWritable
INTjava.lang.IntegerIntWritable
BIGINTjava.lang.LongLongWritable
FLOATjava.lang.FloatFloatWritable
DOUBLEjava.lang.DoubleDoubleWritable
DECIMALjava.math.BigDecimalBigDecimalWritable
BOOLEANjava.lang.BooleanBooleanWritable
STRINGjava.lang.StringText
VARCHARcom.aliyun.odps.data.VarcharVarcharWritable
BINARYcom.aliyun.odps.data.BinaryBytesWritable
DATEjava.sql.DateDateWritable
DATETIMEjava.util.DateDatetimeWritable
TIMESTAMPjava.sql.TimestampTimestampWritable
INTERVAL_YEAR_MONTHN/AIntervalYearMonthWritable
INTERVAL_DAY_TIMEN/AIntervalDayTimeWritable
ARRAYjava.util.ListN/A
MAPjava.util.MapN/A
STRUCTcom.aliyun.odps.data.StructN/A

Menulis UDAF

Contoh 1: rata-rata (AggrAvg)

Contoh ini mengembangkan AggrAvg, sebuah UDAF yang menghitung rata-rata kolom DOUBLE, menggunakan MaxCompute Studio.

Buffer menyimpan dua field: jumlah berjalan dan jumlah baris. iterate() mengakumulasi setiap baris input selama fase map, merge() menggabungkan buffer parsial dari worker berbeda selama fase combine, dan terminate() membagi jumlah total dengan jumlah baris selama fase reduce.

求平均值逻辑

Data input diiris dan didistribusikan ke worker. Anda dapat mengonfigurasi parameter odps.stage.mapper.split.size untuk menyesuaikan ukuran setiap irisan. Setiap worker menghitung jumlah catatan dan total dalam irisannya (hasil antara), worker mengumpulkan informasi irisan, dan output akhir menghitung r.sum / r.count sebagai rata-rata.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Buat kelas UDAF

  1. Pada tab Project, navigasi ke src > main > java, klik kanan java, lalu pilih New > MaxCompute Java.

    新建Java Class

  2. Pada kotak dialog Create new MaxCompute java class, klik UDAF, masukkan nama kelas pada field Name, lalu tekan Enter. Contoh ini menggunakan AggrAvg. Jika Anda belum membuat paket, tentukan field Name dalam format packagename.classname. Sistem akan menghasilkan paket secara otomatis.

    创建Java Class

  3. Tulis kode UDAF di editor.

    编写代码

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import com.aliyun.odps.io.Text;
    import com.aliyun.odps.io.Writable;
    import com.aliyun.odps.udf.Aggregator;
    import com.aliyun.odps.udf.UDFException;
    import com.aliyun.odps.udf.annotation.Resolve;
    
    // Masukan: kolom STRING, pemisah STRING. Keluaran: STRING yang digabungkan.
    @Resolve("string,string->string")
    public class AggrConcat extends Aggregator {
    
      // Penyangga: teks yang terakumulasi dan pemisah yang digunakan di antara nilai-nilai.
      private static class ConcatBuffer implements Writable {
        private StringBuilder sb = new StringBuilder();
        private String separator = ",";
    
        @Override
        public void write(DataOutput out) throws IOException {
          // Serialisasi StringBuilder sebagai String agar tetap utuh selama transfer jaringan.
          out.writeUTF(sb.toString());
          out.writeUTF(separator);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
          sb = new StringBuilder(in.readUTF());
          separator = in.readUTF();
        }
      }
    
      private Text ret = new Text();
    
      @Override
      public Writable newBuffer() {
        return new ConcatBuffer();
      }
    
      // Fase Map: tambahkan setiap nilai yang tidak bernilai null ke penyangga.
      @Override
      public void iterate(Writable buffer, Writable[] args) throws UDFException {
        Text value = (Text) args[0];
        Text sep   = (Text) args[1];
        ConcatBuffer buf = (ConcatBuffer) buffer;
        if (value != null) {
          if (sep != null) {
            buf.separator = sep.toString();
          }
          if (buf.sb.length() > 0) {
            buf.sb.append(buf.separator);
          }
          buf.sb.append(value.toString());
        }
      }
    
      // Fase Combine: gabungkan penyangga parsial dari pekerja lain.
      @Override
      public void merge(Writable buffer, Writable partial) throws UDFException {
        ConcatBuffer buf = (ConcatBuffer) buffer;
        ConcatBuffer p   = (ConcatBuffer) partial;
        if (p.sb.length() > 0) {
          if (buf.sb.length() > 0) {
            buf.sb.append(buf.separator);
          }
          buf.sb.append(p.sb);
        }
      }
    
      // Fase Reduce: kembalikan hasil penggabungan.
      @Override
      public Writable terminate(Writable buffer) throws UDFException {
        ConcatBuffer buf = (ConcatBuffer) buffer;
        ret.set(buf.sb.toString());
        return ret;
      }
    }

Debug secara lokal

Jalankan UDAF di mesin lokal Anda untuk memverifikasi logika sebelum deployment. Untuk langkah debugging, lihat bagian "Perform a local run to debug the UDF" dalam Develop a UDF.

调试UDAF
Pengaturan parameter pada gambar di atas hanya untuk referensi.

Paket dan daftarkan UDAF

Paket kode UDAF ke dalam file JAR, unggah ke proyek MaxCompute Anda, dan buat UDAF tersebut. Untuk langkah pengemasan, lihat bagian "Procedure" dalam Package a Java program, upload the package, and create a MaxCompute UDF.

Contoh ini mendaftarkan UDAF sebagai user_udaf.

打包

Panggil UDAF

Pada Project Explorer, klik kanan proyek MaxCompute Anda untuk membuka klien MaxCompute, lalu jalankan pernyataan SQL untuk memanggil UDAF.

Diberikan tabel input berikut (my_table):

+------------+------------+
| col0       | col1       |
+------------+------------+
| 1.2        | 2.0        |
| 1.6        | 2.1        |
+------------+------------+

Jalankan pernyataan berikut:

SELECT user_udaf(col0) AS c0 FROM my_table;

Hasil:

+----+
| c0 |
+----+
| 1.4|
+----+

Contoh 2: penggabungan string (AggrConcat)

AggrAvg menggunakan buffer numerik sederhana. Contoh ini menunjukkan pola yang lebih realistis: mengumpulkan nilai string dari beberapa baris dan menggabungkannya dengan pemisah.

Buffer menyimpan StringBuilder dan string pemisah. Karena StringBuilder tidak dapat diserialisasi secara langsung, buffer menyerialisasikannya sebagai String biasa. Ini adalah pola yang harus diikuti ketika buffer Anda menyimpan tipe non-primitif—serialisasi ke bentuk yang aman untuk transmisi dalam write() dan rekonstruksi bentuk dalam memori dalam readFields().

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;

// Input: kolom STRING, pemisah STRING. Output: STRING yang digabungkan.
@Resolve("string,string->string")
public class AggrConcat extends Aggregator {

  // Buffer: teks yang terakumulasi dan pemisah yang digunakan antar nilai.
  private static class ConcatBuffer implements Writable {
    private StringBuilder sb = new StringBuilder();
    private String separator = ",";

    @Override
    public void write(DataOutput out) throws IOException {
      // Serialisasi StringBuilder sebagai String agar bertahan dalam transfer jaringan.
      out.writeUTF(sb.toString());
      out.writeUTF(separator);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      sb = new StringBuilder(in.readUTF());
      separator = in.readUTF();
    }
  }

  private Text ret = new Text();

  @Override
  public Writable newBuffer() {
    return new ConcatBuffer();
  }

  // Fase Map: tambahkan setiap nilai non-null ke dalam buffer.
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    Text value = (Text) args[0];
    Text sep   = (Text) args[1];
    ConcatBuffer buf = (ConcatBuffer) buffer;
    if (value != null) {
      if (sep != null) {
        buf.separator = sep.toString();
      }
      if (buf.sb.length() > 0) {
        buf.sb.append(buf.separator);
      }
      buf.sb.append(value.toString());
    }
  }

  // Fase Combine: gabungkan buffer parsial dari worker lain.
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    ConcatBuffer buf = (ConcatBuffer) buffer;
    ConcatBuffer p   = (ConcatBuffer) partial;
    if (p.sb.length() > 0) {
      if (buf.sb.length() > 0) {
        buf.sb.append(buf.separator);
      }
      buf.sb.append(p.sb);
    }
  }

  // Fase Reduce: kembalikan hasil penggabungan.
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    ConcatBuffer buf = (ConcatBuffer) buffer;
    ret.set(buf.sb.toString());
    return ret;
  }
}

Keputusan desain utama:

KeputusanPenjelasan
Serialisasi StringBuilder sebagai StringStringBuilder tidak dapat diserialisasi. Konversi ke String dalam write() dan bangun ulang dalam readFields().
Simpan separator dalam bufferPemisah diberikan sebagai argumen ke iterate() tetapi harus bertahan dalam serialisasi hingga merge(). Simpan dalam buffer.
Lindungi dari buffer parsial kosong dalam merge()Jika partisi worker tidak memiliki baris yang cocok, buffernya kosong. Periksa sebelum menambahkan untuk menghindari pemisah di awal.
Urutan penggabungan string tidak dijamin antar worker karena penugasan partisi fase map ditentukan oleh mesin eksekusi, bukan oleh urutan baris. Jika urutan penting, gunakan klausa ORDER BY dalam pernyataan SQL Anda dan strategi single-reducer, atau urutkan output penggabungan setelahnya.

Panggil UDAF dalam SQL

Setelah mengembangkan dan mendaftarkan UDAF Java, panggil dalam SQL MaxCompute. Untuk proses pengembangan lengkap, lihat bagian "Development process" dalam Ikhtisar.

Dua metode pemanggilan tersedia:

  • Dalam proyek: Panggil UDAF seperti fungsi bawaan.

  • Antar proyek: Panggil UDAF dari proyek B di dalam proyek A menggunakan sintaks di bawah. Untuk detail berbagi antar proyek, lihat Akses resource antar proyek berbasis paket.

    SELECT B:udf_in_other_project(arg0, arg1) AS res FROM table_t;

Batasan

Akses Internet

Secara default, MaxCompute tidak mengizinkan UDAF mengakses Internet. Untuk mengaktifkan akses Internet, ajukan formulir permohonan koneksi jaringan sesuai kebutuhan bisnis Anda. Setelah disetujui, tim dukungan teknis MaxCompute akan membantu Anda membuat koneksi tersebut. Untuk petunjuk pengisian formulir, lihat Proses koneksi jaringan.

Akses VPC

Secara default, MaxCompute tidak mengizinkan UDAF mengakses resource di virtual private cloud (VPC). Untuk mengaktifkan akses VPC, buat koneksi jaringan antara MaxCompute dan VPC. Lihat Gunakan UDF untuk mengakses resource di VPC.

Batasan pembacaan tabel

UDF, UDAF, dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF) tidak dapat membaca data dari jenis tabel berikut:

  • Tabel yang telah menjalani evolusi skema

  • Tabel yang berisi tipe data kompleks

  • Tabel yang berisi tipe data JSON

  • Tabel transaksional

Catatan penggunaan

  • Jangan memaketkan kelas dengan nama yang sama tetapi logika berbeda ke dalam file JAR untuk UDAF berbeda. Jika dua UDAF berbagi nama kelas (misalnya, com.aliyun.UserFunction.class) di file JAR terpisah dan dipanggil dalam pernyataan SQL yang sama, MaxCompute akan memuat kelas dari salah satu file secara tidak terduga—menyebabkan hasil salah atau kegagalan kompilasi.

  • Tipe data Java dalam UDAF adalah tipe objek. Huruf pertama harus kapital (misalnya, String, bukan string).

  • Nilai NULL dalam SQL MaxCompute dipetakan ke null dalam Java. Jangan gunakan tipe primitif Java—tipe tersebut tidak dapat merepresentasikan null.

Lanjutan

  • Untuk menambahkan UDF yang lebih kompleks seperti fungsi bernilai tabel, lihat Ikhtisar.

  • Untuk mempelajari semua tipe data dan edisi yang didukung, lihat Edisi tipe data.