Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi bernilai tabel yang didefinisikan pengguna (UDTF) di Realtime Compute for Apache Flink.
Definisi
UDTF mengambil nol, satu, atau beberapa nilai skalar sebagai parameter input. Parameter ini dapat berupa parameter dengan panjang variabel. UDTF mirip dengan fungsi skalar yang didefinisikan pengguna (UDSF), kecuali bahwa UDTF dapat mengembalikan sejumlah baris alih-alih satu nilai tunggal. Baris yang dikembalikan terdiri dari satu atau lebih kolom. Beberapa baris atau kolom dikembalikan setiap kali UDTF dipanggil. Untuk informasi lebih lanjut, lihat Fungsi yang Didefinisikan Pengguna.
Buat UDTF
Realtime Compute for Apache Flink menyediakan contoh-contoh fungsi yang didefinisikan pengguna (UDF) untuk memfasilitasi pengembangan bisnis Anda. Contoh-contoh tersebut mencakup implementasi UDSF, fungsi agregat yang didefinisikan pengguna (UDAF), dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF). Lingkungan pengembangan versi terkait telah dikonfigurasi di setiap contoh.
Unduh dan ekstrak ASI_UDX_Demo ke mesin lokal Anda.
CatatanASI_UDX_Demo disediakan di situs pihak ketiga. Saat mengakses situs web tersebut, akses mungkin gagal atau tertunda.
Setelah mengekstrak paket, folder ASI_UDX-main akan dibuat. Parameter dalam path:
pom.xml: file konfigurasi tingkat proyek yang menjelaskan koordinat Maven, dependensi, aturan yang harus diikuti oleh pengembang, sistem manajemen cacat, organisasi, lisensi proyek, serta faktor lain terkait proyek.
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: kode Java untuk contoh UDTF.
Buka IntelliJ IDEA dan pilih . Pilih folder ASI_UDX-main yang telah diekstrak dan klik OK.
Klik dua kali file ASI_UDTF.java di direktori \ASI_UDX-main\src\main\java\ASI_UDTF, lalu buat konfigurasi dalam file sesuai kebutuhan bisnis Anda.
Dalam contoh ini, ASI_UDTF.java dikonfigurasikan dengan kode untuk memisahkan string dalam satu baris menjadi beberapa kolom string dengan batang vertikal (|).
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }Untuk informasi lebih lanjut tentang tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF, lihat Data Types dan Type Inference.
CatatanDokumen ini dirujuk dari dokumentasi Apache Flink 1.15. Tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF dapat bervariasi berdasarkan versi utama Apache Flink. Untuk informasi lebih lanjut tentang tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF di Apache Flink versi tertentu, lihat dokumentasi versi Apache Flink berdasarkan pemetaan antara Ververica Runtime (VVR) dan versi Apache Flink. Untuk informasi lebih lanjut tentang cara melihat versi mesin Apache Flink, lihat FAQ tentang manajemen dan operasi ruang kerja dan namespace.
Contoh UDTF berikut mengembalikan nilai dari tipe komposit umum Tuple dan Row:
Tipe Tuple
TableFunction<Tuple2<String,Integer>Tipe Row
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // gunakan collect(...) untuk mengeluarkan baris collect(Row.of(s, s.length())); } } }
Klik dua kali file pom.xml di direktori \ASI_UDX-main\ dan buat konfigurasi dalam file tersebut.
Dalam contoh ini, pom.xml dikonfigurasikan dengan informasi dependensi JAR utama Apache Flink 1.11. Lakukan salah satu dari operasi berikut berdasarkan kebutuhan bisnis Anda:
Jika bisnis Anda tidak bergantung pada paket JAR lain, lanjutkan ke langkah berikutnya tanpa perlu mengonfigurasi file pom.xml.
Jika bisnis Anda bergantung pada paket JAR lain, tambahkan informasi paket JAR yang diperlukan ke file pom.xml.
Apache Flink 1.11 bergantung pada paket JAR berikut:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>Pergi ke direktori tempat file pom.xml disimpan. Kemudian, jalankan perintah berikut untuk mengemas file:
mvn package -Dcheckstyle.skipJika paket ASI_UDX-1.0-SNAPSHOT.jar muncul di direktori \ASI_UDX-main\target\, UDTF berhasil dibuat.
Daftarkan UDTF
Untuk informasi lebih lanjut tentang cara mendaftarkan UDTF, lihat Kelola UDF.
Gunakan UDTF
Setelah UDTF didaftarkan, Anda dapat melakukan langkah-langkah berikut untuk menggunakan UDTF:
Gunakan Flink SQL untuk membuat deployment. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.
Di tabel ASI_UDTF_Source, string dalam setiap baris bidang pesan dipisahkan menjadi beberapa kolom oleh batang vertikal (|). Kode berikut memberikan contoh:
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);Di halaman di konsol Flink yang sepenuhnya dikelola, temukan deployment yang ingin Anda mulai dan klik Start di kolom Actions.
Setelah deployment dimulai, beberapa kolom string dari bidang pesan di tabel ASI_UDTF_Source dimasukkan ke dalam tabel ASI_UDTF_Sink. Kolom-kolom string ini dipisahkan oleh batang vertikal (|).