Topik ini menjelaskan cara mengembangkan, mendaftarkan, dan menggunakan user-defined table-valued function (UDTF) untuk Flink.
Definisi
User-defined table-valued function (UDTF) menerima nol atau lebih nilai skalar dengan panjang variabel sebagai parameter input. Berbeda dengan user-defined scalar function yang mengembalikan satu nilai, UDTF dapat mengembalikan beberapa baris sebagai output. Baris yang dikembalikan dapat terdiri dari satu atau lebih kolom. Untuk informasi selengkapnya, lihat User-defined Functions.
Mengembangkan UDTF
Flink menyediakan contoh user-defined function (UDF) untuk membantu Anda mengembangkan layanan dengan cepat. Contoh tersebut mencakup implementasi UDSF, user-defined aggregate function (UDAF), dan user-defined table-valued function (UDTF). Lingkungan pengembangan telah dikonfigurasi sebelumnya dalam contoh tersebut, sehingga tidak diperlukan penyiapan tambahan.
Unduh dan ekstrak proyek contoh ASI_UDX_Demo ke mesin lokal Anda.
CatatanAkses ke situs pihak ketiga ASI_UDX_Demo mungkin gagal atau mengalami penundaan.
Setelah proses ekstraksi selesai, folder ASI_UDX-main akan dibuat. Folder tersebut berisi file-file berikut:
pom.xml: File konfigurasi tingkat proyek. File ini menjelaskan koordinat Maven, dependensi, aturan pengembang, sistem pelacakan bug, organisasi, lisensi, dan informasi terkait proyek lainnya.
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: Kode Java contoh untuk UDTF.
Di IntelliJ IDEA, klik dan buka folder ASI_UDX-main yang telah diekstrak.
Klik ganda folder \ASI_UDX-main\src\main\java\ASI_UDTF dan modifikasi file ASI_UDTF.java sesuai kebutuhan.
Pada contoh ini, kode dalam file ASI_UDTF.java memisahkan sebuah string menjadi beberapa kolom string berdasarkan delimiter tanda pipa vertikal (|).
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }Untuk informasi selengkapnya mengenai tipe data dan mekanisme inferensi tipe yang didukung oleh TableFunction, lihat Data types in Flink dan Type inference.
CatatanTautan di atas merujuk pada dokumentasi Flink versi 1.15. Tipe data yang didukung dan mekanisme inferensi tipe dapat berbeda antarversi utama Flink. Periksa dokumentasi Flink yang sesuai dengan versi Ververica Runtime (VVR) Anda berdasarkan pemetaan versi VVR ke Flink. Untuk melihat versi Flink Anda, lihat FAQ Workspace dan namespace.
Contoh berikut menunjukkan cara menggunakan tipe komposit umum Tuple dan Row:
Tipe Tuple
TableFunction<Tuple2<String,Integer>Tipe Row
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // gunakan collect(...) untuk mengeluarkan satu baris collect(Row.of(s, s.length())); } } }
Klik ganda folder \ASI_UDX-main\. Konfigurasikan file pom.xml.
Pada contoh ini, file pom.xml dikonfigurasi dengan dependensi paket JAR utama untuk Flink versi 1.11. Sesuaikan dengan kebutuhan layanan Anda:
Jika layanan Anda tidak bergantung pada paket JAR lain, Anda tidak perlu mengonfigurasi file pom.xml. Lanjutkan ke langkah berikutnya.
Jika layanan Anda bergantung pada paket JAR lain, tambahkan informasi dependensi yang diperlukan ke file pom.xml.
Dependensi paket JAR utama untuk Flink versi 1.11 adalah sebagai berikut.
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>Di direktori yang berisi file pom.xml, jalankan perintah berikut untuk membuat paket proyek.
mvn package -Dcheckstyle.skipJika paket ASI_UDX-1.0-SNAPSHOT.jar berhasil dibuat di direktori \ASI_UDX-main\target\, pengembangan UDTF telah selesai.
Mendaftarkan UDTF
Untuk informasi selengkapnya mengenai cara mendaftarkan UDTF, lihat Mengelola user-defined function (UDF).
Menggunakan UDTF
Setelah Anda mendaftarkan UDTF, Anda dapat menggunakannya dalam pekerjaan Anda dengan mengikuti prosedur berikut.
Kembangkan pekerjaan Flink SQL. Untuk informasi selengkapnya, lihat Peta pengembangan pekerjaan.
Kode contoh berikut memisahkan string pada setiap baris bidang message di tabel ASI_UDTF_Source menjadi beberapa kolom. Tanda pipa vertikal (|) digunakan sebagai delimiter.
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);Di halaman , temukan pekerjaan yang dituju dan klik Start pada kolom Actions.
Setelah pekerjaan dimulai, string pada bidang `message` tabel `ASI_UDTF_Source` akan dipisahkan berdasarkan delimiter tanda pipa vertikal. Kolom karakter hasil pemisahan kemudian dimasukkan ke dalam tabel `ASI_UDTF_Sink`.