Topik ini menjelaskan cara mengembangkan, mendaftarkan, dan menggunakan Flink user-defined scalar function (UDSF).
Definisi
User-defined scalar function (UDSF) memetakan nol, satu, atau beberapa nilai skalar ke nilai skalar baru. Fungsi ini bersifat satu-ke-satu, di mana setiap baris data diproses untuk menghasilkan satu nilai output. Untuk informasi selengkapnya, lihat User-defined Functions.
Kembangkan UDSF
Flink menyediakan contoh user-defined function (UDF) untuk membantu Anda mengembangkan layanan dengan cepat. Contoh tersebut mencakup implementasi untuk UDSF, user-defined aggregate functions (UDAF), dan user-defined table-valued functions (UDTF). Lingkungan pengembangan telah dikonfigurasi sebelumnya dalam contoh tersebut, sehingga tidak diperlukan penyiapan tambahan.
Unduh dan ekstrak contoh ASI_UDX_Demo ke mesin lokal Anda.
CatatanASI_UDX_Demo dihosting di situs pihak ketiga. Anda mungkin mengalami kegagalan akses atau penundaan.
Setelah diekstraksi, folder ASI_UDX-main akan dibuat. Folder tersebut berisi:
pom.xml: File konfigurasi tingkat proyek yang menjelaskan koordinat Maven proyek, dependensi, aturan untuk pengembang, sistem pelacakan bug, organisasi, lisensi, serta faktor terkait proyek lainnya.
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: Kode Java contoh untuk UDSF.
Di IntelliJ IDEA, klik , lalu pilih folder ASI_UDX-main yang telah diekstraksi.
Klik ganda folder \ASI_UDX-main\src\main\java\ASI_UDF, lalu konfigurasikan file ASI_UDF.java sesuai kebutuhan.
Pada contoh ini, file ASI_UDF.java dikonfigurasi dengan kode yang mengekstrak karakter dari posisi `begin` hingga `end` pada setiap string input.
package ASI_UDF; import org.apache.flink.table.functions.ScalarFunction; public class ASI_UDF extends ScalarFunction { public String eval(String s, Integer begin, Integer end) { return s.substring(begin, end); } }Klik ganda folder \ASI_UDX-main\, lalu konfigurasikan file pom.xml.
Pada contoh ini, file pom.xml dikonfigurasi dengan dependensi paket JAR utama untuk Flink versi 1.11. Berdasarkan kebutuhan layanan Anda:
Jika layanan Anda tidak bergantung pada paket JAR lain, Anda tidak perlu mengonfigurasi file pom.xml. Lanjutkan ke langkah berikutnya.
Jika layanan Anda bergantung pada paket JAR lain, tambahkan informasi dependensi yang diperlukan ke file pom.xml.
Dependensi paket JAR utama untuk Flink versi 1.11 adalah sebagai berikut.
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>Di direktori yang berisi file pom.xml, jalankan perintah berikut untuk membuat paket proyek.
mvn package -Dcheckstyle.skipPaket ASI_UDX-1.0-SNAPSHOT.jar dibuat di direktori \ASI_UDX-main\target\, yang menandakan bahwa pengembangan UDSF telah selesai.
Daftarkan UDSF
Untuk informasi selengkapnya tentang cara mendaftarkan UDSF, lihat Kelola user-defined functions (UDFs).
Gunakan UDSF
Setelah Anda mendaftarkan UDSF, Anda dapat menggunakannya dengan prosedur berikut.
Kembangkan pekerjaan Flink SQL. Untuk informasi selengkapnya, lihat Peta pengembangan Pekerjaan.
Kode contoh berikut mengekstrak karakter dari posisi kedua hingga keempat pada string di bidang `a` tabel `ASI_UDSF_Source`.
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;Di halaman , temukan pekerjaan target lalu klik Start di kolom Actions.
Setelah pekerjaan dimulai, karakter dari posisi kedua hingga keempat pada string di bidang `a` setiap baris tabel `ASI_UDSF_Source` dimasukkan ke tabel `ASI_UDSF_Sink`.