Fungsi agregat yang didefinisikan pengguna (UDAF) dapat digunakan untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom tertentu. Topik ini menjelaskan cara menggunakan UDAF untuk menggabungkan dan mengurutkan data di konsol Realtime Compute for Apache Flink. Contoh ini menggunakan data terminal jaringan listrik perumahan.
Data sampel
Data terminal jaringan listrik perumahan disimpan dalam tabel electric_info, yang mencakup kolom event_id, user_id, event_time, dan status. Data pada kolom status perlu diurutkan secara menaik berdasarkan kolom event_time.
electric_info
event_id
user_id
event_time
status
1
1222
2023-06-30 11:14:00
LD
2
1333
2023-06-30 11:12:00
LD
3
1222
2023-06-30 11:11:00
TD
4
1333
2023-06-30 11:12:00
LD
5
1222
2023-06-30 11:15:00
TD
6
1333
2023-06-30 11:18:00
LD
7
1222
2023-06-30 11:19:00
TD
8
1333
2023-06-30 11:10:00
TD
9
1555
2023-06-30 11:16:00
TD
10
1555
2023-06-30 11:17:00
LD
Hasil yang Diharapkan
user_id
status
1222
TD,LD,TD,TD
1333
TD,LD,LD,LD
1555
TD,LD
Langkah 1: Siapkan sumber data
Contoh ini menggunakan sumber data ApsaraDB RDS.
Buat instance ApsaraDB RDS for MySQL.
CatatanDisarankan membuat instance ApsaraDB RDS for MySQL di virtual private cloud (VPC) yang sama dengan ruang kerja Realtime Compute for Apache Flink. Jika instance ApsaraDB RDS for MySQL dan ruang kerja Realtime Compute for Apache Flink berada di VPC yang berbeda, Anda harus membangun koneksi antara keduanya. Untuk informasi lebih lanjut, lihat FAQ tentang Konektivitas Jaringan.
Buat database bernama electric dan buat akun dengan hak istimewa atau akun standar yang memiliki izin baca dan tulis pada database electric.
Masuk ke instance ApsaraDB RDS for MySQL menggunakan Data Management (DMS), buat tabel bernama electric_info dan electric_info_SortListAgg di database electric, dan masukkan data ke tabel electric_info.
CREATE TABLE `electric_info` ( event_id bigint NOT NULL PRIMARY KEY COMMENT 'ID Event', user_id bigint NOT NULL COMMENT 'ID Pengguna', event_time timestamp NOT NULL COMMENT 'Waktu Event', status varchar(10) NOT NULL COMMENT 'Status terminal pengguna' ); CREATE TABLE `electric_info_SortListAgg` ( user_id bigint NOT NULL PRIMARY KEY COMMENT 'ID Pengguna', status_sort varchar(50) NULL COMMENT 'Status terminal pengguna diurutkan secara menaik berdasarkan waktu event' ); -- Persiapkan data. INSERT INTO electric_info VALUES (1,1222,'2023-06-30 11:14','LD'), (2,1333,'2023-06-30 11:12','LD'), (3,1222,'2023-06-30 11:11','TD'), (4,1333,'2023-06-30 11:12','LD'), (5,1222,'2023-06-30 11:15','TD'), (6,1333,'2023-06-30 11:18','LD'), (7,1222,'2023-06-30 11:19','TD'), (8,1333,'2023-06-30 11:10','TD'), (9,1555,'2023-06-30 11:16','TD'), (10,1555,'2023-06-30 11:17','LD');
Langkah 2: Daftarkan UDF
Unduh paket ASI_UDX-1.0-SNAPSHOT.jar.
Informasi dependensi minimum yang dibutuhkan oleh fungsi yang didefinisikan pengguna (UDF) di Flink 1.17.1 dikonfigurasikan dalam file pom.xml. Untuk informasi lebih lanjut tentang cara menggunakan UDF, lihat UDFs.
Gunakan ASI_UDAF untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom yang ditentukan. Contoh kode berikut menunjukkan sebuah contoh. Anda dapat memodifikasi kode sesuai dengan kebutuhan bisnis Anda.
package ASI_UDAF; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.AggregateFunction; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; public class ASI_UDAF{ /**Kelas Akumulator*/ public static class AcList { public List<String> list; } /**Kelas fungsi agregat*/ public static class SortListAgg extends AggregateFunction<String,AcList> { public String getValue(AcList asc) { /**Urutkan data dalam daftar sesuai aturan tertentu*/ asc.list.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]); } }); /**Iterasi daftar yang telah diurutkan, ekstrak bidang yang diperlukan, dan gabungkan menjadi string*/ List<String> ret = new ArrayList<String>(); Iterator<String> strlist = asc.list.iterator(); while (strlist.hasNext()) { ret.add(strlist.next().split("#")[0]); } String str = StringUtils.join(ret, ','); return str; } /**Metode untuk membuat akumulator*/ public AcList createAccumulator() { AcList ac = new AcList(); List<String> list = new ArrayList<String>(); ac.list = list; return ac; } /**Metode akumulasi: tambahkan data input ke akumulator*/ public void accumulate(AcList acc, String tuple1) { acc.list.add(tuple1); } /**Metode penarikan kembali*/ public void retract(AcList acc, String num) { } } }Pergi ke kotak dialog Register UDF Artifact.
Setelah Anda mendaftarkan UDF, kode UDF dapat digunakan kembali untuk pengembangan selanjutnya. Untuk Java UDF, Anda juga dapat mengunggah file JAR menggunakan dependensi untuk UDF. Untuk informasi lebih lanjut, lihat UDAFs.
Masuk ke konsol Realtime Compute for Apache Flink.
Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi di sebelah kiri, klik .
Di panel sebelah kiri halaman Editor SQL, klik tab UDFs dan klik Register UDF Artifact.

Klik Klik untuk memilih di bagian Select a file untuk mengunggah file JAR yang diperoleh di Langkah 1, dan klik Confirm.
CatatanFile JAR UDF diunggah ke direktori sql-artifacts dari Object Storage Service (OSS) bucket yang terkait dengan ruang kerja.
Realtime Compute for Apache Flink mem-parsing file JAR UDF dan memeriksa apakah kelas antarmuka Flink UDF, UDAF, dan fungsi tabel yang didefinisikan pengguna (UDTF) digunakan dalam file. Kemudian, Realtime Compute for Apache Flink secara otomatis mengekstrak nama kelas dan menentukan nama kelas di bidang Nama Fungsi.
Di kotak dialog Manage Functions, klik Create Functions.
Di tab UDFs di panel sebelah kiri halaman Editor SQL, Anda dapat melihat UDF yang telah didaftarkan.
Langkah 3: Buat draft Realtime Compute for Apache Flink
Di panel navigasi di sebelah kiri, klik . Di sudut kiri atas halaman Editor SQL, klik New.

Di tab Skrip SQL kotak dialog Draft Baru, klik Blank Stream Draft dan klik Berikutnya.
Klik Next.
Di halaman yang muncul, konfigurasikan parameter draft. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Name
Nama draft yang ingin Anda buat.
CatatanNama draft harus unik di namespace saat ini.
Location
Folder tempat file kode draft disimpan.
Anda juga dapat mengklik ikon
di sebelah kanan folder yang ada untuk membuat subfolder. Engine Version
Versi mesin deployment Realtime Compute for Apache Flink. Nilainya harus sama dengan versi dalam file pom.xml yang digunakan.
Untuk informasi lebih lanjut tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Mesin.
Tulis pernyataan DDL dan DML.
-- Buat tabel sementara bernama electric_info. CREATE TEMPORARY TABLE electric_info ( event_id bigint not null, `user_id` bigint not null, event_time timestamp(6) not null, status string not null, primary key(event_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info' ); CREATE TEMPORARY TABLE electric_info_sortlistagg ( `user_id` bigint not null, status_sort varchar(50) not null, primary key(user_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info_sortlistagg' ); -- Agregasi data di tabel electric_info dan sisipkan data ke tabel electric_info_sortlistagg. -- Gabungkan bidang status dan event_time menjadi string baru dan lewatkan string baru sebagai parameter ke UDF terdaftar bernama ASI_UDAF$SortListAgg. INSERT INTO electric_info_sortlistagg SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING))) FROM electric_info GROUP BY user_id;Tabel berikut menjelaskan parameter-parameter tersebut. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang parameter konektor MySQL, lihat Konektor MySQL.
Parameter
Deskripsi
Catatan
connector
Jenis konektor.
Dalam contoh ini, nilai parameter ini adalah
mysql.hostname
Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.
Dalam contoh ini, endpoint internal instance ApsaraDB RDS for MySQL digunakan.
username
Nama pengguna yang digunakan untuk mengakses database MySQL.
N/A
password
Kata sandi yang digunakan untuk mengakses database MySQL.
Dalam contoh ini, kunci bernama mysql_pw digunakan untuk melindungi kata sandi agar informasi tidak bocor. Untuk informasi lebih lanjut, lihat Kelola variabel.
database-name
Nama database MySQL yang ingin Anda akses.
Dalam contoh ini, database electric yang dibuat di Langkah 1: Siapkan sumber data digunakan.
table-name
Nama tabel MySQL.
Dalam contoh ini, nama tabel adalah electric atau electric_info_sortlistagg.
port
Port yang digunakan untuk mengakses database MySQL.
N/A
Opsional. Klik Validate dan Debug secara berurutan di sudut kanan atas halaman Editor SQL. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.
Klik Deploy, lalu klik Confirm.
Di halaman , temukan penyebaran yang ingin Anda kelola dan klik Start di kolom Actions. Di panel Mulai Pekerjaan, pilih Initial Mode.
Langkah 4: Lihat hasilnya
Di konsol DMS untuk instance ApsaraDB RDS for MySQL, jalankan pernyataan berikut di tab SQLConsole untuk melihat hasil pengurutan status terminal pengguna:
SELECT * FROM `electric_info_sortlistagg`;Gambar berikut menunjukkan hasil kueri.

Referensi
Untuk informasi lebih lanjut tentang fungsi bawaan yang didukung oleh Realtime Compute for Apache Flink, lihat Fungsi Bawaan.
Untuk informasi lebih lanjut tentang cara membuat penyebaran, lihat Buat Penyebaran. Untuk informasi lebih lanjut tentang cara memulai penyebaran, lihat Mulai Penyebaran.
Untuk informasi lebih lanjut tentang cara memodifikasi konfigurasi parameter penyebaran, lihat Konfigurasikan Penyebaran. Anda dapat memperbarui konfigurasi parameter tertentu dari penyebaran secara dinamis. Ini mengurangi waktu gangguan layanan yang disebabkan oleh dimulainya dan pembatalan penyebaran. Untuk informasi lebih lanjut, lihat Perbarui Konfigurasi Parameter secara Dinamis untuk Penskalaan Dinamis.
Untuk informasi lebih lanjut tentang cara menggunakan UDF Python dalam penyebaran SQL, lihat UDFs.