Topik ini menyediakan fungsi agregat yang didefinisikan pengguna (UDAF) untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom tertentu. Dengan menggunakan data terminal jaringan listrik perumahan sebagai contoh, topik ini menunjukkan cara menggunakan UDAF untuk mengagregasi dan mengurutkan data di Konsol Realtime Compute for Apache Flink.
Data contoh
Data dari terminal jaringan listrik perumahan disimpan dalam tabel electric_info, yang mencakup bidang event_id, user_id, event_time, dan status. Data pada bidang status harus diurutkan secara ascending berdasarkan bidang event_time.
electric_info
event_id
user_id
event_time
status
1
1222
2023-06-30 11:14:00
LD
2
1333
2023-06-30 11:12:00
LD
3
1222
2023-06-30 11:11:00
TD
4
1333
2023-06-30 11:12:00
LD
5
1222
2023-06-30 11:15:00
TD
6
1333
2023-06-30 11:18:00
LD
7
1222
2023-06-30 11:19:00
TD
8
1333
2023-06-30 11:10:00
TD
9
1555
2023-06-30 11:16:00
TD
10
1555
2023-06-30 11:17:00
LD
Hasil yang diharapkan
user_id
status
1222
TD,LD,TD,TD
1333
TD,LD,LD,LD
1555
TD,LD
Langkah 1: Siapkan sumber data
Contoh ini menggunakan ApsaraDB RDS.
Buat instans ApsaraDB RDS for MySQL.
CatatanInstans ApsaraDB RDS for MySQL harus berada dalam VPC yang sama dengan ruang kerja Flink. Jika berada di VPC yang berbeda, lihat FAQ konektivitas jaringan.
Buat database bernama electric serta akun istimewa atau akun standar dengan izin baca dan tulis pada database tersebut.
Masuk ke instans ApsaraDB RDS for MySQL menggunakan DMS. Di database electric, buat tabel electric_info dan electric_info_SortListAgg, lalu masukkan data.
CREATE TABLE `electric_info` ( event_id bigint NOT NULL PRIMARY KEY COMMENT 'Event ID', user_id bigint NOT NULL COMMENT 'User ID', event_time timestamp NOT NULL COMMENT 'Event time', status varchar(10) NOT NULL COMMENT 'User terminal status' ); CREATE TABLE `electric_info_SortListAgg` ( user_id bigint NOT NULL PRIMARY KEY COMMENT 'User ID', status_sort varchar(50) NULL COMMENT 'User terminal status sorted in ascending order by event time' ); -- Prepare data INSERT INTO electric_info VALUES (1,1222,'2023-06-30 11:14','LD'), (2,1333,'2023-06-30 11:12','LD'), (3,1222,'2023-06-30 11:11','TD'), (4,1333,'2023-06-30 11:12','LD'), (5,1222,'2023-06-30 11:15','TD'), (6,1333,'2023-06-30 11:18','LD'), (7,1222,'2023-06-30 11:19','TD'), (8,1333,'2023-06-30 11:10','TD'), (9,1555,'2023-06-30 11:16','TD'), (10,1555,'2023-06-30 11:17','LD');
Langkah 2: Daftarkan UDF
Unduh ASI_UDX-1.0-SNAPSHOT.jar.
File pom.xml dikonfigurasi dengan dependensi minimum yang diperlukan untuk user-defined function (UDF) ini di Flink 1.17.1. Untuk informasi lebih lanjut tentang user-defined function, lihat User-defined functions.
Pada contoh ini, ASI_UDAF menggabungkan beberapa baris menjadi satu baris tunggal dan mengurutkannya berdasarkan kolom tertentu. Anda dapat memodifikasi kode berikut sesuai kebutuhan.
package ASI_UDAF; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.AggregateFunction; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; public class ASI_UDAF{ /**Accumulator class*/ public static class AcList { public List<String> list; } /**Aggregate function class*/ public static class SortListAgg extends AggregateFunction<String,AcList> { public String getValue(AcList asc) { /**Sort the data in the list according to a specific rule*/ asc.list.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]); } }); /**Traverse the sorted list, extract the required fields, and join them into a string*/ List<String> ret = new ArrayList<String>(); Iterator<String> strlist = asc.list.iterator(); while (strlist.hasNext()) { ret.add(strlist.next().split("#")[0]); } String str = StringUtils.join(ret, ','); return str; } /**Method to create an accumulator*/ public AcList createAccumulator() { AcList ac = new AcList(); List<String> list = new ArrayList<String>(); ac.list = list; return ac; } /**Accumulation method: add the input data to the accumulator*/ public void accumulate(AcList acc, String tuple1) { acc.list.add(tuple1); } /**Retraction method*/ public void retract(AcList acc, String num) { } } }Buka halaman pendaftaran UDF.
Anda dapat mendaftarkan UDF agar mudah digunakan kembali dalam pengembangan selanjutnya. Untuk UDF Java, Anda juga dapat mengunggahnya sebagai file dependensi. Untuk informasi lebih lanjut, lihat User-defined aggregate functions (UDAFs).
Masuk ke Konsol Realtime Compute for Apache Flink.
Pada kolom Actions ruang kerja target, klik Console.
Klik .
Pada tab Functions di sebelah kiri, klik Register UDF.

Pada bagian Select File, unggah file JAR yang telah diunduh, lalu klik OK.
CatatanFile JAR UDF Anda diunggah ke direktori sql-artifacts Bucket OSS.
Konsol pengembangan Flink mengurai file JAR UDF Anda untuk memeriksa apakah menggunakan kelas dari antarmuka Flink UDF, UDAF, atau user-defined table-valued function (UDTF). Konsol kemudian secara otomatis mengekstrak nama kelas dan mengisi kolom Function Name.
Pada kotak dialog Manage Functions, klik Create Function.
UDF yang berhasil didaftarkan akan muncul dalam daftar Functions di sisi kiri halaman editor SQL.
Langkah 3: Buat pekerjaan Flink
Pada halaman , klik New.

Klik Blank Stream Draft.
Klik Next.
Pada dialog New Job Draft, konfigurasikan parameter pekerjaan.
Job Parameter
Description
File Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam proyek saat ini.
Storage Location
Lokasi penyimpanan pekerjaan.
Anda juga dapat mengklik ikon
di sebelah folder yang ada untuk membuat subfolder.Engine Version
Versi mesin Flink yang digunakan oleh pekerjaan. Versi ini harus sesuai dengan versi dalam file pom.xml.
Untuk informasi lebih lanjut tentang nomor versi mesin, pemetaan versi, dan tonggak siklus hidup, lihat Engine versions.
Tulis kode DDL dan DML.
--Create the temporary table electric_info. CREATE TEMPORARY TABLE electric_info ( event_id bigint not null, `user_id` bigint not null, event_time timestamp(6) not null, status string not null, primary key(event_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info' ); CREATE TEMPORARY TABLE electric_info_sortlistagg ( `user_id` bigint not null, status_sort varchar(50) not null, primary key(user_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info_sortlistagg' ); --Aggregate the data from the electric_info table and insert it into the electric_info_sortlistagg table. --Pass the string that is concatenated from status and event_time as a parameter to the registered user-defined function ASI_UDAF$SortListAgg. INSERT INTO electric_info_sortlistagg SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING))) FROM electric_info GROUP BY user_id;Tabel berikut menjelaskan parameter-parameter tersebut. Anda dapat memodifikasinya sesuai kebutuhan. Untuk informasi lebih lanjut tentang parameter konektor MySQL, lihat MySQL connector.
Parameter
Description
Remarks
connector
Jenis konektor.
Pada contoh ini, nilainya tetap
mysql.hostname
Alamat IP atau hostname database MySQL.
Atur parameter ini ke Titik akhir pribadi instans ApsaraDB RDS for MySQL.
username
Username untuk menghubungkan ke layanan database MySQL.
Tidak ada.
password
Password untuk menghubungkan ke layanan database MySQL.
Pada contoh ini, variabel bernama mysql_pw digunakan untuk password guna mencegah kebocoran informasi. Untuk informasi lebih lanjut, lihat Project variables.
database-name
Nama database MySQL.
Pada contoh ini, parameter ini diatur ke database electric yang Anda buat di Langkah 1: Siapkan sumber data.
table-name
Nama tabel MySQL.
Pada contoh ini, parameter ini diatur ke electric atau electric_info_sortlistagg.
port
Port layanan database MySQL.
Tidak ada.
(Opsional) Di pojok kanan atas, klik Deep Check dan Debug. Untuk informasi lebih lanjut tentang fitur-fitur ini, lihat Job development map.
Klik Deploy, lalu klik OK.
Pada halaman , temukan pekerjaan target, klik Start pada kolom Actions, lalu pilih Stateless Start.
Langkah 4: Kueri hasil
Di RDS, jalankan pernyataan berikut untuk melihat hasilnya. Status terminal pengguna dalam hasil diurutkan secara ascending berdasarkan waktu event.
SELECT * FROM `electric_info_sortlistagg`;Hasilnya sebagai berikut:

Referensi
Untuk informasi lebih lanjut tentang fungsi bawaan yang didukung Flink, lihat Built-in functions.
Untuk informasi lebih lanjut tentang penerapan dan startup pekerjaan, lihat Deploy a job dan Start a job.
Untuk memodifikasi parameter runtime pekerjaan, lihat Configure job deployment information. Beberapa parameter juga mendukung pembaruan dinamis untuk mengurangi waktu henti layanan akibat penghentian dan restart pekerjaan. Untuk informasi lebih lanjut, lihat Dynamic scaling and parameter updates.
Untuk informasi lebih lanjut tentang penggunaan user-defined function Python dalam pekerjaan SQL, lihat User-defined functions.