全部产品
Search
文档中心

Realtime Compute for Apache Flink:UDTFs

更新时间:Jul 02, 2025

Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi bernilai tabel yang didefinisikan pengguna (UDTF) di Realtime Compute for Apache Flink.

Definisi

UDTF mengambil nol, satu, atau beberapa nilai skalar sebagai parameter input. Parameter ini dapat berupa parameter dengan panjang variabel. UDTF mirip dengan fungsi skalar yang didefinisikan pengguna (UDSF), kecuali bahwa UDTF dapat mengembalikan sejumlah baris alih-alih satu nilai tunggal. Baris yang dikembalikan terdiri dari satu atau lebih kolom. Beberapa baris atau kolom dikembalikan setiap kali UDTF dipanggil. Untuk informasi lebih lanjut, lihat Fungsi yang Didefinisikan Pengguna.

Buat UDTF

Catatan

Realtime Compute for Apache Flink menyediakan contoh-contoh fungsi yang didefinisikan pengguna (UDF) untuk memfasilitasi pengembangan bisnis Anda. Contoh-contoh tersebut mencakup implementasi UDSF, fungsi agregat yang didefinisikan pengguna (UDAF), dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF). Lingkungan pengembangan versi terkait telah dikonfigurasi di setiap contoh.

  1. Unduh dan ekstrak ASI_UDX_Demo ke mesin lokal Anda.

    Catatan

    ASI_UDX_Demo disediakan di situs pihak ketiga. Saat mengakses situs web tersebut, akses mungkin gagal atau tertunda.

    Setelah mengekstrak paket, folder ASI_UDX-main akan dibuat. Parameter dalam path:

    • pom.xml: file konfigurasi tingkat proyek yang menjelaskan koordinat Maven, dependensi, aturan yang harus diikuti oleh pengembang, sistem manajemen cacat, organisasi, lisensi proyek, serta faktor lain terkait proyek.

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: kode Java untuk contoh UDTF.

  2. Buka IntelliJ IDEA dan pilih File > Open. Pilih folder ASI_UDX-main yang telah diekstrak dan klik OK.

  3. Klik dua kali file ASI_UDTF.java di direktori \ASI_UDX-main\src\main\java\ASI_UDTF, lalu buat konfigurasi dalam file sesuai kebutuhan bisnis Anda.

    Dalam contoh ini, ASI_UDTF.java dikonfigurasikan dengan kode untuk memisahkan string dalam satu baris menjadi beberapa kolom string dengan batang vertikal (|).

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    Untuk informasi lebih lanjut tentang tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF, lihat Data Types dan Type Inference.

    Catatan

    Dokumen ini dirujuk dari dokumentasi Apache Flink 1.15. Tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF dapat bervariasi berdasarkan versi utama Apache Flink. Untuk informasi lebih lanjut tentang tipe data dan mekanisme inferensi tipe yang didukung oleh UDTF di Apache Flink versi tertentu, lihat dokumentasi versi Apache Flink berdasarkan pemetaan antara Ververica Runtime (VVR) dan versi Apache Flink. Untuk informasi lebih lanjut tentang cara melihat versi mesin Apache Flink, lihat FAQ tentang manajemen dan operasi ruang kerja dan namespace.

    Contoh UDTF berikut mengembalikan nilai dari tipe komposit umum Tuple dan Row:

    • Tipe Tuple

      TableFunction<Tuple2<String,Integer>
    • Tipe Row

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // gunakan collect(...) untuk mengeluarkan baris
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. Klik dua kali file pom.xml di direktori \ASI_UDX-main\ dan buat konfigurasi dalam file tersebut.

    Dalam contoh ini, pom.xml dikonfigurasikan dengan informasi dependensi JAR utama Apache Flink 1.11. Lakukan salah satu dari operasi berikut berdasarkan kebutuhan bisnis Anda:

    • Jika bisnis Anda tidak bergantung pada paket JAR lain, lanjutkan ke langkah berikutnya tanpa perlu mengonfigurasi file pom.xml.

    • Jika bisnis Anda bergantung pada paket JAR lain, tambahkan informasi paket JAR yang diperlukan ke file pom.xml.

    Apache Flink 1.11 bergantung pada paket JAR berikut:

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. Pergi ke direktori tempat file pom.xml disimpan. Kemudian, jalankan perintah berikut untuk mengemas file:

    mvn package -Dcheckstyle.skip

    Jika paket ASI_UDX-1.0-SNAPSHOT.jar muncul di direktori \ASI_UDX-main\target\, UDTF berhasil dibuat.

Daftarkan UDTF

Untuk informasi lebih lanjut tentang cara mendaftarkan UDTF, lihat Kelola UDF.

Gunakan UDTF

Setelah UDTF didaftarkan, Anda dapat melakukan langkah-langkah berikut untuk menggunakan UDTF:

  1. Gunakan Flink SQL untuk membuat deployment. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.

    Di tabel ASI_UDTF_Source, string dalam setiap baris bidang pesan dipisahkan menjadi beberapa kolom oleh batang vertikal (|). Kode berikut memberikan contoh:

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. Di halaman O&M > Deployments di konsol Flink yang sepenuhnya dikelola, temukan deployment yang ingin Anda mulai dan klik Start di kolom Actions.

    Setelah deployment dimulai, beberapa kolom string dari bidang pesan di tabel ASI_UDTF_Source dimasukkan ke dalam tabel ASI_UDTF_Sink. Kolom-kolom string ini dipisahkan oleh batang vertikal (|).