All Products
Search
Document Center

Realtime Compute for Apache Flink:Java UDTF

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara mengembangkan, mendaftarkan, dan menggunakan user-defined table-valued function (UDTF) untuk Flink.

Definisi

User-defined table-valued function (UDTF) menerima nol atau lebih nilai skalar dengan panjang variabel sebagai parameter input. Berbeda dengan user-defined scalar function yang mengembalikan satu nilai, UDTF dapat mengembalikan beberapa baris sebagai output. Baris yang dikembalikan dapat terdiri dari satu atau lebih kolom. Untuk informasi selengkapnya, lihat User-defined Functions.

Mengembangkan UDTF

Catatan

Flink menyediakan contoh user-defined function (UDF) untuk membantu Anda mengembangkan layanan dengan cepat. Contoh tersebut mencakup implementasi UDSF, user-defined aggregate function (UDAF), dan user-defined table-valued function (UDTF). Lingkungan pengembangan telah dikonfigurasi sebelumnya dalam contoh tersebut, sehingga tidak diperlukan penyiapan tambahan.

  1. Unduh dan ekstrak proyek contoh ASI_UDX_Demo ke mesin lokal Anda.

    Catatan

    Akses ke situs pihak ketiga ASI_UDX_Demo mungkin gagal atau mengalami penundaan.

    Setelah proses ekstraksi selesai, folder ASI_UDX-main akan dibuat. Folder tersebut berisi file-file berikut:

    • pom.xml: File konfigurasi tingkat proyek. File ini menjelaskan koordinat Maven, dependensi, aturan pengembang, sistem pelacakan bug, organisasi, lisensi, dan informasi terkait proyek lainnya.

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: Kode Java contoh untuk UDTF.

  2. Di IntelliJ IDEA, klik File > Open dan buka folder ASI_UDX-main yang telah diekstrak.

  3. Klik ganda folder \ASI_UDX-main\src\main\java\ASI_UDTF dan modifikasi file ASI_UDTF.java sesuai kebutuhan.

    Pada contoh ini, kode dalam file ASI_UDTF.java memisahkan sebuah string menjadi beberapa kolom string berdasarkan delimiter tanda pipa vertikal (|).

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    Untuk informasi selengkapnya mengenai tipe data dan mekanisme inferensi tipe yang didukung oleh TableFunction, lihat Data types in Flink dan Type inference.

    Catatan

    Tautan di atas merujuk pada dokumentasi Flink versi 1.15. Tipe data yang didukung dan mekanisme inferensi tipe dapat berbeda antarversi utama Flink. Periksa dokumentasi Flink yang sesuai dengan versi Ververica Runtime (VVR) Anda berdasarkan pemetaan versi VVR ke Flink. Untuk melihat versi Flink Anda, lihat FAQ Workspace dan namespace.

    Contoh berikut menunjukkan cara menggunakan tipe komposit umum Tuple dan Row:

    • Tipe Tuple

      TableFunction<Tuple2<String,Integer>
    • Tipe Row

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // gunakan collect(...) untuk mengeluarkan satu baris
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. Klik ganda folder \ASI_UDX-main\. Konfigurasikan file pom.xml.

    Pada contoh ini, file pom.xml dikonfigurasi dengan dependensi paket JAR utama untuk Flink versi 1.11. Sesuaikan dengan kebutuhan layanan Anda:

    • Jika layanan Anda tidak bergantung pada paket JAR lain, Anda tidak perlu mengonfigurasi file pom.xml. Lanjutkan ke langkah berikutnya.

    • Jika layanan Anda bergantung pada paket JAR lain, tambahkan informasi dependensi yang diperlukan ke file pom.xml.

    Dependensi paket JAR utama untuk Flink versi 1.11 adalah sebagai berikut.

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. Di direktori yang berisi file pom.xml, jalankan perintah berikut untuk membuat paket proyek.

    mvn package -Dcheckstyle.skip

    Jika paket ASI_UDX-1.0-SNAPSHOT.jar berhasil dibuat di direktori \ASI_UDX-main\target\, pengembangan UDTF telah selesai.

Mendaftarkan UDTF

Untuk informasi selengkapnya mengenai cara mendaftarkan UDTF, lihat Mengelola user-defined function (UDF).

Menggunakan UDTF

Setelah Anda mendaftarkan UDTF, Anda dapat menggunakannya dalam pekerjaan Anda dengan mengikuti prosedur berikut.

  1. Kembangkan pekerjaan Flink SQL. Untuk informasi selengkapnya, lihat Peta pengembangan pekerjaan.

    Kode contoh berikut memisahkan string pada setiap baris bidang message di tabel ASI_UDTF_Source menjadi beberapa kolom. Tanda pipa vertikal (|) digunakan sebagai delimiter.

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. Di halaman Operation Center > Job O&M, temukan pekerjaan yang dituju dan klik Start pada kolom Actions.

    Setelah pekerjaan dimulai, string pada bidang `message` tabel `ASI_UDTF_Source` akan dipisahkan berdasarkan delimiter tanda pipa vertikal. Kolom karakter hasil pemisahan kemudian dimasukkan ke dalam tabel `ASI_UDTF_Sink`.