Realtime Compute for Apache Flink memungkinkan Anda menggunakan fungsi yang ditentukan pengguna (UDF) berbasis Java dalam penyebaran SQL Flink. Topik ini menjelaskan tipe-tipe Java UDF serta cara melewatkan parameter ke UDF dan langkah-langkah pencegahan yang perlu dipertimbangkan saat mengembangkannya.
Langkah-langkah Pencegahan
UDF dapat menyebabkan konflik dependensi dalam file JAR yang dikemas. Perhatikan hal-hal berikut saat mengembangkan UDF:
Pastikan versi Realtime Compute for Apache Flink yang Anda konfigurasikan untuk penyebaran SQL sesuai dengan versi Apache Flink yang Anda tentukan dalam file pom.xml.
Tentukan
<scope>provided</scope>untuk dependensi terkait Apache Flink.Gunakan Plugin Shade untuk mengemas dependensi pihak ketiga lainnya. Untuk informasi lebih lanjut, lihat Apache Maven Shade Plugin.
Untuk informasi tentang cara menyelesaikan konflik dependensi, lihat FAQ tentang manajemen dan operasi ruang kerja serta namespace.
Pemanggilan UDF secara sering dalam penyebaran SQL dapat menyebabkan timeout. Kami merekomendasikan agar Anda mengunggah file JAR dari UDF sebagai dependensi tambahan dari penyebaran dan menggunakan pernyataan
CREATE TEMPORARY FUNCTIONuntuk mendeklarasikan UDF. Contoh:CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';.
Tipe UDF
Tabel berikut menjelaskan tipe UDF yang didukung di Realtime Compute for Apache Flink.
Tipe | Deskripsi |
User Defined Scalar Function (UDSF) | UDSF memetakan nol atau lebih nilai skalar menjadi nilai skalar baru. Pemetaan satu-satu dibuat antara input dan output. Anda dapat memanggil UDSF untuk menghasilkan nilai baru berdasarkan rekaman data. Untuk informasi lebih lanjut, lihat UDSFs. |
Fungsi agregat yang ditentukan pengguna (UDAF) | UDAF menggabungkan beberapa rekaman data menjadi satu rekaman data. Pemetaan banyak-ke-satu dibuat antara input dan output. Untuk informasi lebih lanjut, lihat UDAFs. |
Fungsi tabel yang ditentukan pengguna (UDTF) | UDTF menerima nol atau lebih nilai skalar sebagai parameter input. Parameter-parameter ini bisa memiliki panjang variabel. UDTF mirip dengan UDSF, kecuali UDTF dapat mengembalikan sejumlah baris daripada hanya satu nilai. Baris-baris yang dikembalikan dapat mencakup satu atau lebih kolom. Anda dapat memanggil UDTF untuk menghasilkan beberapa baris atau kolom. Untuk informasi lebih lanjut, lihat UDTFs. |
Daftarkan UDF
Untuk informasi tentang cara mendaftarkan UDF secara global, lihat Daftarkan catalog UDF.
Untuk informasi tentang cara mendaftarkan UDF untuk penyebaran tunggal, lihat Daftarkan UDF tingkat penyebaran.
Pass parameter UDF
Anda dapat mengonfigurasi parameter UDF di konsol pengembangan Realtime Compute for Apache Flink dan mendapatkan nilai parameter di dalam UDF.
Konfigurasi parameter di konsol dilewatkan ke UDF melalui metode open(FunctionContext context). Metode ini bersifat opsional untuk diimplementasikan dan menyediakan objek FunctionContext yang berisi informasi tentang konteks eksekusi UDF yang sesuai. Untuk melewatkan konfigurasi parameter, lakukan langkah-langkah berikut:
Di halaman di konsol pengembangan Realtime Compute for Apache Flink, klik nama penyebaran yang ingin Anda gunakan. Di tab Configuration, klik Edit di pojok kanan atas bagian Parameters. Kemudian, tentukan parameter pipeline.global-job-parameters di bidang Other Configuration. Contoh kode:
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'Hanya konfigurasi yang ditentukan dalam parameter pipeline.global-job-parameters yang dapat diperoleh oleh metode FunctionContext#getJobParameter. Pastikan Anda menentukan semua konfigurasi parameter untuk UDF dalam parameter pipeline.global-job-parameters. Tabel berikut menjelaskan cara mengonfigurasi parameter pipeline.global-job-parameters.
Langkah
Deskripsi
Operasi
Contoh
Langkah 1
Tentukan pasangan key-value.
Pisahkan key dan value dalam pasangan key-value dengan titik dua (:) dan tutup setiap pasangan key-value dengan tanda kutip tunggal (').
CatatanJika key atau value mengandung titik dua (:), tutup key atau value dengan tanda kutip ganda (").
Jika key atau value mengandung titik dua (:) dan tanda kutip ganda ("), escape tanda kutip ganda dengan menggunakan dua tanda kutip ganda ("").
Pasangan key-value
key = k1,value = {hi,hello}didefinisikan sebagai'k1:{hi,hello}'.Pasangan key-value
key = k2,value = str:ing,str:ingdidefinisikan sebagai'k2:"str:ing,str:ing"'.Pasangan key-value
key = k3,value = str"ing,str:ingdidefinisikan sebagai'k3:"str""ing,str:ing"'.
Langkah 2
Organisasikan pasangan key-value yang telah ditentukan dalam format YAML.
Tempatkan pasangan key-value yang berbeda di baris yang berbeda dan pisahkan baris dengan koma (,).
CatatanGaris vertikal (|) harus ditambahkan di awal string multi-baris.
Setiap baris string harus memiliki indentasi yang sama.
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'Dalam kode implementasi UDF, panggil metode FunctionContext#getJobParameter untuk mendapatkan isi dari pasangan key-value.
Contoh kode:
context.getJobParameter("k1", null); // String {hi,hello} dikembalikan. context.getJobParameter("k2", null); // String str:ing,str:ing dikembalikan. context.getJobParameter("k3", null); // String str"ing,str:ing dikembalikan. context.getJobParameter("pipeline.global-job-parameters", null); // null dikembalikan karena hanya pasangan key-value yang didefinisikan dalam parameter pipeline.global-job-parameters yang dapat diperoleh.
Parameter bernama
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.7 atau lebih baru yang memungkinkan Anda menggunakan parameter bernama untuk mengimplementasikan UDF.
Dalam SQL, nilai parameter harus dilewatkan ke fungsi sesuai dengan urutan parameter yang didefinisikan dan parameter opsional tidak dapat dihilangkan. Jika jumlah parameter besar, memastikan urutan parameter yang benar bisa menjadi tantangan. Untuk memastikan urutan parameter yang benar dan meningkatkan kemudahan penggunaan, Anda dapat menggunakan parameter bernama untuk menentukan hanya parameter yang ingin Anda gunakan. Contoh berikut menunjukkan cara menggunakan parameter bernama. Dalam contoh ini, UDSF bernama ScalarFunction digunakan.
// Implementasikan UDSF di mana parameter kedua dan ketiga bersifat opsional (isOptional = true).
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}Saat Anda memanggil fungsi dalam SQL, Anda dapat menentukan hanya parameter pertama yang wajib. Anda juga dapat menentukan parameter kedua dan ketiga yang opsional. Contoh kode:
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- Tentukan hanya parameter pertama yang wajib
MyNamedUdf(f1 => c) arg1_res,
-- Tentukan parameter pertama yang wajib dan parameter kedua yang opsional.
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- Tentukan parameter pertama yang wajib dan parameter ketiga yang opsional.
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;Referensi
Untuk informasi tentang cara menggunakan UDAF untuk mengurutkan dan menggabungkan data, lihat Gunakan UDAF untuk mengurutkan dan menggabungkan data.
Untuk informasi tentang cara mengembangkan dan menggunakan Java UDF, lihat UDAFs, UDSFs, dan UDTFs.
Untuk informasi tentang cara men-debug dan mengoptimalkan Python UDF, lihat Ikhtisar.
Untuk informasi tentang cara mengembangkan dan menggunakan Python UDF, lihat UDAFs, UDSFs, dan UDTFs.