All Products
Search
Document Center

Realtime Compute for Apache Flink:Urutkan dan agregasi data dengan UDAF

Last Updated:Mar 10, 2026

Topik ini menyediakan fungsi agregat yang didefinisikan pengguna (UDAF) untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom tertentu. Dengan menggunakan data terminal jaringan listrik perumahan sebagai contoh, topik ini menunjukkan cara menggunakan UDAF untuk mengagregasi dan mengurutkan data di Konsol Realtime Compute for Apache Flink.

Data contoh

Data dari terminal jaringan listrik perumahan disimpan dalam tabel electric_info, yang mencakup bidang event_id, user_id, event_time, dan status. Data pada bidang status harus diurutkan secara ascending berdasarkan bidang event_time.

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • Hasil yang diharapkan

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

Langkah 1: Siapkan sumber data

Contoh ini menggunakan ApsaraDB RDS.

  1. Buat instans ApsaraDB RDS for MySQL.

    Catatan

    Instans ApsaraDB RDS for MySQL harus berada dalam VPC yang sama dengan ruang kerja Flink. Jika berada di VPC yang berbeda, lihat FAQ konektivitas jaringan.

  2. Buat database dan akun.

    Buat database bernama electric serta akun istimewa atau akun standar dengan izin baca dan tulis pada database tersebut.

  3. Masuk ke instans ApsaraDB RDS for MySQL menggunakan DMS. Di database electric, buat tabel electric_info dan electric_info_SortListAgg, lalu masukkan data.

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT 'Event ID',
      user_id bigint NOT NULL COMMENT 'User ID', 
      event_time timestamp NOT NULL COMMENT 'Event time',
      status varchar(10) NOT NULL COMMENT 'User terminal status'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT 'User ID', 
      status_sort varchar(50) NULL COMMENT 'User terminal status sorted in ascending order by event time'
    );
    
    -- Prepare data
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

Langkah 2: Daftarkan UDF

  1. Unduh ASI_UDX-1.0-SNAPSHOT.jar.

    File pom.xml dikonfigurasi dengan dependensi minimum yang diperlukan untuk user-defined function (UDF) ini di Flink 1.17.1. Untuk informasi lebih lanjut tentang user-defined function, lihat User-defined functions.

  2. Pada contoh ini, ASI_UDAF menggabungkan beberapa baris menjadi satu baris tunggal dan mengurutkannya berdasarkan kolom tertentu. Anda dapat memodifikasi kode berikut sesuai kebutuhan.

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**Accumulator class*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**Aggregate function class*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**Sort the data in the list according to a specific rule*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**Traverse the sorted list, extract the required fields, and join them into a string*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**Method to create an accumulator*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**Accumulation method: add the input data to the accumulator*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**Retraction method*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. Buka halaman pendaftaran UDF.

    Anda dapat mendaftarkan UDF agar mudah digunakan kembali dalam pengembangan selanjutnya. Untuk UDF Java, Anda juga dapat mengunggahnya sebagai file dependensi. Untuk informasi lebih lanjut, lihat User-defined aggregate functions (UDAFs).

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Pada kolom Actions ruang kerja target, klik Console.

    3. Klik Data Development > ETL.

    4. Pada tab Functions di sebelah kiri, klik Register UDF.

      image.png

  4. Pada bagian Select File, unggah file JAR yang telah diunduh, lalu klik OK.

    注册UDF

    Catatan
    • File JAR UDF Anda diunggah ke direktori sql-artifacts Bucket OSS.

    • Konsol pengembangan Flink mengurai file JAR UDF Anda untuk memeriksa apakah menggunakan kelas dari antarmuka Flink UDF, UDAF, atau user-defined table-valued function (UDTF). Konsol kemudian secara otomatis mengekstrak nama kelas dan mengisi kolom Function Name.

  5. Pada kotak dialog Manage Functions, klik Create Function.

    UDF yang berhasil didaftarkan akan muncul dalam daftar Functions di sisi kiri halaman editor SQL.

Langkah 3: Buat pekerjaan Flink

  1. Pada halaman Data Development > ETL, klik New.

    image.png

  2. Klik Blank Stream Draft.

  3. Klik Next.

  4. Pada dialog New Job Draft, konfigurasikan parameter pekerjaan.

    Job Parameter

    Description

    File Name

    Nama pekerjaan.

    Catatan

    Nama pekerjaan harus unik dalam proyek saat ini.

    Storage Location

    Lokasi penyimpanan pekerjaan.

    Anda juga dapat mengklik ikon 新建文件夹 di sebelah folder yang ada untuk membuat subfolder.

    Engine Version

    Versi mesin Flink yang digunakan oleh pekerjaan. Versi ini harus sesuai dengan versi dalam file pom.xml.

    Untuk informasi lebih lanjut tentang nomor versi mesin, pemetaan versi, dan tonggak siklus hidup, lihat Engine versions.

  5. Tulis kode DDL dan DML.

    --Create the temporary table electric_info.
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null, 
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null, 
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    --Aggregate the data from the electric_info table and insert it into the electric_info_sortlistagg table.
    --Pass the string that is concatenated from status and event_time as a parameter to the registered user-defined function ASI_UDAF$SortListAgg.
    INSERT INTO electric_info_sortlistagg 
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    Tabel berikut menjelaskan parameter-parameter tersebut. Anda dapat memodifikasinya sesuai kebutuhan. Untuk informasi lebih lanjut tentang parameter konektor MySQL, lihat MySQL connector.

    Parameter

    Description

    Remarks

    connector

    Jenis konektor.

    Pada contoh ini, nilainya tetap mysql.

    hostname

    Alamat IP atau hostname database MySQL.

    Atur parameter ini ke Titik akhir pribadi instans ApsaraDB RDS for MySQL.

    username

    Username untuk menghubungkan ke layanan database MySQL.

    Tidak ada.

    password

    Password untuk menghubungkan ke layanan database MySQL.

    Pada contoh ini, variabel bernama mysql_pw digunakan untuk password guna mencegah kebocoran informasi. Untuk informasi lebih lanjut, lihat Project variables.

    database-name

    Nama database MySQL.

    Pada contoh ini, parameter ini diatur ke database electric yang Anda buat di Langkah 1: Siapkan sumber data.

    table-name

    Nama tabel MySQL.

    Pada contoh ini, parameter ini diatur ke electric atau electric_info_sortlistagg.

    port

    Port layanan database MySQL.

    Tidak ada.

  6. (Opsional) Di pojok kanan atas, klik Deep Check dan Debug. Untuk informasi lebih lanjut tentang fitur-fitur ini, lihat Job development map.

  7. Klik Deploy, lalu klik OK.

  8. Pada halaman Operation Center > Jobs, temukan pekerjaan target, klik Start pada kolom Actions, lalu pilih Stateless Start.

Langkah 4: Kueri hasil

Di RDS, jalankan pernyataan berikut untuk melihat hasilnya. Status terminal pengguna dalam hasil diurutkan secara ascending berdasarkan waktu event.

SELECT * FROM `electric_info_sortlistagg`;

Hasilnya sebagai berikut:

image.png

Referensi