全部产品
Search
文档中心

Realtime Compute for Apache Flink:Java

更新时间:Jul 06, 2025

Realtime Compute for Apache Flink memungkinkan Anda menggunakan fungsi yang ditentukan pengguna (UDF) berbasis Java dalam penyebaran SQL Flink. Topik ini menjelaskan tipe-tipe Java UDF serta cara melewatkan parameter ke UDF dan langkah-langkah pencegahan yang perlu dipertimbangkan saat mengembangkannya.

Langkah-langkah Pencegahan

  • UDF dapat menyebabkan konflik dependensi dalam file JAR yang dikemas. Perhatikan hal-hal berikut saat mengembangkan UDF:

    • Pastikan versi Realtime Compute for Apache Flink yang Anda konfigurasikan untuk penyebaran SQL sesuai dengan versi Apache Flink yang Anda tentukan dalam file pom.xml.

    • Tentukan <scope>provided</scope> untuk dependensi terkait Apache Flink.

    • Gunakan Plugin Shade untuk mengemas dependensi pihak ketiga lainnya. Untuk informasi lebih lanjut, lihat Apache Maven Shade Plugin.

    Untuk informasi tentang cara menyelesaikan konflik dependensi, lihat FAQ tentang manajemen dan operasi ruang kerja serta namespace.

  • Pemanggilan UDF secara sering dalam penyebaran SQL dapat menyebabkan timeout. Kami merekomendasikan agar Anda mengunggah file JAR dari UDF sebagai dependensi tambahan dari penyebaran dan menggunakan pernyataan CREATE TEMPORARY FUNCTION untuk mendeklarasikan UDF. Contoh: CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';.

Tipe UDF

Tabel berikut menjelaskan tipe UDF yang didukung di Realtime Compute for Apache Flink.

Tipe

Deskripsi

User Defined Scalar Function (UDSF)

UDSF memetakan nol atau lebih nilai skalar menjadi nilai skalar baru. Pemetaan satu-satu dibuat antara input dan output. Anda dapat memanggil UDSF untuk menghasilkan nilai baru berdasarkan rekaman data. Untuk informasi lebih lanjut, lihat UDSFs.

Fungsi agregat yang ditentukan pengguna (UDAF)

UDAF menggabungkan beberapa rekaman data menjadi satu rekaman data. Pemetaan banyak-ke-satu dibuat antara input dan output. Untuk informasi lebih lanjut, lihat UDAFs.

Fungsi tabel yang ditentukan pengguna (UDTF)

UDTF menerima nol atau lebih nilai skalar sebagai parameter input. Parameter-parameter ini bisa memiliki panjang variabel. UDTF mirip dengan UDSF, kecuali UDTF dapat mengembalikan sejumlah baris daripada hanya satu nilai. Baris-baris yang dikembalikan dapat mencakup satu atau lebih kolom. Anda dapat memanggil UDTF untuk menghasilkan beberapa baris atau kolom. Untuk informasi lebih lanjut, lihat UDTFs.

Daftarkan UDF

Pass parameter UDF

Anda dapat mengonfigurasi parameter UDF di konsol pengembangan Realtime Compute for Apache Flink dan mendapatkan nilai parameter di dalam UDF.

Konfigurasi parameter di konsol dilewatkan ke UDF melalui metode open(FunctionContext context). Metode ini bersifat opsional untuk diimplementasikan dan menyediakan objek FunctionContext yang berisi informasi tentang konteks eksekusi UDF yang sesuai. Untuk melewatkan konfigurasi parameter, lakukan langkah-langkah berikut:

  1. Di halaman O&M > Deployments di konsol pengembangan Realtime Compute for Apache Flink, klik nama penyebaran yang ingin Anda gunakan. Di tab Configuration, klik Edit di pojok kanan atas bagian Parameters. Kemudian, tentukan parameter pipeline.global-job-parameters di bidang Other Configuration. Contoh kode:

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    Hanya konfigurasi yang ditentukan dalam parameter pipeline.global-job-parameters yang dapat diperoleh oleh metode FunctionContext#getJobParameter. Pastikan Anda menentukan semua konfigurasi parameter untuk UDF dalam parameter pipeline.global-job-parameters. Tabel berikut menjelaskan cara mengonfigurasi parameter pipeline.global-job-parameters.

    Langkah

    Deskripsi

    Operasi

    Contoh

    Langkah 1

    Tentukan pasangan key-value.

    Pisahkan key dan value dalam pasangan key-value dengan titik dua (:) dan tutup setiap pasangan key-value dengan tanda kutip tunggal (').

    Catatan
    • Jika key atau value mengandung titik dua (:), tutup key atau value dengan tanda kutip ganda (").

    • Jika key atau value mengandung titik dua (:) dan tanda kutip ganda ("), escape tanda kutip ganda dengan menggunakan dua tanda kutip ganda ("").

    • Pasangan key-value key = k1,value = {hi,hello} didefinisikan sebagai 'k1:{hi,hello}'.

    • Pasangan key-value key = k2,value = str:ing,str:ing didefinisikan sebagai 'k2:"str:ing,str:ing"'.

    • Pasangan key-value key = k3,value = str"ing,str:ing didefinisikan sebagai 'k3:"str""ing,str:ing"'.

    Langkah 2

    Organisasikan pasangan key-value yang telah ditentukan dalam format YAML.

    Tempatkan pasangan key-value yang berbeda di baris yang berbeda dan pisahkan baris dengan koma (,).

    Catatan
    • Garis vertikal (|) harus ditambahkan di awal string multi-baris.

    • Setiap baris string harus memiliki indentasi yang sama.

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. Dalam kode implementasi UDF, panggil metode FunctionContext#getJobParameter untuk mendapatkan isi dari pasangan key-value.

    Contoh kode:

    context.getJobParameter("k1", null); // String {hi,hello} dikembalikan.
    context.getJobParameter("k2", null); // String str:ing,str:ing dikembalikan.
    context.getJobParameter("k3", null); // String str"ing,str:ing dikembalikan.
    context.getJobParameter("pipeline.global-job-parameters", null); // null dikembalikan karena hanya pasangan key-value yang didefinisikan dalam parameter pipeline.global-job-parameters yang dapat diperoleh.

Parameter bernama

Catatan

Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.7 atau lebih baru yang memungkinkan Anda menggunakan parameter bernama untuk mengimplementasikan UDF.

Dalam SQL, nilai parameter harus dilewatkan ke fungsi sesuai dengan urutan parameter yang didefinisikan dan parameter opsional tidak dapat dihilangkan. Jika jumlah parameter besar, memastikan urutan parameter yang benar bisa menjadi tantangan. Untuk memastikan urutan parameter yang benar dan meningkatkan kemudahan penggunaan, Anda dapat menggunakan parameter bernama untuk menentukan hanya parameter yang ingin Anda gunakan. Contoh berikut menunjukkan cara menggunakan parameter bernama. Dalam contoh ini, UDSF bernama ScalarFunction digunakan.

// Implementasikan UDSF di mana parameter kedua dan ketiga bersifat opsional (isOptional = true).
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

Saat Anda memanggil fungsi dalam SQL, Anda dapat menentukan hanya parameter pertama yang wajib. Anda juga dapat menentukan parameter kedua dan ketiga yang opsional. Contoh kode:

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- Tentukan hanya parameter pertama yang wajib
    MyNamedUdf(f1 => c) arg1_res,
    -- Tentukan parameter pertama yang wajib dan parameter kedua yang opsional.
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- Tentukan parameter pertama yang wajib dan parameter ketiga yang opsional.
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

Referensi