Topik ini menjelaskan cara menulis fungsi agregat yang didefinisikan pengguna (UDAF) dalam Java.
Struktur kode UDAF
Anda dapat menggunakan Maven di IntelliJ IDEA atau MaxCompute Studio untuk menulis UDAF dalam Java. Kode UDAF dapat mencakup informasi berikut:
Paket Java: opsional.
Anda dapat mengemas kelas Java yang telah didefinisikan ke dalam file JAR untuk digunakan di masa mendatang.
Kelas dasar UDAF: wajib.
Kelas UDAF yang diperlukan adalah com.aliyun.odps.udf.Aggregator dan
com.aliyun.odps.udf.annotation.Resolve. Kelascom.aliyun.odps.udf.annotation.Resolvesesuai dengan anotasi@Resolve. Kelascom.aliyun.odps.udf.UDFExceptionbersifat opsional dan digunakan untuk metode inisialisasi serta menghentikan tugas pada kelas Java. Jika Anda ingin menggunakan kelas UDAF lainnya atau tipe data kompleks, tambahkan kelas yang diperlukan sesuai petunjuk dalam Ikhtisar.Anotasi
@Resolve: wajib.Anotasi ini memiliki format
@Resolve(<signature>).signatureadalah tanda tangan fungsi yang mendefinisikan tipe data parameter input dan nilai balikan dari UDAF. UDAF tidak dapat memperoleh tanda tangan fungsi melalui fitur refleksi. Anda hanya dapat memperoleh tanda tangan fungsi melalui anotasi@Resolve, seperti@Resolve("smallint->varchar(10)"). Untuk informasi lebih lanjut tentang anotasi@Resolve, lihat Anotasi @Resolve dalam topik ini.Kelas Java kustom: wajib.
Kelas Java kustom merupakan unit organisasi dari kode UDAF. Kelas ini mendefinisikan variabel dan metode yang digunakan untuk memenuhi kebutuhan bisnis Anda.
Metode untuk mengimplementasikan kelas Java kustom: wajib.
Java UDAF harus mewarisi kelas
com.aliyun.odps.udf.Aggregatordan mengimplementasikan metode berikut:import com.aliyun.odps.udf.ContextFunction; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDFException; public abstract class Aggregator implements ContextFunction { // Metode inisialisasi. @Override public void setup(ExecutionContext ctx) throws UDFException { } // Metode terminate. @Override public void close() throws UDFException { } // Buat buffer agregasi. abstract public Writable newBuffer(); // Metode iterate. // Buffer adalah buffer agregasi, yang menyimpan data yang diagregasi pada fase tertentu. Data teragregasi merujuk pada dataset yang diperoleh setelah GROUP BY dilakukan untuk tugas Map yang berbeda. Satu buffer dibuat untuk setiap baris data yang diagregasi. // Writable[] menunjukkan satu baris data, yang menentukan kolom yang dilewati dalam kode. Misalnya, writable[0] menunjukkan kolom pertama, dan writable[1] menunjukkan kolom kedua. // args menentukan parameter yang digunakan untuk memanggil UDAF dalam SQL. Ini tidak boleh NULL, tetapi nilai dalam args bisa NULL, yang menunjukkan bahwa data input adalah NULL. abstract public void iterate(Writable buffer, Writable[] args) throws UDFException; // Metode terminate. abstract public Writable terminate(Writable buffer) throws UDFException; // Metode merge. abstract public void merge(Writable buffer, Writable partial) throws UDFException; }Metode utama yang harus diimplementasikan adalah
iterate,merge, danterminate. Metode-metode ini digunakan untuk mengimplementasikan logika inti UDAF. Selain itu, Anda harus mengimplementasikan buffer writable yang ditentukan pengguna.Buffer writable yang ditentukan pengguna mengonversi objek dalam memori menjadi urutan byte (atau protokol transmisi data lainnya) untuk penyimpanan persisten di disk dan transmisi jaringan. MaxCompute menggunakan komputasi terdistribusi untuk memproses UDAF. Oleh karena itu, MaxCompute harus melakukan serialisasi atau deserialisasi data sebelum data dapat ditransmisikan antar perangkat yang berbeda.
Saat menulis Java UDAF, Anda dapat menggunakan tipe data Java atau tipe data writable Java. Untuk informasi lebih lanjut tentang pemetaan antara tipe data yang didukung oleh proyek MaxCompute, tipe data Java, dan tipe data writable Java, lihat Tipe Data dalam topik ini.
Contoh kode:
// Kemas kelas Java yang didefinisikan ke dalam file bernama org.alidata.odps.udaf.examples.
package org.alidata.odps.udaf.examples;
// Kelas dasar UDAF.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
// Kelas Java kustom.
// Anotasi @Resolve.
@Resolve("double->double")
public class AggrAvg extends Aggregator {
// Metode yang digunakan untuk mengimplementasikan kelas Java kustom.
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}Dalam kode UDAF di atas, buffer yang sama digunakan untuk metode iterate dan merge. Anda dapat mengagregasi baris data input ke dalam buffer ini berdasarkan kode UDAF.
Batasan
Mengakses Internet menggunakan UDF
Secara default, MaxCompute tidak mengizinkan akses Internet menggunakan UDF. Jika Anda ingin mengakses Internet menggunakan UDF, isi formulir aplikasi koneksi jaringan formulir aplikasi koneksi jaringan sesuai kebutuhan bisnis Anda dan kirimkan aplikasi tersebut. Setelah aplikasi disetujui, tim dukungan teknis MaxCompute akan menghubungi Anda dan membantu membangun koneksi jaringan. Untuk informasi lebih lanjut tentang cara mengisi formulir aplikasi koneksi jaringan, lihat Proses Koneksi Jaringan.
Mengakses VPC menggunakan UDF
Secara default, MaxCompute tidak mengizinkan akses ke sumber daya dalam VPC menggunakan UDF. Untuk menggunakan UDF untuk mengakses sumber daya dalam VPC, Anda harus membangun koneksi jaringan antara MaxCompute dan VPC. Untuk informasi lebih lanjut tentang operasi terkait, lihat Gunakan UDF untuk Mengakses Sumber Daya dalam VPC.
Membaca data tabel menggunakan UDF, UDAF, atau UDTF
Anda tidak dapat menggunakan UDF, UDAF, atau UDTF untuk membaca data dari jenis tabel berikut:
Tabel tempat evolusi skema dilakukan.
Tabel yang berisi tipe data kompleks.
Tabel yang berisi tipe data JSON.
Tabel transaksional.
Catatan Penggunaan
Sebelum menulis Java UDAF, perhatikan hal-hal berikut:
Kami sarankan Anda tidak mengemas kelas dengan nama yang sama tetapi logika berbeda ke dalam file JAR UDAF yang berbeda. Sebagai contoh, file JAR UDAF 1 bernama udaf1.jar dan file JAR UDAF 2 bernama udaf2.jar. Kedua file JAR tersebut berisi kelas bernama com.aliyun.UserFunction.class, tetapi kelas tersebut memiliki logika yang berbeda dalam file-file tersebut. Jika UDAF 1 dan UDAF 2 dipanggil dalam pernyataan SQL yang sama, MaxCompute memuat
com.aliyun.UserFunction.classdari salah satu dari dua file tersebut. Akibatnya, UDAF tidak dapat berjalan sesuai harapan dan kompilasi mungkin gagal.Tipe data parameter input atau nilai balikan dalam Java UDAF adalah objek. Huruf pertama dari tipe data yang Anda tentukan dalam kode Java UDAF harus huruf besar, seperti String.
Nilai NULL dalam MaxCompute SQL direpresentasikan oleh NULL dalam Java. Tipe data primitif dalam Java tidak dapat merepresentasikan nilai NULL dalam MaxCompute SQL. Oleh karena itu, tipe data ini tidak dapat digunakan.
Anotasi @Resolve
Format anotasi @Resolve:
@Resolve(<signature>)Parameter signature adalah string yang menentukan tipe data parameter input dan nilai balikan. Saat menjalankan UDAF, tipe data parameter input dan nilai balikan UDAF harus konsisten dengan tipe data yang ditentukan dalam tanda tangan fungsi. Konsistensi tipe data diperiksa selama penguraian semantik. Jika tipe data tidak konsisten, kesalahan akan dikembalikan. Format tanda tangan:
'arg_type_list -> type'
arg_type_list: menentukan tipe data parameter input. Jika beberapa parameter input digunakan, tentukan beberapa tipe data dan pisahkan dengan koma (,). Tipe data berikut didukung: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, DECIMAL(presisi,skala), CHAR, VARCHAR, tipe data kompleks (ARRAY, MAP, dan STRUCT), dan tipe data kompleks bersarang.arg_type_listdapat direpresentasikan oleh asterisk (*) atau dibiarkan kosong ('').Jika
arg_type_listdirepresentasikan oleh asterisk (*), jumlah parameter input acak diperbolehkan.Jika
arg_type_listdibiarkan kosong (''), tidak ada parameter input yang digunakan.
Untuk informasi lebih lanjut tentang ekstensi sintaksis anotasi @Resolve, lihat Parameter Dinamis UDAF dan UDTF.
typemenentukan tipe data nilai balikan. Untuk UDAF, hanya satu kolom nilai yang dikembalikan. Tipe data berikut didukung: BIGINT, STRING, DOUBLE, BOOLEAN, DATETIME, DECIMAL, FLOAT, BINARY, DATE, dan DECIMAL(presisi, skala). Tipe data kompleks seperti ARRAY, MAP, dan STRUCT, serta tipe data kompleks bersarang juga didukung.
Saat menulis kode UDAF, Anda dapat memilih tipe data berdasarkan edisi tipe data yang digunakan oleh proyek MaxCompute Anda. Untuk informasi lebih lanjut tentang edisi tipe data dan tipe data yang didukung oleh setiap edisi, lihat Edisi Tipe Data.
Tabel berikut memberikan contoh anotasi @Resolve.
Anotasi @Resolve | Deskripsi |
| Tipe data parameter input adalah BIGINT atau DOUBLE, dan tipe data nilai balikan adalah STRING. |
| Jumlah parameter input acak digunakan dan tipe data nilai balikan adalah STRING. |
| Tidak ada parameter input yang digunakan dan tipe data nilai balikan adalah DOUBLE. |
| Tipe data parameter input adalah ARRAY<BIGINT> dan tipe data nilai balikan adalah STRUCT<x:STRING, y:INT>. |
Tipe data
Dalam MaxCompute, edisi tipe data yang berbeda mendukung tipe data yang berbeda. Dalam MaxCompute V2.0 dan versi berikutnya, lebih banyak tipe data dan tipe data kompleks seperti ARRAY, MAP, dan STRUCT didukung. Untuk informasi lebih lanjut tentang edisi tipe data MaxCompute, lihat Edisi Tipe Data.
Tabel berikut menjelaskan pemetaan antara tipe data yang didukung oleh proyek MaxCompute, tipe data Java, dan tipe data writable Java. Anda harus menulis Java UDAF berdasarkan pemetaan ini untuk memastikan konsistensi tipe data.
Tipe MaxCompute | Tipe Java | Tipe Java Writable |
TINYINT | java.lang.Byte | ByteWritable |
SMALLINT | java.lang.Short | ShortWritable |
INT | java.lang.Integer | IntWritable |
BIGINT | java.lang.Long | LongWritable |
FLOAT | java.lang.Float | FloatWritable |
DOUBLE | java.lang.Double | DoubleWritable |
DECIMAL | java.math.BigDecimal | BigDecimalWritable |
BOOLEAN | java.lang.Boolean | BooleanWritable |
STRING | java.lang.String | Text |
VARCHAR | com.aliyun.odps.data.Varchar | VarcharWritable |
BINARY | com.aliyun.odps.data.Binary | BytesWritable |
DATE | java.sql.Date | DateWritable |
DATETIME | java.util.Date | DatetimeWritable |
TIMESTAMP | java.sql.Timestamp | TimestampWritable |
INTERVAL_YEAR_MONTH | N/A | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | N/A | IntervalDayTimeWritable |
ARRAY | java.util.List | N/A |
MAP | java.util.Map | N/A |
STRUCT | com.aliyun.odps.data.Struct | N/A |
Parameter input atau nilai balikan UDAF dapat berupa tipe data writable Java hanya jika proyek MaxCompute Anda menggunakan edisi tipe data MaxCompute V2.0.
Instruksi
Setelah mengembangkan Java UDAF, Anda dapat menggunakan MaxCompute SQL untuk memanggil UDAF ini. Untuk informasi lebih lanjut tentang cara mengembangkan Java UDAF, lihat bagian "Proses Pengembangan" dalam Ikhtisar. Anda dapat menggunakan salah satu metode berikut untuk memanggil Java UDAF:
Gunakan UDF dalam proyek MaxCompute: Metodenya mirip dengan penggunaan fungsi bawaan.
Gunakan UDF lintas proyek: Gunakan UDF Proyek B dalam Proyek A. Contoh pernyataan berikut menunjukkan contohnya:
select B:udf_in_other_project(arg0, arg1) as res from table_t;. Untuk informasi lebih lanjut tentang berbagi lintas proyek, lihat Akses Sumber Daya Lintas Proyek Berdasarkan Paket.
Untuk informasi lebih lanjut tentang cara menggunakan MaxCompute Studio untuk mengembangkan dan memanggil Java UDAF, lihat Contoh dalam topik ini.
Contoh
Contoh ini menjelaskan cara mengembangkan UDAF bernama AggrAvg menggunakan MaxCompute Studio. UDAF AggrAvg digunakan untuk menghitung nilai rata-rata. Gambar berikut menunjukkan logika dari UDAF AggrAvg.

Potong data input. MaxCompute memotong data input menjadi ukuran tertentu berdasarkan alur kerja pemrosesan MapReduce. Ukuran setiap potongan sesuai untuk pekerja agar dapat menyelesaikan perhitungan dalam periode waktu tertentu.
Anda dapat mengonfigurasi parameter
odps.stage.mapper.split.sizeuntuk menyesuaikan ukuran potongan. Untuk informasi lebih lanjut tentang logika pemotongan data, lihat bagian "Proses" dalam Ikhtisar.Setiap pekerja menghitung jumlah catatan data dan total volume data dalam satu potongan. Anda dapat menggunakan jumlah catatan data dan total volume data di setiap potongan sebagai hasil antara.
Setiap pekerja mengumpulkan informasi dari setiap potongan yang dihasilkan pada Langkah 2.
Dalam output akhir,
r.sum/r.countadalah nilai rata-rata dari semua data input.
Untuk menggunakan MaxCompute Studio untuk mengembangkan dan memanggil Java UDAF, lakukan langkah-langkah berikut:
Buat persiapan.
Sebelum menggunakan MaxCompute Studio untuk mengembangkan dan men-debug UDF, Anda harus menginstal MaxCompute Studio dan menghubungkan MaxCompute Studio ke proyek MaxCompute. Untuk informasi lebih lanjut tentang cara menginstal MaxCompute Studio dan menghubungkan MaxCompute Studio ke proyek MaxCompute, lihat topik-topik berikut:
Tulis kode UDAF.
Di panel navigasi sisi kiri tab Project, pilih , klik kanan java, lalu pilih .
Di kotak dialog Create new MaxCompute java class, klik UDAF, masukkan nama kelas di bidang Name, dan tekan Enter. Dalam contoh ini, kelas Java diberi nama AggrAvg.

Name: nama kelas Java MaxCompute. Jika Anda belum membuat paket, tentukan parameter ini dalam format packagename.classname. Sistem akan secara otomatis menghasilkan paket.
Tulis kode di editor kode.
Contoh kode UDAF:import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import com.aliyun.odps.io.DoubleWritable; import com.aliyun.odps.io.Writable; import com.aliyun.odps.udf.Aggregator; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.annotation.Resolve; @Resolve("double->double") public class AggrAvg extends Aggregator { private static class AvgBuffer implements Writable { private double sum = 0; private long count = 0; @Override public void write(DataOutput out) throws IOException { out.writeDouble(sum); out.writeLong(count); } @Override public void readFields(DataInput in) throws IOException { sum = in.readDouble(); count = in.readLong(); } } private DoubleWritable ret = new DoubleWritable(); @Override public Writable newBuffer() { return new AvgBuffer(); } @Override public void iterate(Writable buffer, Writable[] args) throws UDFException { DoubleWritable arg = (DoubleWritable) args[0]; AvgBuffer buf = (AvgBuffer) buffer; if (arg != null) { buf.count += 1; buf.sum += arg.get(); } } @Override public Writable terminate(Writable buffer) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; if (buf.count == 0) { ret.set(0); } else { ret.set(buf.sum / buf.count); } return ret; } @Override public void merge(Writable buffer, Writable partial) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; AvgBuffer p = (AvgBuffer) partial; buf.sum += p.sum; buf.count += p.count; } }
Debug UDAF di mesin lokal Anda dan verifikasi bahwa kode berjalan sesuai harapan.
Untuk informasi lebih lanjut tentang operasi debugging, lihat bagian "Lakukan run lokal untuk men-debug UDF" dalam Kembangkan UDF.
CatatanPengaturan parameter pada gambar di atas hanya untuk referensi.
Kemas kode UDAF ke dalam file JAR, unggah file tersebut ke proyek MaxCompute Anda, dan buat UDAF. Dalam contoh ini, UDAF bernama
user_udafdibuat.Untuk informasi lebih lanjut tentang cara mengemas UDAF, lihat bagian "Prosedur" dalam Kemas Program Java, Unggah Paket, dan Buat MaxCompute UDF.

Di panel navigasi kiri MaxCompute Studio, klik Project Explorer. Klik kanan proyek MaxCompute Anda untuk memulai klien MaxCompute, dan jalankan pernyataan SQL untuk memanggil UDAF yang telah dibuat.
Contoh berikut menunjukkan struktur data tabel my_table yang ingin Anda kueri.
+------------+------------+ | col0 | col1 | +------------+------------+ | 1.2 | 2.0 | | 1.6 | 2.1 | +------------+------------+Jalankan pernyataan SQL berikut untuk memanggil UDAF:
select user_udaf(col0) as c0 from my_table;Hasil berikut dikembalikan:
+----+ | c0 | +----+ | 1.4| +----+