All Products
Search
Document Center

Realtime Compute for Apache Flink:Java UDSFs

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara mengembangkan, mendaftarkan, dan menggunakan Flink user-defined scalar function (UDSF).

Definisi

User-defined scalar function (UDSF) memetakan nol, satu, atau beberapa nilai skalar ke nilai skalar baru. Fungsi ini bersifat satu-ke-satu, di mana setiap baris data diproses untuk menghasilkan satu nilai output. Untuk informasi selengkapnya, lihat User-defined Functions.

Kembangkan UDSF

Catatan

Flink menyediakan contoh user-defined function (UDF) untuk membantu Anda mengembangkan layanan dengan cepat. Contoh tersebut mencakup implementasi untuk UDSF, user-defined aggregate functions (UDAF), dan user-defined table-valued functions (UDTF). Lingkungan pengembangan telah dikonfigurasi sebelumnya dalam contoh tersebut, sehingga tidak diperlukan penyiapan tambahan.

  1. Unduh dan ekstrak contoh ASI_UDX_Demo ke mesin lokal Anda.

    Catatan

    ASI_UDX_Demo dihosting di situs pihak ketiga. Anda mungkin mengalami kegagalan akses atau penundaan.

    Setelah diekstraksi, folder ASI_UDX-main akan dibuat. Folder tersebut berisi:

    • pom.xml: File konfigurasi tingkat proyek yang menjelaskan koordinat Maven proyek, dependensi, aturan untuk pengembang, sistem pelacakan bug, organisasi, lisensi, serta faktor terkait proyek lainnya.

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: Kode Java contoh untuk UDSF.

  2. Di IntelliJ IDEA, klik File > Open, lalu pilih folder ASI_UDX-main yang telah diekstraksi.

  3. Klik ganda folder \ASI_UDX-main\src\main\java\ASI_UDF, lalu konfigurasikan file ASI_UDF.java sesuai kebutuhan.

    Pada contoh ini, file ASI_UDF.java dikonfigurasi dengan kode yang mengekstrak karakter dari posisi `begin` hingga `end` pada setiap string input.

    package ASI_UDF;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class ASI_UDF extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
  4. Klik ganda folder \ASI_UDX-main\, lalu konfigurasikan file pom.xml.

    Pada contoh ini, file pom.xml dikonfigurasi dengan dependensi paket JAR utama untuk Flink versi 1.11. Berdasarkan kebutuhan layanan Anda:

    • Jika layanan Anda tidak bergantung pada paket JAR lain, Anda tidak perlu mengonfigurasi file pom.xml. Lanjutkan ke langkah berikutnya.

    • Jika layanan Anda bergantung pada paket JAR lain, tambahkan informasi dependensi yang diperlukan ke file pom.xml.

    Dependensi paket JAR utama untuk Flink versi 1.11 adalah sebagai berikut.

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. Di direktori yang berisi file pom.xml, jalankan perintah berikut untuk membuat paket proyek.

    mvn package -Dcheckstyle.skip

    Paket ASI_UDX-1.0-SNAPSHOT.jar dibuat di direktori \ASI_UDX-main\target\, yang menandakan bahwa pengembangan UDSF telah selesai.

Daftarkan UDSF

Untuk informasi selengkapnya tentang cara mendaftarkan UDSF, lihat Kelola user-defined functions (UDFs).

Gunakan UDSF

Setelah Anda mendaftarkan UDSF, Anda dapat menggunakannya dengan prosedur berikut.

  1. Kembangkan pekerjaan Flink SQL. Untuk informasi selengkapnya, lihat Peta pengembangan Pekerjaan.

    Kode contoh berikut mengekstrak karakter dari posisi kedua hingga keempat pada string di bidang `a` tabel `ASI_UDSF_Source`.

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. Di halaman Operation Center > Job O&M, temukan pekerjaan target lalu klik Start di kolom Actions.

    Setelah pekerjaan dimulai, karakter dari posisi kedua hingga keempat pada string di bidang `a` setiap baris tabel `ASI_UDSF_Source` dimasukkan ke tabel `ASI_UDSF_Sink`.