全部产品
Search
文档中心

Realtime Compute for Apache Flink:Gunakan UDAF untuk mengurutkan dan menggabungkan data

更新时间:Jul 06, 2025

Fungsi agregat yang didefinisikan pengguna (UDAF) dapat digunakan untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom tertentu. Topik ini menjelaskan cara menggunakan UDAF untuk menggabungkan dan mengurutkan data di konsol Realtime Compute for Apache Flink. Contoh ini menggunakan data terminal jaringan listrik perumahan.

Data sampel

Data terminal jaringan listrik perumahan disimpan dalam tabel electric_info, yang mencakup kolom event_id, user_id, event_time, dan status. Data pada kolom status perlu diurutkan secara menaik berdasarkan kolom event_time.

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • Hasil yang Diharapkan

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

Langkah 1: Siapkan sumber data

Contoh ini menggunakan sumber data ApsaraDB RDS.

  1. Buat instance ApsaraDB RDS for MySQL.

    Catatan

    Disarankan membuat instance ApsaraDB RDS for MySQL di virtual private cloud (VPC) yang sama dengan ruang kerja Realtime Compute for Apache Flink. Jika instance ApsaraDB RDS for MySQL dan ruang kerja Realtime Compute for Apache Flink berada di VPC yang berbeda, Anda harus membangun koneksi antara keduanya. Untuk informasi lebih lanjut, lihat FAQ tentang Konektivitas Jaringan.

  2. Buat database dan akun.

    Buat database bernama electric dan buat akun dengan hak istimewa atau akun standar yang memiliki izin baca dan tulis pada database electric.

  3. Masuk ke instance ApsaraDB RDS for MySQL menggunakan Data Management (DMS), buat tabel bernama electric_info dan electric_info_SortListAgg di database electric, dan masukkan data ke tabel electric_info.

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT 'ID Event',
      user_id bigint NOT NULL COMMENT 'ID Pengguna', 
      event_time timestamp NOT NULL COMMENT 'Waktu Event',
      status varchar(10) NOT NULL COMMENT 'Status terminal pengguna'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT 'ID Pengguna', 
      status_sort varchar(50) NULL COMMENT 'Status terminal pengguna diurutkan secara menaik berdasarkan waktu event'
    );
    
    -- Persiapkan data.
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

Langkah 2: Daftarkan UDF

  1. Unduh paket ASI_UDX-1.0-SNAPSHOT.jar.

    Informasi dependensi minimum yang dibutuhkan oleh fungsi yang didefinisikan pengguna (UDF) di Flink 1.17.1 dikonfigurasikan dalam file pom.xml. Untuk informasi lebih lanjut tentang cara menggunakan UDF, lihat UDFs.

  2. Gunakan ASI_UDAF untuk menggabungkan beberapa baris data menjadi satu baris dan mengurutkan data berdasarkan kolom yang ditentukan. Contoh kode berikut menunjukkan sebuah contoh. Anda dapat memodifikasi kode sesuai dengan kebutuhan bisnis Anda.

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**Kelas Akumulator*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**Kelas fungsi agregat*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**Urutkan data dalam daftar sesuai aturan tertentu*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**Iterasi daftar yang telah diurutkan, ekstrak bidang yang diperlukan, dan gabungkan menjadi string*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**Metode untuk membuat akumulator*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**Metode akumulasi: tambahkan data input ke akumulator*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**Metode penarikan kembali*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. Pergi ke kotak dialog Register UDF Artifact.

    Setelah Anda mendaftarkan UDF, kode UDF dapat digunakan kembali untuk pengembangan selanjutnya. Untuk Java UDF, Anda juga dapat mengunggah file JAR menggunakan dependensi untuk UDF. Untuk informasi lebih lanjut, lihat UDAFs.

    1. Masuk ke konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

    3. Di panel navigasi di sebelah kiri, klik Development > ETL.

    4. Di panel sebelah kiri halaman Editor SQL, klik tab UDFs dan klik Register UDF Artifact.

      image.png

  4. Klik Klik untuk memilih di bagian Select a file untuk mengunggah file JAR yang diperoleh di Langkah 1, dan klik Confirm.

    注册UDF

    Catatan
    • File JAR UDF diunggah ke direktori sql-artifacts dari Object Storage Service (OSS) bucket yang terkait dengan ruang kerja.

    • Realtime Compute for Apache Flink mem-parsing file JAR UDF dan memeriksa apakah kelas antarmuka Flink UDF, UDAF, dan fungsi tabel yang didefinisikan pengguna (UDTF) digunakan dalam file. Kemudian, Realtime Compute for Apache Flink secara otomatis mengekstrak nama kelas dan menentukan nama kelas di bidang Nama Fungsi.

  5. Di kotak dialog Manage Functions, klik Create Functions.

    Di tab UDFs di panel sebelah kiri halaman Editor SQL, Anda dapat melihat UDF yang telah didaftarkan.

Langkah 3: Buat draft Realtime Compute for Apache Flink

  1. Di panel navigasi di sebelah kiri, klik Development > ETL. Di sudut kiri atas halaman Editor SQL, klik New.

    image.png

  2. Di tab Skrip SQL kotak dialog Draft Baru, klik Blank Stream Draft dan klik Berikutnya.

  3. Klik Next.

  4. Di halaman yang muncul, konfigurasikan parameter draft. Tabel berikut menjelaskan parameter-parameter tersebut.

    Parameter

    Deskripsi

    Name

    Nama draft yang ingin Anda buat.

    Catatan

    Nama draft harus unik di namespace saat ini.

    Location

    Folder tempat file kode draft disimpan.

    Anda juga dapat mengklik ikon 新建文件夹 di sebelah kanan folder yang ada untuk membuat subfolder.

    Engine Version

    Versi mesin deployment Realtime Compute for Apache Flink. Nilainya harus sama dengan versi dalam file pom.xml yang digunakan.

    Untuk informasi lebih lanjut tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Mesin.

  5. Tulis pernyataan DDL dan DML.

    -- Buat tabel sementara bernama electric_info.
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null, 
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null, 
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    -- Agregasi data di tabel electric_info dan sisipkan data ke tabel electric_info_sortlistagg.
    -- Gabungkan bidang status dan event_time menjadi string baru dan lewatkan string baru sebagai parameter ke UDF terdaftar bernama ASI_UDAF$SortListAgg.
    INSERT INTO electric_info_sortlistagg 
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    Tabel berikut menjelaskan parameter-parameter tersebut. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang parameter konektor MySQL, lihat Konektor MySQL.

    Parameter

    Deskripsi

    Catatan

    connector

    Jenis konektor.

    Dalam contoh ini, nilai parameter ini adalah mysql.

    hostname

    Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.

    Dalam contoh ini, endpoint internal instance ApsaraDB RDS for MySQL digunakan.

    username

    Nama pengguna yang digunakan untuk mengakses database MySQL.

    N/A

    password

    Kata sandi yang digunakan untuk mengakses database MySQL.

    Dalam contoh ini, kunci bernama mysql_pw digunakan untuk melindungi kata sandi agar informasi tidak bocor. Untuk informasi lebih lanjut, lihat Kelola variabel.

    database-name

    Nama database MySQL yang ingin Anda akses.

    Dalam contoh ini, database electric yang dibuat di Langkah 1: Siapkan sumber data digunakan.

    table-name

    Nama tabel MySQL.

    Dalam contoh ini, nama tabel adalah electric atau electric_info_sortlistagg.

    port

    Port yang digunakan untuk mengakses database MySQL.

    N/A

  6. Opsional. Klik Validate dan Debug secara berurutan di sudut kanan atas halaman Editor SQL. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.

  7. Klik Deploy, lalu klik Confirm.

  8. Di halaman O&M > Deployments, temukan penyebaran yang ingin Anda kelola dan klik Start di kolom Actions. Di panel Mulai Pekerjaan, pilih Initial Mode.

Langkah 4: Lihat hasilnya

Di konsol DMS untuk instance ApsaraDB RDS for MySQL, jalankan pernyataan berikut di tab SQLConsole untuk melihat hasil pengurutan status terminal pengguna:

SELECT * FROM `electric_info_sortlistagg`;

Gambar berikut menunjukkan hasil kueri.

image.png

Referensi

  • Untuk informasi lebih lanjut tentang fungsi bawaan yang didukung oleh Realtime Compute for Apache Flink, lihat Fungsi Bawaan.

  • Untuk informasi lebih lanjut tentang cara membuat penyebaran, lihat Buat Penyebaran. Untuk informasi lebih lanjut tentang cara memulai penyebaran, lihat Mulai Penyebaran.

  • Untuk informasi lebih lanjut tentang cara memodifikasi konfigurasi parameter penyebaran, lihat Konfigurasikan Penyebaran. Anda dapat memperbarui konfigurasi parameter tertentu dari penyebaran secara dinamis. Ini mengurangi waktu gangguan layanan yang disebabkan oleh dimulainya dan pembatalan penyebaran. Untuk informasi lebih lanjut, lihat Perbarui Konfigurasi Parameter secara Dinamis untuk Penskalaan Dinamis.

  • Untuk informasi lebih lanjut tentang cara menggunakan UDF Python dalam penyebaran SQL, lihat UDFs.