全部产品
Search
文档中心

MaxCompute:Java UDAFs

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menulis fungsi agregat yang didefinisikan pengguna (UDAF) dalam Java.

Struktur kode UDAF

Anda dapat menggunakan Maven di IntelliJ IDEA atau MaxCompute Studio untuk menulis UDAF dalam Java. Kode UDAF dapat mencakup informasi berikut:

  • Paket Java: opsional.

    Anda dapat mengemas kelas Java yang telah didefinisikan ke dalam file JAR untuk digunakan di masa mendatang.

  • Kelas dasar UDAF: wajib.

    Kelas UDAF yang diperlukan adalah com.aliyun.odps.udf.Aggregator dan com.aliyun.odps.udf.annotation.Resolve. Kelas com.aliyun.odps.udf.annotation.Resolve sesuai dengan anotasi @Resolve. Kelas com.aliyun.odps.udf.UDFException bersifat opsional dan digunakan untuk metode inisialisasi serta menghentikan tugas pada kelas Java. Jika Anda ingin menggunakan kelas UDAF lainnya atau tipe data kompleks, tambahkan kelas yang diperlukan sesuai petunjuk dalam Ikhtisar.

  • Anotasi @Resolve: wajib.

    Anotasi ini memiliki format @Resolve(<signature>). signature adalah tanda tangan fungsi yang mendefinisikan tipe data parameter input dan nilai balikan dari UDAF. UDAF tidak dapat memperoleh tanda tangan fungsi melalui fitur refleksi. Anda hanya dapat memperoleh tanda tangan fungsi melalui anotasi @Resolve, seperti @Resolve("smallint->varchar(10)"). Untuk informasi lebih lanjut tentang anotasi @Resolve, lihat Anotasi @Resolve dalam topik ini.

  • Kelas Java kustom: wajib.

    Kelas Java kustom merupakan unit organisasi dari kode UDAF. Kelas ini mendefinisikan variabel dan metode yang digunakan untuk memenuhi kebutuhan bisnis Anda.

  • Metode untuk mengimplementasikan kelas Java kustom: wajib.

    Java UDAF harus mewarisi kelas com.aliyun.odps.udf.Aggregator dan mengimplementasikan metode berikut:

    import com.aliyun.odps.udf.ContextFunction;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.UDFException;
    public abstract class Aggregator implements ContextFunction {
        // Metode inisialisasi. 
        @Override
        public void setup(ExecutionContext ctx) throws UDFException {
        }
        // Metode terminate. 
        @Override
        public void close() throws UDFException {
        }
        // Buat buffer agregasi. 
        abstract public Writable newBuffer();
        // Metode iterate. 
        // Buffer adalah buffer agregasi, yang menyimpan data yang diagregasi pada fase tertentu. Data teragregasi merujuk pada dataset yang diperoleh setelah GROUP BY dilakukan untuk tugas Map yang berbeda. Satu buffer dibuat untuk setiap baris data yang diagregasi. 
        // Writable[] menunjukkan satu baris data, yang menentukan kolom yang dilewati dalam kode. Misalnya, writable[0] menunjukkan kolom pertama, dan writable[1] menunjukkan kolom kedua. 
        // args menentukan parameter yang digunakan untuk memanggil UDAF dalam SQL. Ini tidak boleh NULL, tetapi nilai dalam args bisa NULL, yang menunjukkan bahwa data input adalah NULL. 
        abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
        // Metode terminate. 
        abstract public Writable terminate(Writable buffer) throws UDFException;
        // Metode merge. 
        abstract public void merge(Writable buffer, Writable partial) throws UDFException;
    }

    Metode utama yang harus diimplementasikan adalah iterate, merge, dan terminate. Metode-metode ini digunakan untuk mengimplementasikan logika inti UDAF. Selain itu, Anda harus mengimplementasikan buffer writable yang ditentukan pengguna.

    Buffer writable yang ditentukan pengguna mengonversi objek dalam memori menjadi urutan byte (atau protokol transmisi data lainnya) untuk penyimpanan persisten di disk dan transmisi jaringan. MaxCompute menggunakan komputasi terdistribusi untuk memproses UDAF. Oleh karena itu, MaxCompute harus melakukan serialisasi atau deserialisasi data sebelum data dapat ditransmisikan antar perangkat yang berbeda.

    Saat menulis Java UDAF, Anda dapat menggunakan tipe data Java atau tipe data writable Java. Untuk informasi lebih lanjut tentang pemetaan antara tipe data yang didukung oleh proyek MaxCompute, tipe data Java, dan tipe data writable Java, lihat Tipe Data dalam topik ini.

Contoh kode:

// Kemas kelas Java yang didefinisikan ke dalam file bernama org.alidata.odps.udaf.examples. 
package org.alidata.odps.udaf.examples;
// Kelas dasar UDAF. 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
// Kelas Java kustom. 
// Anotasi @Resolve. 
@Resolve("double->double")
public class AggrAvg extends Aggregator {
// Metode yang digunakan untuk mengimplementasikan kelas Java kustom. 
  private static class AvgBuffer implements Writable {
    private double sum = 0;
    private long count = 0;
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeDouble(sum);
      out.writeLong(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
      sum = in.readDouble();
      count = in.readLong();
    }
  }
  private DoubleWritable ret = new DoubleWritable();
  @Override
  public Writable newBuffer() {
    return new AvgBuffer();
  }
  @Override
  public void iterate(Writable buffer, Writable[] args) throws UDFException {
    DoubleWritable arg = (DoubleWritable) args[0];
    AvgBuffer buf = (AvgBuffer) buffer;
    if (arg != null) {
      buf.count += 1;
      buf.sum += arg.get();
    }
  }
  @Override
  public Writable terminate(Writable buffer) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    if (buf.count == 0) {
      ret.set(0);
    } else {
      ret.set(buf.sum / buf.count);
    }
    return ret;
  }
  @Override
  public void merge(Writable buffer, Writable partial) throws UDFException {
    AvgBuffer buf = (AvgBuffer) buffer;
    AvgBuffer p = (AvgBuffer) partial;
    buf.sum += p.sum;
    buf.count += p.count;
  }
}
Catatan

Dalam kode UDAF di atas, buffer yang sama digunakan untuk metode iterate dan merge. Anda dapat mengagregasi baris data input ke dalam buffer ini berdasarkan kode UDAF.

Batasan

  • Mengakses Internet menggunakan UDF

    Secara default, MaxCompute tidak mengizinkan akses Internet menggunakan UDF. Jika Anda ingin mengakses Internet menggunakan UDF, isi formulir aplikasi koneksi jaringan formulir aplikasi koneksi jaringan sesuai kebutuhan bisnis Anda dan kirimkan aplikasi tersebut. Setelah aplikasi disetujui, tim dukungan teknis MaxCompute akan menghubungi Anda dan membantu membangun koneksi jaringan. Untuk informasi lebih lanjut tentang cara mengisi formulir aplikasi koneksi jaringan, lihat Proses Koneksi Jaringan.

  • Mengakses VPC menggunakan UDF

    Secara default, MaxCompute tidak mengizinkan akses ke sumber daya dalam VPC menggunakan UDF. Untuk menggunakan UDF untuk mengakses sumber daya dalam VPC, Anda harus membangun koneksi jaringan antara MaxCompute dan VPC. Untuk informasi lebih lanjut tentang operasi terkait, lihat Gunakan UDF untuk Mengakses Sumber Daya dalam VPC.

  • Membaca data tabel menggunakan UDF, UDAF, atau UDTF

    Anda tidak dapat menggunakan UDF, UDAF, atau UDTF untuk membaca data dari jenis tabel berikut:

    • Tabel tempat evolusi skema dilakukan.

    • Tabel yang berisi tipe data kompleks.

    • Tabel yang berisi tipe data JSON.

    • Tabel transaksional.

Catatan Penggunaan

Sebelum menulis Java UDAF, perhatikan hal-hal berikut:

  • Kami sarankan Anda tidak mengemas kelas dengan nama yang sama tetapi logika berbeda ke dalam file JAR UDAF yang berbeda. Sebagai contoh, file JAR UDAF 1 bernama udaf1.jar dan file JAR UDAF 2 bernama udaf2.jar. Kedua file JAR tersebut berisi kelas bernama com.aliyun.UserFunction.class, tetapi kelas tersebut memiliki logika yang berbeda dalam file-file tersebut. Jika UDAF 1 dan UDAF 2 dipanggil dalam pernyataan SQL yang sama, MaxCompute memuat com.aliyun.UserFunction.class dari salah satu dari dua file tersebut. Akibatnya, UDAF tidak dapat berjalan sesuai harapan dan kompilasi mungkin gagal.

  • Tipe data parameter input atau nilai balikan dalam Java UDAF adalah objek. Huruf pertama dari tipe data yang Anda tentukan dalam kode Java UDAF harus huruf besar, seperti String.

  • Nilai NULL dalam MaxCompute SQL direpresentasikan oleh NULL dalam Java. Tipe data primitif dalam Java tidak dapat merepresentasikan nilai NULL dalam MaxCompute SQL. Oleh karena itu, tipe data ini tidak dapat digunakan.

Anotasi @Resolve

Format anotasi @Resolve:

@Resolve(<signature>)

Parameter signature adalah string yang menentukan tipe data parameter input dan nilai balikan. Saat menjalankan UDAF, tipe data parameter input dan nilai balikan UDAF harus konsisten dengan tipe data yang ditentukan dalam tanda tangan fungsi. Konsistensi tipe data diperiksa selama penguraian semantik. Jika tipe data tidak konsisten, kesalahan akan dikembalikan. Format tanda tangan:

'arg_type_list -> type'

  • arg_type_list: menentukan tipe data parameter input. Jika beberapa parameter input digunakan, tentukan beberapa tipe data dan pisahkan dengan koma (,). Tipe data berikut didukung: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(presisi,skala), CHAR, VARCHAR, tipe data kompleks (ARRAY, MAP, dan STRUCT), dan tipe data kompleks bersarang.

    arg_type_list dapat direpresentasikan oleh asterisk (*) atau dibiarkan kosong ('').

    • Jika arg_type_list direpresentasikan oleh asterisk (*), jumlah parameter input acak diperbolehkan.

    • Jika arg_type_list dibiarkan kosong (''), tidak ada parameter input yang digunakan.

    Untuk informasi lebih lanjut tentang ekstensi sintaksis anotasi @Resolve, lihat Parameter Dinamis UDAF dan UDTF.

  • type menentukan tipe data nilai balikan. Untuk UDAF, hanya satu kolom nilai yang dikembalikan. Tipe data berikut didukung: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, dan DECIMAL(presisi, skala). Tipe data kompleks seperti ARRAY, MAP, dan STRUCT, serta tipe data kompleks bersarang juga didukung.

Catatan

Saat menulis kode UDAF, Anda dapat memilih tipe data berdasarkan edisi tipe data yang digunakan oleh proyek MaxCompute Anda. Untuk informasi lebih lanjut tentang edisi tipe data dan tipe data yang didukung oleh setiap edisi, lihat Edisi Tipe Data.

Tabel berikut memberikan contoh anotasi @Resolve.

Anotasi @Resolve

Deskripsi

@Resolve('bigint,double->string')

Tipe data parameter input adalah BIGINT atau DOUBLE, dan tipe data nilai balikan adalah STRING.

@Resolve('*->string')

Jumlah parameter input acak digunakan dan tipe data nilai balikan adalah STRING.

@Resolve('->double')

Tidak ada parameter input yang digunakan dan tipe data nilai balikan adalah DOUBLE.

@Resolve('array<bigint>->struct<x:string, y:int>')

Tipe data parameter input adalah ARRAY<BIGINT> dan tipe data nilai balikan adalah STRUCT<x:STRING, y:INT>.

Tipe data

Dalam MaxCompute, edisi tipe data yang berbeda mendukung tipe data yang berbeda. Dalam MaxCompute V2.0 dan versi berikutnya, lebih banyak tipe data dan tipe data kompleks seperti ARRAY, MAP, dan STRUCT didukung. Untuk informasi lebih lanjut tentang edisi tipe data MaxCompute, lihat Edisi Tipe Data.

Tabel berikut menjelaskan pemetaan antara tipe data yang didukung oleh proyek MaxCompute, tipe data Java, dan tipe data writable Java. Anda harus menulis Java UDAF berdasarkan pemetaan ini untuk memastikan konsistensi tipe data.

Tipe MaxCompute

Tipe Java

Tipe Java Writable

TINYINT

java.lang.Byte

ByteWritable

SMALLINT

java.lang.Short

ShortWritable

INT

java.lang.Integer

IntWritable

BIGINT

java.lang.Long

LongWritable

FLOAT

java.lang.Float

FloatWritable

DOUBLE

java.lang.Double

DoubleWritable

DECIMAL

java.math.BigDecimal

BigDecimalWritable

BOOLEAN

java.lang.Boolean

BooleanWritable

STRING

java.lang.String

Text

VARCHAR

com.aliyun.odps.data.Varchar

VarcharWritable

BINARY

com.aliyun.odps.data.Binary

BytesWritable

DATE

java.sql.Date

DateWritable

DATETIME

java.util.Date

DatetimeWritable

TIMESTAMP

java.sql.Timestamp

TimestampWritable

INTERVAL_YEAR_MONTH

N/A

IntervalYearMonthWritable

INTERVAL_DAY_TIME

N/A

IntervalDayTimeWritable

ARRAY

java.util.List

N/A

MAP

java.util.Map

N/A

STRUCT

com.aliyun.odps.data.Struct

N/A

Catatan

Parameter input atau nilai balikan UDAF dapat berupa tipe data writable Java hanya jika proyek MaxCompute Anda menggunakan edisi tipe data MaxCompute V2.0.

Instruksi

Setelah mengembangkan Java UDAF, Anda dapat menggunakan MaxCompute SQL untuk memanggil UDAF ini. Untuk informasi lebih lanjut tentang cara mengembangkan Java UDAF, lihat bagian "Proses Pengembangan" dalam Ikhtisar. Anda dapat menggunakan salah satu metode berikut untuk memanggil Java UDAF:

  • Gunakan UDF dalam proyek MaxCompute: Metodenya mirip dengan penggunaan fungsi bawaan.

  • Gunakan UDF lintas proyek: Gunakan UDF Proyek B dalam Proyek A. Contoh pernyataan berikut menunjukkan contohnya: select B:udf_in_other_project(arg0, arg1) as res from table_t;. Untuk informasi lebih lanjut tentang berbagi lintas proyek, lihat Akses Sumber Daya Lintas Proyek Berdasarkan Paket.

Untuk informasi lebih lanjut tentang cara menggunakan MaxCompute Studio untuk mengembangkan dan memanggil Java UDAF, lihat Contoh dalam topik ini.

Contoh

Contoh ini menjelaskan cara mengembangkan UDAF bernama AggrAvg menggunakan MaxCompute Studio. UDAF AggrAvg digunakan untuk menghitung nilai rata-rata. Gambar berikut menunjukkan logika dari UDAF AggrAvg.

求平均值逻辑

  1. Potong data input. MaxCompute memotong data input menjadi ukuran tertentu berdasarkan alur kerja pemrosesan MapReduce. Ukuran setiap potongan sesuai untuk pekerja agar dapat menyelesaikan perhitungan dalam periode waktu tertentu.

    Anda dapat mengonfigurasi parameter odps.stage.mapper.split.size untuk menyesuaikan ukuran potongan. Untuk informasi lebih lanjut tentang logika pemotongan data, lihat bagian "Proses" dalam Ikhtisar.

  2. Setiap pekerja menghitung jumlah catatan data dan total volume data dalam satu potongan. Anda dapat menggunakan jumlah catatan data dan total volume data di setiap potongan sebagai hasil antara.

  3. Setiap pekerja mengumpulkan informasi dari setiap potongan yang dihasilkan pada Langkah 2.

  4. Dalam output akhir, r.sum/r.count adalah nilai rata-rata dari semua data input.

Untuk menggunakan MaxCompute Studio untuk mengembangkan dan memanggil Java UDAF, lakukan langkah-langkah berikut:

  1. Buat persiapan.

    Sebelum menggunakan MaxCompute Studio untuk mengembangkan dan men-debug UDF, Anda harus menginstal MaxCompute Studio dan menghubungkan MaxCompute Studio ke proyek MaxCompute. Untuk informasi lebih lanjut tentang cara menginstal MaxCompute Studio dan menghubungkan MaxCompute Studio ke proyek MaxCompute, lihat topik-topik berikut:

    1. Instal MaxCompute Studio

    2. Hubungkan ke Proyek MaxCompute

    3. Buat Modul Java MaxCompute

  2. Tulis kode UDAF.

    1. Di panel navigasi sisi kiri tab Project, pilih src > main > java, klik kanan java, lalu pilih New > MaxCompute Java.新建Java Class

    2. Di kotak dialog Create new MaxCompute java class, klik UDAF, masukkan nama kelas di bidang Name, dan tekan Enter. Dalam contoh ini, kelas Java diberi nama AggrAvg.创建Java Class

      Name: nama kelas Java MaxCompute. Jika Anda belum membuat paket, tentukan parameter ini dalam format packagename.classname. Sistem akan secara otomatis menghasilkan paket.

    3. Tulis kode di editor kode. 编写代码Contoh kode UDAF:

      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      import com.aliyun.odps.io.DoubleWritable;
      import com.aliyun.odps.io.Writable;
      import com.aliyun.odps.udf.Aggregator;
      import com.aliyun.odps.udf.UDFException;
      import com.aliyun.odps.udf.annotation.Resolve;
      @Resolve("double->double")
      public class AggrAvg extends Aggregator {
        private static class AvgBuffer implements Writable {
          private double sum = 0;
          private long count = 0;
          @Override
          public void write(DataOutput out) throws IOException {
            out.writeDouble(sum);
            out.writeLong(count);
          }
          @Override
          public void readFields(DataInput in) throws IOException {
            sum = in.readDouble();
            count = in.readLong();
          }
        }
        private DoubleWritable ret = new DoubleWritable();
        @Override
        public Writable newBuffer() {
          return new AvgBuffer();
        }
        @Override
        public void iterate(Writable buffer, Writable[] args) throws UDFException {
          DoubleWritable arg = (DoubleWritable) args[0];
          AvgBuffer buf = (AvgBuffer) buffer;
          if (arg != null) {
            buf.count += 1;
            buf.sum += arg.get();
          }
        }
        @Override
        public Writable terminate(Writable buffer) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          if (buf.count == 0) {
            ret.set(0);
          } else {
            ret.set(buf.sum / buf.count);
          }
          return ret;
        }
        @Override
        public void merge(Writable buffer, Writable partial) throws UDFException {
          AvgBuffer buf = (AvgBuffer) buffer;
          AvgBuffer p = (AvgBuffer) partial;
          buf.sum += p.sum;
          buf.count += p.count;
        }
      }
  3. Debug UDAF di mesin lokal Anda dan verifikasi bahwa kode berjalan sesuai harapan.

    Untuk informasi lebih lanjut tentang operasi debugging, lihat bagian "Lakukan run lokal untuk men-debug UDF" dalam Kembangkan UDF.

    调试UDAF

    Catatan

    Pengaturan parameter pada gambar di atas hanya untuk referensi.

  4. Kemas kode UDAF ke dalam file JAR, unggah file tersebut ke proyek MaxCompute Anda, dan buat UDAF. Dalam contoh ini, UDAF bernama user_udaf dibuat.

    Untuk informasi lebih lanjut tentang cara mengemas UDAF, lihat bagian "Prosedur" dalam Kemas Program Java, Unggah Paket, dan Buat MaxCompute UDF.

    打包

  5. Di panel navigasi kiri MaxCompute Studio, klik Project Explorer. Klik kanan proyek MaxCompute Anda untuk memulai klien MaxCompute, dan jalankan pernyataan SQL untuk memanggil UDAF yang telah dibuat.

    Contoh berikut menunjukkan struktur data tabel my_table yang ingin Anda kueri.

    +------------+------------+
    | col0       | col1       |
    +------------+------------+
    | 1.2        | 2.0        |
    | 1.6        | 2.1        |
    +------------+------------+

    Jalankan pernyataan SQL berikut untuk memanggil UDAF:

    select user_udaf(col0) as c0 from my_table;

    Hasil berikut dikembalikan:

    +----+
    | c0 |
    +----+
    | 1.4|
    +----+