このトピックでは、複数のデータ行を単一の行にマージし、指定した列でデータをソートするユーザー定義集約関数 (UDAF) について説明します。住宅電力グリッド端末データを例に、Realtime Compute for Apache Flink コンソールで UDAF を使用してデータを集約およびソートする方法を示します。
サンプルデータ
住宅電力グリッド端末からのデータは、electric_info テーブルに保存されます。このテーブルには、event_id、user_id、event_time、および status フィールドが含まれています。status フィールドのデータは、event_time フィールドに基づいて昇順にソートする必要があります。
electric_info
event_id
user_id
event_time
status
1
1222
2023-06-30 11:14:00
LD
2
1333
2023-06-30 11:12:00
LD
3
1222
2023-06-30 11:11:00
TD
4
1333
2023-06-30 11:12:00
LD
5
1222
2023-06-30 11:15:00
TD
6
1333
2023-06-30 11:18:00
LD
7
1222
2023-06-30 11:19:00
TD
8
1333
2023-06-30 11:10:00
TD
9
1555
2023-06-30 11:16:00
TD
10
1555
2023-06-30 11:17:00
LD
期待される結果
user_id
status
1222
TD,LD,TD,TD
1333
TD,LD,LD,LD
1555
TD,LD
ステップ 1: データソースの準備
この例では、ApsaraDB RDS を使用します。
ApsaraDB RDS for MySQL インスタンスを作成します。
説明ApsaraDB RDS for MySQL インスタンスは、Flink ワークスペースと同じ VPC に存在する必要があります。異なる VPC に存在する場合は、「ネットワーク接続のよくある質問」をご参照ください。
データベースとアカウントを作成します。
electric という名前のデータベースと、electric データベースに対する読み取りおよび書き込み権限を持つ特権アカウントまたは標準アカウントを作成します。
DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインします。electric データベースで、electric_info および electric_info_SortListAgg テーブルを作成し、データを挿入します。
CREATE TABLE `electric_info` ( event_id bigint NOT NULL PRIMARY KEY COMMENT 'Event ID', user_id bigint NOT NULL COMMENT 'User ID', event_time timestamp NOT NULL COMMENT 'Event time', status varchar(10) NOT NULL COMMENT 'User terminal status' ); CREATE TABLE `electric_info_SortListAgg` ( user_id bigint NOT NULL PRIMARY KEY COMMENT 'User ID', status_sort varchar(50) NULL COMMENT 'User terminal status sorted in ascending order by event time' ); -- Prepare data INSERT INTO electric_info VALUES (1,1222,'2023-06-30 11:14','LD'), (2,1333,'2023-06-30 11:12','LD'), (3,1222,'2023-06-30 11:11','TD'), (4,1333,'2023-06-30 11:12','LD'), (5,1222,'2023-06-30 11:15','TD'), (6,1333,'2023-06-30 11:18','LD'), (7,1222,'2023-06-30 11:19','TD'), (8,1333,'2023-06-30 11:10','TD'), (9,1555,'2023-06-30 11:16','TD'), (10,1555,'2023-06-30 11:17','LD');
ステップ 2: UDF の登録
ASI_UDX-1.0-SNAPSHOT.jar をダウンロードします。
pom.xml ファイルは、Flink 1.17.1 でこのユーザー定義関数 (UDF) に必要な最小限の依存関係で構成されています。ユーザー定義関数の詳細については、「ユーザー定義関数」をご参照ください。
この例では、ASI_UDAF は複数の行を単一の行にマージし、指定した列でソートします。必要に応じて、次のコードを変更できます。
package ASI_UDAF; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.AggregateFunction; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; public class ASI_UDAF{ /**Accumulator class*/ public static class AcList { public List<String> list; } /**Aggregate function class*/ public static class SortListAgg extends AggregateFunction<String,AcList> { public String getValue(AcList asc) { /**Sort the data in the list according to a specific rule*/ asc.list.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]); } }); /**Traverse the sorted list, extract the required fields, and join them into a string*/ List<String> ret = new ArrayList<String>(); Iterator<String> strlist = asc.list.iterator(); while (strlist.hasNext()) { ret.add(strlist.next().split("#")[0]); } String str = StringUtils.join(ret, ','); return str; } /**Method to create an accumulator*/ public AcList createAccumulator() { AcList ac = new AcList(); List<String> list = new ArrayList<String>(); ac.list = list; return ac; } /**Accumulation method: add the input data to the accumulator*/ public void accumulate(AcList acc, String tuple1) { acc.list.add(tuple1); } /**Retraction method*/ public void retract(AcList acc, String num) { } } }UDF 登録ページを開きます。
UDF を登録すると、後の開発でコードを簡単に再利用できます。Java UDF の場合、依存関係ファイルとしてアップロードすることもできます。詳細については、「ユーザー定義集約関数 (UDAF)」をご参照ください。
対象のワークスペースの[操作] 列で、[コンソール] をクリックします。
をクリックします。
左側の[関数]タブで、[UDF の登録]をクリックします。

[ファイルの選択]セクションで、ダウンロードした JAR ファイルをアップロードし、[OK]をクリックします。
説明UDF JAR ファイルは OSS バケットの sql-artifacts ディレクトリにアップロードされます。
Flink 開発コンソールは、UDF JAR ファイルを解析して、Flink UDF、UDAF、またはユーザー定義のテーブル値関数 (UDTF) インターフェイスのクラスを使用しているかどうかを確認します。その後、コンソールは自動的にクラス名を抽出し、関数名フィールドに入力します。
「関数の管理」ダイアログボックスで、[関数の作成] をクリックします。
正常に登録された UDF は、SQL エディターページの左側にある[関数] リストに表示されます。
ステップ 3: Flink ジョブの作成
「」ページで、[新規作成] をクリックします。

[ブランク ストリーム ドラフト] をクリックします。
[次へ] をクリックします。
[新規ジョブドラフト] ダイアログボックスで、ジョブパラメーターを設定します。
ジョブパラメーター
説明
ファイル名
ジョブの名前。
説明ジョブ名は現在のプロジェクト内で一意である必要があります。
保存先
ジョブのストレージの場所。
既存のフォルダーの横にある
アイコンをクリックして、サブフォルダーを作成することもできます。エンジンバージョン
ジョブで使用される Flink エンジンバージョン。これは pom.xml ファイルのバージョンと一致する必要があります。
エンジンバージョン番号、バージョンマッピング、およびライフサイクルマイルストーンの詳細については、「エンジンバージョン」をご参照ください。
DDL および DML コードを記述します。
--Create the temporary table electric_info. CREATE TEMPORARY TABLE electric_info ( event_id bigint not null, `user_id` bigint not null, event_time timestamp(6) not null, status string not null, primary key(event_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info' ); CREATE TEMPORARY TABLE electric_info_sortlistagg ( `user_id` bigint not null, status_sort varchar(50) not null, primary key(user_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info_sortlistagg' ); --Aggregate the data from the electric_info table and insert it into the electric_info_sortlistagg table. --Pass the string that is concatenated from status and event_time as a parameter to the registered user-defined function ASI_UDAF$SortListAgg. INSERT INTO electric_info_sortlistagg SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING))) FROM electric_info GROUP BY user_id;次の表はパラメーターについて説明しています。必要に応じて変更できます。MySQL コネクタパラメーターの詳細については、「MySQL コネクタ」をご参照ください。
パラメーター
説明
備考
connector
コネクタのタイプ。
この例では、値は
mysqlに固定されています。hostname
MySQL データベースの IP アドレスまたはホスト名。
このパラメーターを ApsaraDB RDS for MySQL インスタンスのプライベートエンドポイントに設定します。
username
MySQL データベースサービスに接続するためのユーザー名。
なし。
password
MySQL データベースサービスに接続するためのパスワード。
この例では、情報漏洩を防ぐために、パスワードに mysql_pw という名前の変数が使用されています。詳細については、「プロジェクト変数」をご参照ください。
database-name
MySQL データベースの名前。
この例では、このパラメーターは「ステップ 1: データソースの準備」で作成した electric データベースに設定されています。
table-name
MySQL テーブルの名前。
この例では、このパラメーターは electric または electric_info_sortlistagg に設定されています。
port
MySQL データベースサービスのポート。
なし。
(任意) 右上隅で、[ディープチェック] と [デバッグ] をクリックします。これらの機能の詳細については、「ジョブ開発マップ」をご参照ください。
[デプロイ] をクリックし、次に [OK] をクリックします。
ページで、目的のジョブを見つけ、[アクション] 列の [開始] をクリックし、次に [ステートレス開始] を選択します。
ステップ 4: 結果のクエリ
RDS で、次のステートメントを実行して結果を表示します。結果のユーザー端末ステータスは、イベント時間で昇順にソートされます。
SELECT * FROM `electric_info_sortlistagg`;結果は次のとおりです。

参照
Flink がサポートするビルトイン関数の詳細については、「ビルトイン関数」をご参照ください。
ジョブ実行パラメーターを変更するには、「ジョブデプロイ情報の構成」をご参照ください。一部のパラメーターは、ジョブの停止と開始によって発生するサービスダウンタイムを削減するために、動的更新もサポートしています。詳細については、「動的スケーリングとパラメーター更新」をご参照ください。
SQL ジョブでの Python ユーザー定義関数の使用方法の詳細については、「ユーザー定義関数」をご参照ください。