All Products
Search
Document Center

Realtime Compute for Apache Flink:Java UDAFs

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi agregat yang didefinisikan pengguna (user-defined aggregate function/UDAF) di Realtime Compute for Apache Flink.

Definisi

UDAF mengagregasi beberapa nilai menjadi satu nilai, membentuk pemetaan many-to-one antara input dan output. Beberapa nilai input diagregasi untuk menghasilkan satu nilai output. Untuk informasi selengkapnya, lihat User-defined Functions.

Catatan

User-defined Functions dan ASI_UDX_Demo disediakan di situs web pihak ketiga. Akses ke situs tersebut mungkin gagal atau mengalami penundaan.

Buat UDAF

Catatan

Flink menyediakan contoh user-defined function (UDF) untuk membantu Anda mengembangkan layanan dengan cepat. Contoh tersebut mencakup implementasi untuk UDSF, user-defined aggregate functions (UDAF), dan user-defined table-valued functions (UDTF). Lingkungan pengembangan telah dikonfigurasi sebelumnya dalam contoh ini, sehingga tidak diperlukan pengaturan tambahan.

  1. Unduh dan ekstrak ASI_UDX_Demo ke mesin lokal Anda.

    Setelah mengekstrak paket tersebut, folder ASI_UDX-main akan dihasilkan. Folder tersebut berisi item-item berikut:

    • pom.xml: file konfigurasi tingkat proyek yang menjelaskan koordinat Maven, dependensi, aturan yang harus diikuti pengembang, sistem manajemen cacat, organisasi, lisensi proyek, serta semua faktor terkait proyek lainnya.

    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: kode Java untuk contoh UDAF.

  2. Di IntelliJ IDEA, pilih File > Open, lalu pilih folder ASI_UDX-main yang telah diekstrak.

  3. Klik ganda file pom.xml di direktori \ASI_UDX-main\ dan konfigurasikan informasi dalam file tersebut sesuai kebutuhan.

    Dalam contoh ini, file pom.xml berisi dependensi minimum yang diperlukan untuk mengembangkan user-defined functions untuk Flink 1.12. Jika bisnis Anda:

    • Tidak memiliki dependensi lain, Anda dapat melanjutkan ke langkah berikutnya tanpa mengonfigurasi file pom.xml.

    • Memiliki dependensi lain, tambahkan dependensi yang diperlukan ke file pom.xml.

    Contoh berikut menunjukkan dependensi minimum untuk Flink 1.12:

     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>1.12.7</version>
             <scope>provided</scope>
         </dependency>
    </dependencies>
    Catatan

    Kami menyarankan Anda menggunakan versi minor terbaru dari versi utama Apache Flink yang sesuai dengan versi Ververica Runtime (VVR) pada penerapan Anda. Untuk informasi lebih lanjut tentang pemetaan versi antara VVR dan Apache Flink, lihat Ikhtisar.

  4. Klik ganda \ASI_UDX-main\src\main\java\ASI_UDAF untuk membukanya, lalu konfigurasikan ASI_UDAF.java sesuai kebutuhan.

    Contoh ini menunjukkan kode untuk operasi penjumlahan kumulatif dalam file ASI_UDAF.java.

    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF{
        public static class AccSum{
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum>{
    
            @Override
            public Long getValue(AccSum acSum){
                return acSum.sum;
            }
    
            @Override
            public AccSum createAccumulator(){
                AccSum acCount= new AccSum();
                acCount.sum=0;
                return acCount;
            }
    
            public void accumulate(AccSum acc,long num){
                acc.sum += num;
            }
    
            /**
            * Mendukung penarikan kembali paket yang dihasilkan oleh operator hulu.
            */
            public void retract(AccSum acc,long num){
                acc.sum -= num;
            }
    
            /**
            * Mendukung optimasi agregasi dua tahap local-global.
            */
            public void merge(AccSum acc,Iterable<AccSum> it){
                Iterator<AccSum> iter=it.iterator();
                while(iter.hasNext()){
                    AccSum accSum=iter.next();
                    if(null!=accSum){
                        acc.sum+=accSum.sum;
                    }
                }
            }
        }
    }

    UDAF ini melakukan operasi penjumlahan kumulatif sederhana. Misalnya, jika nilai input untuk kunci grup yang sama (bidang GROUP BY) adalah 1, 2, dan 3, output dapat berupa salah satu dari dua kasus berikut:

    • Jika miniBatch tidak diaktifkan, nilai kembali adalah 1, 3, dan 6. Secara default, miniBatch tidak diaktifkan.

    • Jika miniBatch diaktifkan, nilai kembali adalah 6 dan jumlah nilai kembali antara tidak dapat ditentukan karena bervariasi berdasarkan pengaturan miniBatch dan distribusi data masukan.

    Catatan

    Untuk informasi selengkapnya tentang miniBatch, lihat Optimize Flink SQL.

  5. Buka direktori tempat file pom.xml disimpan, lalu jalankan perintah berikut untuk membuat paket:

    mvn package -Dcheckstyle.skip

    Jika paket ASI_UDX-1.0-SNAPSHOT.jar muncul di direktori \ASI_UDX-main\target\, berarti UDAF telah berhasil dibuat.

Gunakan UDAF

Anda dapat menggunakan salah satu metode berikut untuk menggunakan UDAF dalam penerapan SQL:

  • Metode 1: Daftarkan UDAF dan gunakan UDAF yang telah didaftarkan dalam penerapan Anda.

    Metode ini memungkinkan penggunaan ulang kode untuk pengembangan penerapan selanjutnya. Untuk informasi selengkapnya tentang cara mendaftarkan UDAF, lihat Manage UDFs. Jika UDAF yang didaftarkan diberi nama ASI_UDAF$MySum, gunakan contoh kode berikut dalam penerapan Anda:

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • Metode 2: Di halaman Data Studio > ETL Flink, unggah paket JAR fungsi kustom menggunakan opsi Additional dependency files di bawah More configurations. Kemudian, tambahkan pernyataan untuk membuat fungsi temporary dalam pernyataan SQL pekerjaan dan gunakan fungsi tersebut.

    Setelah mengunggah paket JAR UDAF di kolom Additional Dependencies untuk penerapan Anda, UDAF tersebut hanya dapat digunakan dalam penerapan ini dan tidak dapat digunakan dalam penerapan lain. Jika fungsi temporary yang Anda buat diberi nama mysum, kode berikut menunjukkan cara menggunakannya dalam penerapan:

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- Buat fungsi temporary mysum.
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
Catatan

Setelah mengembangkan pekerjaan SQL, buka halaman Operation Center > Job O&M. Temukan pekerjaan yang dituju dan klik Start di kolom Operation. Setelah pekerjaan dimulai, jumlah kumulatif data dari bidang `a` di tabel ASI_UDAF_Source akan dimasukkan ke tabel ASI_UDAF_Sink.