Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi agregat yang didefinisikan pengguna (UDAF) di Realtime Compute for Apache Flink.
Definisi
UDAF menggabungkan beberapa nilai menjadi satu nilai. Pemetaan banyak-ke-satu dibentuk antara input dan output dari UDAF. Beberapa nilai input digabungkan untuk menghasilkan satu nilai output. Untuk informasi lebih lanjut, lihat Fungsi yang Didefinisikan Pengguna.
Fungsi yang Didefinisikan Pengguna dan ASI_UDX_Demo disediakan di situs pihak ketiga. Saat Anda mengakses situs tersebut, situs mungkin gagal diakses atau akses ke situs mungkin tertunda.
Buat UDAF
Realtime Compute for Apache Flink menyediakan contoh fungsi yang didefinisikan pengguna (UDF) untuk memfasilitasi pengembangan bisnis Anda. Contoh-contoh tersebut mencakup cara mengimplementasikan UDSF, fungsi agregat yang didefinisikan pengguna (UDAF), dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF). Lingkungan pengembangan versi terkait dikonfigurasi dalam setiap contoh.
Unduh dan ekstrak ASI_UDX_Demo ke mesin lokal Anda.
Setelah mengekstrak paket tersebut, folder ASI_UDX-main akan dibuat. Folder tersebut berisi item-item berikut:
pom.xml: file konfigurasi tingkat proyek yang menjelaskan koordinat Maven, dependensi, aturan yang harus diikuti oleh pengembang, sistem manajemen cacat, organisasi, dan lisensi proyek, serta semua faktor lain yang terkait dengan proyek.
\ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: kode Java untuk contoh UDAF.
Buka IntelliJ IDEA dan pilih . Pilih folder ASI_UDX-main yang telah diekstrak dan klik OK.
Klik dua kali file pom.xml di direktori \ASI_UDX-main\ dan konfigurasikan informasi dalam file tersebut sesuai kebutuhan bisnis Anda.
Dalam contoh ini, informasi dependensi minimum yang diperlukan untuk mengembangkan UDF di Flink 1.12 dikonfigurasi dalam file pom.xml. Lakukan salah satu dari operasi berikut berdasarkan kebutuhan bisnis Anda:
Jika bisnis Anda tidak memiliki dependensi, lanjutkan ke langkah berikutnya tanpa perlu mengonfigurasi informasi dalam file pom.xml.
Jika bisnis Anda memiliki dependensi, tambahkan informasi dependensi yang Anda butuhkan ke file pom.xml.
Contoh berikut menunjukkan dependensi minimum Flink 1.12:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.12.7</version> <scope>provided</scope> </dependency> </dependencies>CatatanKami merekomendasikan agar Anda memasukkan versi minor terbaru untuk versi utama Apache Flink yang sesuai dengan versi Ververica Runtime (VVR) penyebaran Anda. Untuk informasi lebih lanjut tentang pemetaan versi antara VVR dan Apache Flink, lihat Ikhtisar.
Klik dua kali file ASI_UDAF.java di direktori \ASI_UDX-main\src\main\java\ASI_UDAF, dan tulis kode dalam file tersebut sesuai kebutuhan bisnis Anda.
Contoh ini menunjukkan kode untuk penjumlahan kumulatif dalam file ASI_UDAF.java.
package ASI_UDAF; import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; public class ASI_UDAF{ public static class AccSum{ public long sum; } public static class MySum extends AggregateFunction<Long, AccSum>{ @Override public Long getValue(AccSum acSum){ return acSum.sum; } @Override public AccSum createAccumulator(){ AccSum acCount= new AccSum(); acCount.sum=0; return acCount; } public void accumulate(AccSum acc,long num){ acc.sum += num; } /** * Mendukung pencabutan pesan yang dihasilkan oleh operator hulu. */ public void retract(AccSum acc,long num){ acc.sum -= num; } /** * Mendukung optimasi agregasi dua tahap lokal-global. */ public void merge(AccSum acc,Iterable<AccSum> it){ Iterator<AccSum> iter=it.iterator(); while(iter.hasNext()){ AccSum accSum=iter.next(); if(null!=accSum){ acc.sum+=accSum.sum; } } } } }Dalam contoh ini, UDAF digunakan untuk melakukan operasi kumulatif sederhana. Misalnya, jika tiga nilai dalam klausa GROUP BY adalah 1, 2, dan 3, hasil yang dikembalikan bervariasi berdasarkan apakah miniBatch diaktifkan.
Jika miniBatch tidak diaktifkan, nilai yang dikembalikan adalah 1, 3, dan 6. Secara default, miniBatch tidak diaktifkan.
Jika miniBatch diaktifkan, nilai yang dikembalikan adalah 6 dan jumlah nilai perantara yang dikembalikan tidak dapat ditentukan. Hal ini karena jumlah nilai yang dikembalikan bervariasi berdasarkan pengaturan miniBatch dan distribusi data input.
CatatanUntuk informasi lebih lanjut tentang miniBatch, lihat Optimalkan Flink SQL.
Pergi ke direktori tempat file pom.xml disimpan. Kemudian, jalankan perintah berikut untuk mengemas file:
mvn package -Dcheckstyle.skipJika paket ASI_UDX-1.0-SNAPSHOT.jar muncul di direktori \ASI_UDX-main\target\, UDAF telah dibuat.
Gunakan UDAF
Anda dapat menggunakan salah satu metode berikut untuk menggunakan UDAF dalam penyebaran SQL:
Metode 1: Daftarkan UDAF dan gunakan UDAF yang terdaftar dalam penyebaran Anda.
Metode ini membantu Anda menggunakan kembali kode untuk pengembangan penyebaran selanjutnya. Untuk informasi lebih lanjut tentang cara mendaftarkan UDAF, lihat Kelola UDF. Jika UDAF yang terdaftar bernama ASI_UDAF$MySum, gunakan kode contoh berikut dalam penyebaran Anda:
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT NOT NULL ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO ASI_UDAF_Sink SELECT `ASI_UDAF$MySum`(a) FROM ASI_UDAF_Source;Metode 2: Unggah paket JAR dari UDAF di bidang Additional Dependencies pada tab Advanced di halaman Draft Editor. Selanjutnya, tambahkan pernyataan untuk membuat fungsi sementara ke dalam pernyataan SQL penyebaran Anda dan gunakan fungsi tersebut.
Setelah Anda mengunggah paket JAR UDAF di bidang Dependensi Tambahan untuk penyebaran Anda, Anda hanya dapat menggunakan UDAF dalam penyebaran ini. Anda tidak dapat menggunakan paket JAR ini di penyebaran lain. Jika fungsi sementara yang Anda buat bernama mysum, kode berikut menunjukkan cara menggunakan fungsi tersebut dalam penyebaran:
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --Buat fungsi sementara bernama mysum. INSERT INTO ASI_UDAF_Sink SELECT `mysum`(a) FROM ASI_UDAF_Source;
Setelah pengembangan penyebaran SQL selesai, pergi ke halaman , temukan penyebaran SQL, dan kemudian klik Start di kolom Actions. Setelah penyebaran dimulai, jumlah data dari bidang a dalam tabel ASI_UDAF_Source dimasukkan ke dalam tabel ASI_UDAF_Sink.