All Products
Search
Document Center

Realtime Compute for Apache Flink:Java

Last Updated:Mar 10, 2026

Realtime Compute for Apache Flink mendukung user-defined function (UDF) Java dalam pekerjaan Flink SQL. Topik ini menjelaskan jenis-jenis UDF Java, cara melakukan pass parameter, serta menyediakan catatan penggunaan penting.

Catatan penting

  • Untuk menghindari konflik dependensi paket JAR saat mengembangkan UDF, pertimbangkan hal berikut:

    • Pastikan versi Flink yang dipilih pada halaman pengembangan SQL sesuai dengan versi Flink dalam dependensi Pom.

    • Untuk dependensi terkait Flink, atur cakupannya ke `provided` dengan menambahkan <scope>provided</scope>.

    • Paketkan dependensi pihak ketiga lainnya menggunakan metode Shade. Untuk informasi selengkapnya, lihat Apache Maven Shade Plugin.

    Untuk informasi lebih lanjut tentang konflik dependensi Flink, lihat Bagaimana cara menyelesaikan konflik dependensi Flink?.

  • Untuk mencegah timeout akibat pemanggilan UDF yang terlalu sering dalam pekerjaan SQL, unggah paket JAR UDF sebagai file dependensi, lalu deklarasikan fungsi tersebut dalam pekerjaan menggunakan sintaksis CREATE TEMPORARY FUNCTION. Contohnya: CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

Klasifikasi UDF

Flink mendukung tiga jenis UDF berikut.

Klasifikasi

Deskripsi

User-defined scalar function (UDSF)

UDSF memetakan nol, satu, atau beberapa nilai skalar ke nilai skalar baru. Hubungan antara input dan output bersifat satu-ke-satu, artinya fungsi ini membaca satu baris data dan menghasilkan satu nilai output. Untuk informasi selengkapnya, lihat User-defined scalar functions (UDSFs).

User-defined aggregate function (UDAF)

UDAF mengagregasi beberapa record menjadi satu record. Hubungan antara input dan output bersifat banyak-ke-satu, artinya fungsi ini menggabungkan beberapa record input menjadi satu nilai output. Untuk informasi selengkapnya, lihat User-defined aggregate functions (UDAFs).

User-defined table-valued function (UDTF)

UDTF menerima nol, satu, atau beberapa nilai skalar sebagai parameter input. Parameter tersebut dapat memiliki panjang variabel. Fungsi ini mirip dengan UDSF, tetapi dapat mengembalikan sejumlah baris apa pun sebagai output, bukan hanya satu nilai tunggal. Baris yang dikembalikan dapat terdiri dari satu atau beberapa kolom. Satu pemanggilan fungsi dapat menghasilkan beberapa baris atau kolom. Untuk informasi selengkapnya, lihat User-defined table-valued functions (UDTFs).

Pendaftaran UDF

  • Untuk informasi tentang cara mendaftarkan UDF global, lihat Global UDFs.

  • Untuk informasi tentang cara mendaftarkan UDF tingkat pekerjaan, lihat Job-level UDFs.

Pass parameter ke UDF

Anda dapat mengonfigurasi parameter untuk UDF di Konsol pengembangan Flink dan menggunakannya dalam kode UDF. Hal ini memungkinkan Anda mengubah nilai parameter UDF secara langsung di konsol dengan cepat.

UDF menyediakan metode opsional open(FunctionContext context). Objek FunctionContext digunakan untuk melakukan pass item konfigurasi kustom sebagai parameter. Prosesnya adalah sebagai berikut:

  1. Pada tab Deployment Details di halaman Operation Center > Job O&M di Konsol pengembangan Flink, tambahkan item konfigurasi pipeline.global-job-parameters pada bagian Other Configurations dalam Running Parameter Settings.

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    FunctionContext#getJobParameter hanya dapat mengambil nilai item konfigurasi pipeline.global-job-parameters. Oleh karena itu, Anda harus menuliskan semua item konfigurasi yang digunakan UDF ke dalam pipeline.global-job-parameters. Langkah-langkah berikut menjelaskan cara mengonfigurasi item ini.

    Langkah

    Tindakan

    Prosedur

    Contoh

    Langkah 1

    Definisikan pasangan kunci-nilai.

    Pisahkan kunci dan nilai dengan tanda titik dua (:) dan bungkus setiap pasangan kunci-nilai dengan tanda kutip tunggal (').

    Catatan
    • Jika kunci atau nilai berisi tanda titik dua (:), bungkus kunci atau nilai tersebut dengan tanda kutip ganda (").

    • Jika kunci atau nilai berisi tanda titik dua (:) atau tanda kutip ganda ("), Anda harus melakukan escape dengan dua tanda kutip ganda berturut-turut ("").

    • Jika kunci = k1 dan value = {hi,hello}, definisikan pasangan tersebut sebagai 'k1:{hi,hello}'.

    • Jika kunci = k2 dan value = str:ing,str:ing, definisikan pasangan tersebut sebagai 'k2:"str:ing,str:ing"'

    • Jika kunci = k3 dan value = str"ing,str:ing, definisikan pasangan tersebut sebagai 'k3:"str""ing,str:ing"'

    Langkah 2

    Format nilai akhir pipeline.global-job-parameters sebagai File YAML.

    Letakkan setiap pasangan kunci-nilai pada baris baru dan pisahkan dengan koma (,).

    Catatan
    • String multi-baris dalam File YAML dimulai dengan tanda vertikal (|).

    • Setiap baris string multi-baris dalam File YAML harus memiliki indentasi yang sama.

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. Dalam kode UDF, gunakan FunctionContext#getJobParameter untuk mengambil nilai setiap item. Kode berikut merupakan contohnya.

    Kode contoh berikut disediakan.

    context.getJobParameter("k1", null); // Mengembalikan string {hi,hello}.
    context.getJobParameter("k2", null); // Mengembalikan string str:ing,str:ing.
    context.getJobParameter("k3", null); // Mengembalikan string str"ing,str:ing.
    context.getJobParameter("pipeline.global-job-parameters", null); // Mengembalikan null. Anda hanya dapat mengambil konten yang didefinisikan dalam pipeline.global-job-parameters, bukan item konfigurasi pekerjaan lainnya.

Parameter bernama

Catatan

Hanya Ververica Runtime (VVR) 8.0.7 dan versi yang lebih baru yang mendukung penggunaan parameter bernama untuk mengimplementasikan UDF.

Saat memanggil fungsi dalam SQL, Anda harus menentukan semua parameter dalam urutan yang benar. Jika suatu fungsi memiliki banyak parameter, sangat mudah membuat kesalahan dalam jumlah atau urutan parameternya. Anda juga tidak dapat mengabaikan parameter opsional. Parameter bernama memungkinkan Anda hanya menentukan parameter yang diperlukan, sehingga mengurangi kemungkinan kesalahan dan lebih nyaman. Contoh berikut dari user-defined scalar function (ScalarFunction) menunjukkan cara menggunakan parameter bernama.

// Implementasikan user-defined scalar function. Dua parameter input terakhir bersifat opsional (isOptional = true).
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

Saat menggunakan UDF ini dalam SQL, Anda dapat hanya menentukan parameter pertama yang wajib atau secara selektif menentukan parameter opsional. Kode berikut merupakan contohnya.

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- Tentukan hanya parameter pertama yang wajib
    MyNamedUdf(f1 => c) arg1_res,
    -- Tentukan parameter pertama yang wajib dan parameter opsional kedua
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- Tentukan parameter pertama yang wajib dan parameter opsional ketiga
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

Referensi