全部产品
Search
文档中心

Realtime Compute for Apache Flink:UDSFs

更新时间:Jul 02, 2025

Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi skalar yang didefinisikan pengguna (UDSF) di Realtime Compute for Apache Flink.

Definisi

Sebuah UDSF memetakan nol, satu, atau lebih nilai skalar ke nilai skalar baru. Data input dan output dari sebuah UDSF dipetakan dalam hubungan satu-satu. Setiap kali UDSF membaca satu baris data, ia menulis satu nilai output. Untuk informasi lebih lanjut, lihat Fungsi yang Didefinisikan Pengguna.

Buat UDSF

Catatan

Realtime Compute for Apache Flink menyediakan contoh-contoh fungsi yang didefinisikan pengguna (UDF) untuk memfasilitasi pengembangan bisnis Anda. Contoh-contoh tersebut mencakup implementasi UDSF, fungsi agregat yang didefinisikan pengguna (UDAF), dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF). Lingkungan pengembangan versi terkait telah dikonfigurasi di setiap contoh.

  1. Unduh dan ekstrak ASI_UDX_Demo ke mesin lokal Anda.

    Catatan

    ASI_UDX_Demo disediakan di situs pihak ketiga. Saat mengakses situs web tersebut, akses mungkin gagal atau tertunda.

    Setelah mengekstrak paket, folder ASI_UDX-main akan dibuat. Parameter dalam jalur:

    • pom.xml: File konfigurasi tingkat proyek yang menjelaskan koordinat Maven, dependensi, aturan yang harus diikuti oleh pengembang, sistem manajemen cacat, organisasi, lisensi, serta faktor lain terkait proyek.

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: Kode Java untuk contoh UDSF.

  2. Buka IntelliJ IDEA dan pilih File > Open. Pilih folder ASI_UDX-main yang telah diekstrak dan klik OK.

  3. Klik dua kali file ASI_UDF.java di direktori \ASI_UDX-main\src\main\java\ASI_UDF, lalu buat konfigurasi dalam file berdasarkan kebutuhan bisnis Anda.

    Dalam contoh ini, ASI_UDF.java dikonfigurasikan dengan kode untuk mendapatkan karakter dari posisi awal hingga posisi akhir di setiap rekaman data.

    package ASI_UDF;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class ASI_UDF extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
  4. Klik dua kali file pom.xml di direktori \ASI_UDX-main\ dan buat konfigurasi dalam file tersebut.

    Dalam contoh ini, pom.xml dikonfigurasikan dengan informasi dependensi utama JAR Apache Flink 1.11. Lakukan salah satu operasi berikut berdasarkan kebutuhan bisnis Anda:

    • Jika bisnis Anda tidak bergantung pada paket JAR lainnya, lanjutkan ke langkah berikutnya tanpa perlu mengonfigurasi file pom.xml.

    • Jika bisnis Anda bergantung pada paket JAR lainnya, tambahkan informasi paket JAR yang diperlukan ke file pom.xml.

    Apache Flink 1.11 bergantung pada paket JAR berikut:

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. Pergi ke direktori tempat file pom.xml disimpan. Kemudian, jalankan perintah berikut untuk mengemas file:

    mvn package -Dcheckstyle.skip

    Jika paket ASI_UDX-1.0-SNAPSHOT.jar muncul di direktori \ASI_UDX-main\target\, maka UDSF berhasil dibuat.

Daftarkan UDSF

Untuk informasi lebih lanjut tentang cara mendaftarkan UDSF, lihat Kelola UDF.

Gunakan UDSF

Setelah mendaftarkan UDSF, Anda dapat menggunakannya. Untuk menggunakan UDSF, ikuti langkah-langkah berikut:

  1. Gunakan Flink SQL untuk membuat deployment. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.

    Kode berikut memberikan contoh cara mendapatkan karakter dari karakter kedua hingga karakter keempat dari string di setiap baris bidang a dalam tabel ASI_UDSF_Source:

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. Di halaman O&M > Deployments di konsol Flink yang sepenuhnya dikelola, temukan deployment yang ingin dimulai dan klik Start di kolom Actions.

    Setelah deployment dimulai, karakter kedua hingga keempat dari string di setiap baris bidang a dalam tabel ASI_UDSF_Source dimasukkan ke setiap baris tabel ASI_UDSF_Sink.