すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDAF を使用してデータのソートと集計を行う

最終更新日:Jan 08, 2025

ユーザー定義集計関数(UDAF)を使用すると、複数のデータ行を 1 行に結合し、特定の列に基づいてデータをソートできます。このトピックでは、Realtime Compute for Apache Flink コンソールで UDAF を使用してデータを集計およびソートする方法について説明します。この例では、住宅電力網端末のデータを使用します。

サンプルデータ

住宅電力網端末のデータは electric_info テーブルに格納されており、event_id、user_id、event_time、および status 列が含まれています。status 列のデータは、event_time 列に基づいて昇順にソートする必要があります。

  • electric_info

    event_id

    user_id

    event_time

    status

    1

    1222

    2023-06-30 11:14:00

    LD

    2

    1333

    2023-06-30 11:12:00

    LD

    3

    1222

    2023-06-30 11:11:00

    TD

    4

    1333

    2023-06-30 11:12:00

    LD

    5

    1222

    2023-06-30 11:15:00

    TD

    6

    1333

    2023-06-30 11:18:00

    LD

    7

    1222

    2023-06-30 11:19:00

    TD

    8

    1333

    2023-06-30 11:10:00

    TD

    9

    1555

    2023-06-30 11:16:00

    TD

    10

    1555

    2023-06-30 11:17:00

    LD

  • 期待される結果

    user_id

    status

    1222

    TD,LD,TD,TD

    1333

    TD,LD,LD,LD

    1555

    TD,LD

ステップ 1: データソースを準備する

この例では、ApsaraDB RDS データソースを使用します。

  1. ApsaraDB for RDS MySQL インスタンスを作成する

    説明

    Realtime Compute for Apache Flink ワークスペースと同じ仮想プライベートクラウド(VPC)に ApsaraDB for RDS MySQL インスタンスを作成することをお勧めします。 ApsaraDB for RDS MySQL インスタンスと Realtime Compute for Apache Flink ワークスペースが異なる VPC にある場合は、それらの間に接続を確立する必要があります。詳細については、「ネットワーク接続に関する FAQ」をご参照ください。

  2. データベースとアカウントを作成する。

    electric という名前のデータベースを作成し、electric データベースに対する読み取りおよび書き込み権限を持つ権限アカウントまたは標準アカウントを作成します。

  3. データ管理(DMS)を使用して ApsaraDB for RDS MySQL インスタンスにログオンする、electric データベースに electric_info と electric_info_SortListAgg という名前のテーブルを作成し、electric_info テーブルにデータを挿入します。

    CREATE TABLE `electric_info` (
      event_id bigint NOT NULL PRIMARY KEY COMMENT 'イベント ID',
      user_id bigint NOT NULL COMMENT 'ユーザー ID',
      event_time timestamp NOT NULL COMMENT 'イベント時間',
      status varchar(10) NOT NULL COMMENT 'ユーザーターミナルステータス'
    );
    
    CREATE TABLE `electric_info_SortListAgg` (
      user_id bigint NOT NULL PRIMARY KEY COMMENT 'ユーザー ID',
      status_sort varchar(50) NULL COMMENT 'イベント時間に基づいて昇順にソートされたユーザーターミナルステータス'
    );
    
    -- データを準備します。
    INSERT INTO electric_info VALUES 
    (1,1222,'2023-06-30 11:14','LD'),
    (2,1333,'2023-06-30 11:12','LD'),
    (3,1222,'2023-06-30 11:11','TD'),
    (4,1333,'2023-06-30 11:12','LD'),
    (5,1222,'2023-06-30 11:15','TD'),
    (6,1333,'2023-06-30 11:18','LD'),
    (7,1222,'2023-06-30 11:19','TD'),
    (8,1333,'2023-06-30 11:10','TD'),
    (9,1555,'2023-06-30 11:16','TD'),
    (10,1555,'2023-06-30 11:17','LD');

ステップ 2: UDF を登録する

  1. ASI_UDX-1.0-SNAPSHOT.jar パッケージをダウンロードします。

    Flink 1.17.1 のユーザー定義関数(UDF)に必要な最小限の依存関係情報は、pom.xml ファイルに設定されています。 UDF の使用方法の詳細については、「UDF」をご参照ください。

  2. ASI_UDAF を使用して複数のデータ行を 1 行に結合し、指定された列に基づいてデータをソートします。次のサンプルコードは例を示しています。ビジネス要件に基づいてコードを変更できます。

    package ASI_UDAF;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    public class ASI_UDAF{
    	/**アキュムレータークラス*/
    	public static class AcList {
    		public  List<String> list;
    	}
    
    	/**集計関数クラス*/
    	public static class SortListAgg extends AggregateFunction<String,AcList> {
    		public String getValue(AcList asc) {
    			/**特定のルールに従ってリスト内のデータをソートする*/
    			asc.list.sort(new Comparator<String>() {
    				@Override
    				public int compare(String o1, String o2) {
    					return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]);
    				}
    			});
    			/**ソートされたリストをトラバースし、必要なフィールドを抽出し、それらを文字列に結合する*/
    			List<String> ret = new ArrayList<String>();
    			Iterator<String> strlist = asc.list.iterator();
    			while (strlist.hasNext()) {
    				ret.add(strlist.next().split("#")[0]);
    			}
    			String str = StringUtils.join(ret, ',');
    			return str;
    		}
    
    		/**アキュムレーターを作成するメソッド*/
    		public AcList createAccumulator() {
    			AcList ac = new AcList();
    			List<String> list = new ArrayList<String>();
    			ac.list = list;
    			return ac;
    		}
    
    		/**累積メソッド: 入力データをアキュムレーターに追加する*/
    		public void accumulate(AcList acc, String tuple1) {
    			acc.list.add(tuple1);
    		}
    
    		/**リトラクションメソッド*/
    		public void retract(AcList acc, String num) {
    		}
    	}
    }
  3. [ファイルを選択] ダイアログボックスに移動します。

    UDF を登録すると、UDF コードを後続の開発で再利用できます。 Java UDF の場合、UDF の依存関係を使用して JAR ファイルをアップロードすることもできます。詳細については、「UDAF」をご参照ください。

    1. Realtime Compute for Apache Flink コンソール にログオンします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。

    4. SQL エディターページの左側のペインで、[UDF] タブをクリックし、[UDF アーティファクトの登録] をクリックします。

      image.png

  4. [ファイルを選択] セクションの [クリックして選択] をクリックして、ステップ 1 で取得した JAR ファイルをアップロードし、[確認] をクリックします。

    注册UDF

    説明
    • UDF の JAR ファイルは、ワークスペースに関連付けられているオブジェクトストレージサービス(OSS)バケットの sql-artifacts ディレクトリにアップロードされます。

    • Realtime Compute for Apache Flink は UDF JAR ファイルを解析し、ファイル内で Flink UDF、UDAF、およびユーザー定義テーブル値関数(UDTF)インターフェースのクラスが使用されているかどうかを確認します。次に、Realtime Compute for Apache Flink は自動的にクラス名を抽出し、[関数名] フィールドにクラス名を指定します。

  5. [関数の管理] ダイアログボックスで、[関数の作成] をクリックします。

    SQL エディターページの左側のペインにある [UDF] タブで、登録されている UDF を表示できます。

ステップ 3: Realtime Compute for Apache Flink ドラフトを作成する

  1. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。 SQL エディターページの左上隅にある [新規] をクリックします。

    image.png

  2. [新規ドラフト] ダイアログボックスの [SQL スクリプト] タブで、[空のストリームドラフト] をクリックし、[次へ] をクリックします。

  3. [次へ] をクリックします。

  4. 表示されるページで、ドラフトのパラメーターを設定します。次の表にパラメーターを示します。

    パラメーター

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は、現在の名前空間で一意である必要があります。

    場所

    ドラフトのコードファイルが保存されるフォルダー。

    既存のフォルダーの右側にある 新建文件夹 アイコンをクリックして、サブフォルダーを作成することもできます。

    エンジンバージョン

    Realtime Compute for Apache Flink デプロイメントのエンジンバージョン。値は、使用されている pom.xml ファイルのバージョンと同じである必要があります。

    エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

  5. DDL および DML ステートメントを記述します。

    -- electric_info という名前の一時テーブルを作成します。
    CREATE TEMPORARY TABLE electric_info (
      event_id bigint not null,
      `user_id` bigint not null,
      event_time timestamp(6) not null,
      status string not null,
      primary key(event_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info'
    );
    
    CREATE TEMPORARY TABLE electric_info_sortlistagg (
      `user_id` bigint not null,
      status_sort varchar(50) not null,
      primary key(user_id) not enforced
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'your_username',
      'password' = '${secret_values.mysql_pw}',
      'database-name' = 'electric',
      'table-name' = 'electric_info_sortlistagg'
    );
    
    -- electric_info テーブルのデータを集計し、electric_info_sortlistagg テーブルにデータを挿入します。
    -- status フィールドと event_time フィールドを連結して新しい文字列を作成し、新しい文字列を ASI_UDAF$SortListAgg という名前の登録済み UDF にパラメーターとして渡します。
    INSERT INTO electric_info_sortlistagg
    SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING)))
    FROM electric_info GROUP BY user_id;

    次の表にパラメーターを示します。ビジネス要件に基づいてパラメーターを変更できます。 MySQL コネクタのパラメーターの詳細については、「MySQL コネクタ」をご参照ください。

    パラメーター

    説明

    備考

    connector

    コネクタタイプ。

    この例では、このパラメーターの値は mysql です。

    hostname

    MySQL データベースへのアクセスに使用される IP アドレスまたはホスト名。

    この例では、ApsaraDB for RDS MySQL インスタンスの内部エンドポイントが使用されます。

    username

    MySQL データベースへのアクセスに使用されるユーザー名。

    該当なし

    password

    MySQL データベースへのアクセスに使用されるパスワード。

    この例では、mysql_pw という名前のキーを使用してパスワードを保護し、情報の漏洩を防ぎます。詳細については、「変数の管理」をご参照ください。

    database-name

    アクセスする MySQL データベースの名前。

    この例では、ステップ 1: データソースを準備する で作成された electric データベースが使用されます。

    table-name

    MySQL テーブルの名前。

    この例では、テーブル名は electric または electric_info_sortlistagg です。

    port

    MySQL データベースへのアクセスに使用されるポート。

    該当なし

  6. オプション。 SQL エディターページの右上隅にある [検証][デバッグ] を順番にクリックします。詳細については、「SQL ドラフトを開発する」をご参照ください。

  7. [デプロイ] をクリックし、[確認] をクリックします。

  8. [O&M] > [デプロイメント] ページで、管理するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。 [ジョブの開始] パネルで、[初期モード] を選択します。

ステップ 4: 結果を表示する

ApsaraDB for RDS MySQL インスタンスの DMS コンソールで、[SQL コンソール] タブで次のステートメントを実行して、ユーザーの端末ステータスのソート結果を表示します。

SELECT * FROM `electric_info_sortlistagg`;

次の図は、クエリ結果を示しています。

image.png

参考文献

  • Realtime Compute for Apache Flink でサポートされている組み込み関数の詳細については、「組み込み関数」をご参照ください。

  • デプロイメントの作成方法の詳細については、「デプロイメントを作成する」をご参照ください。デプロイメントの開始方法の詳細については、「デプロイメントを開始する」をご参照ください。

  • デプロイメントのパラメーター設定を変更する方法の詳細については、「デプロイメントを設定する」をご参照ください。デプロイメントの特定のパラメーターの設定を動的に更新できます。これにより、デプロイメントの開始とキャンセルによるサービス中断時間が短縮されます。詳細については、「動的スケーリングのパラメーター設定を動的に更新する」をご参照ください。

  • SQL デプロイメントで Python UDF を使用する方法の詳細については、「UDF」をご参照ください。