すべてのプロダクト
Search
ドキュメントセンター

Function Compute:Function Compute を使用した ApsaraMQ for RocketMQ メッセージデータのクレンジング

最終更新日:Mar 20, 2026

Function Compute が提供するデータクレンジングテンプレートを使用して、メッセージデータを処理します。また、テンプレートコードを修正して、カスタムクレンジング要件を満たすことも可能です。このトピックでは、ApsaraMQ for RocketMQ メッセージのコンテンツを分割する例を使用して、メッセージ処理テンプレートの種類と使用方法について説明します。

機能紹介

メッセージデータクレンジングタスクは、基本的なオペレーター機能を提供します。基盤となるロジックには Function Compute が使用されます。ApsaraMQ for RocketMQ メッセージのデータクレンジングタスクを作成した後、Function Compute コンソールにログインして、コードをカスタマイズし、関数構成を変更できます。

オペレーター

オペレーターの説明

メッセージフィルタリング

メッセージ本文を正規表現と照合し、一致するメッセージを送信先に送信します。詳細については、「イベントパターン」をご参照ください。

メッセージ変換

文字列の一致に基づいてメッセージ本文を置き換えます (文字のケース変換など)。変換されたメッセージを送信先に送信します。詳細については、「イベントコンテンツの変換」をご参照ください。

コンテンツ分割

正規表現に基づいてメッセージ本文を分割し、分割された各部分を個別のメッセージとして送信先に送信します。

動的ルーティング

メッセージ本文を正規表現と照合します。一致するメッセージは対応する送信先にルーティングされ、一致しないメッセージはデフォルトの送信先にルーティングされます。

コンテンツエンリッチメント

エンリッチメントソースに基づいてメッセージ本文をエンリッチします。たとえば、元のメッセージに AccountID が含まれている場合、AccountID を使用してデータベースをクエリし、顧客リージョンを取得して、そのリージョンを元のメッセージ本文に追加し、メッセージを送信先サービスに送信できます。

コンテンツマッピング

正規表現に基づいてメッセージ本文をマッピングします。たとえば、メッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小基準に削減したりできます。

シナリオ例

コンテンツ分割

たとえば、元のメッセージである学生リスト [田中一郎, 男性, Class 4|佐藤花子, 女性, Class 3|山田健太, 男性, Class 4] を3つの個別のメッセージに分割し、それぞれを送信先サービスにプッシュする必要があります。これを行うには、コンテンツ分割オペレーターを使用します。分割されたメッセージは次のとおりです。

メッセージ:
    [張 三、男性、クラス 4]
メッセージ:
    [李 四、女性、クラス 3]
メッセージ:
    [王 五、男性、クラス 4]

次の図に結果を示します。

image

動的ルーティング

たとえば、以下は歯磨き粉情報のリストです。

メッセージ:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

カスタム動的ルールに基づいて、リストを送信先トピックにルーティングする必要があります。ルールは次のように記述されます。

  • メッセージがブランドAで始まる場合、BrandA-item-topic と BrandA-discount-topic に送信します。

  • メッセージがブランドBで始まる場合、BrandB-item-topic と BrandB-discount-topic に送信します。

  • その他のすべてのメッセージは Unknown-brand-topic に送信します。

ルールは JSON 形式で次のように記述されます。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

次の図に結果を示します。

image

コンテンツエンリッチメント

このトピックでは、IPアドレスセグメント処理シナリオからのデータエンリッチメントの例を使用します。サービスへのアクセスログが次のようになっていると仮定します。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

IPアドレスのソースをカウントする必要があります。マッピング関係は MySQL データベースに保存されています。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );

処理されたメッセージは次のとおりです。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "Beijing"
}

次の図に結果を示します。

image

コンテンツマッピング

たとえば、以下は会社の従業員の登録情報であり、従業員IDや電話番号などの個人情報が含まれています。

Zhang San, Employee ID 1, 131 1111 1111
Li Si, Employee ID 2, 132 2222 2222
Wang Wu, Employee ID 3, 133 3333 3333

メッセージ内の従業員の個人情報をマスクし、送信先サービスにプッシュする必要があります。結果は次のとおりです。

Zhang*、従業員ID *、*** **** ****
Li*、従業員ID *、*** **** ****
Wang*、従業員ID *、*** **** ****

次の図に結果を示します。

image

操作手順

1. ApsaraMQ for RocketMQ インスタンスとトピックを作成する

  1. Message Queue for Apache RocketMQ コンソール」にログインします。左側のナビゲーションウィンドウで、[インスタンス] を選択します。メニューバーでリージョンを選択し、[インスタンスの作成] をクリックします。

  2. RocketMQ インスタンスの作成」パネルで、「インスタンスバージョン」を選択し、たとえば「4.0 シリーズ」を指定します。「インスタンスタイプ」には「標準インスタンス」を選択します。インスタンス名(例:test)とインスタンスの説明を入力し、「OK」をクリックします。

  3. [インスタンスリスト] ページで、ターゲットインスタンスをクリックします。インスタンス詳細ページの左側にあるナビゲーションウィンドウで、[トピック管理] を選択し、[トピックの作成] をクリックします。

  4. トピックの作成」パネルで、トピック名(例:source-topictarget-topic)を指定します。[説明] を入力し、[メッセージタイプ][通常メッセージ] を選択して、[OK] をクリックします。

    説明

    少なくとも2つのトピックを作成する必要があります。1つのトピックは元のメッセージを送信するイベントソースとして機能し、もう1つのトピックはクレンジングされたデータを受信するイベントターゲットとして機能します。これら2つのトピックは、同じ ApsaraMQ for RocketMQ インスタンスに属することも、異なるインスタンスに属することもできます。

2. イベントストリームの作成

  1. EventBridge コンソールにログインします。左側のナビゲーションウィンドウで、[イベントストリーム] を選択します。トップメニューバーでリージョンを選択し、[イベントストリームの作成] をクリックします。

  2. [イベントストリームの作成] ページで、[ソース][フィルタリング][変換]、および [シンク] 設定ウィザードを使用して、それぞれイベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定します。その後、[保存] をクリックします。

    image

    ソース と ④シンク

    • ソース」では、ApsaraMQ for RocketMQ インスタンスの test とトピックの source-topic を選択します。

    • [Sink] では、名前が test の ApsaraMQ for RocketMQ インスタンスと、名前が target-topic のトピックを選択します。

    その他の設定項目はデフォルト値を保持します。

    フィルタリング

    この構成はオプションです。デフォルト値を保持し、[次へ]をクリックします。

    変換

    • Alibaba Cloud サービスの選択: Function Compute。

    • [新しい関数テンプレート]」を選択します。イベントストリームを作成すると、FC 関数 EventStreaming_Transform_Customized_**** が作成されます。

    • 関数テンプレート: 本トピックでは、コンテンツ分割テンプレートを例に説明します。利用可能なテンプレートには、[コンテンツ分割][コンテンツマッピング][コンテンツエンリッチメント]、および [動的ルート] があります。必要に応じて、これらのテンプレートのいずれかを選択できます。各テンプレートは、そのまま使用したりカスタマイズしたりできる基本的なデータ処理ロジックを提供します。

    image

3. テストと検証

3.1 ソース ApsaraMQ for RocketMQ インスタンスのトピックから元のメッセージを送信

  1. Message Queue for Apache RocketMQ コンソールにログインします。Message Queue for Apache RocketMQ コンソール で、イベントストリームを作成するときに設定した Source インスタンスのトピック source-topic を検索します。[アクション] 列で、[クイックエクスペリエンス] をクリックします。

  2. [メッセージの生成と消費のクイック体験] パネルで、raw メッセージ [Zhang San, Male, Class 4|Li Si, Female, Class 3|Wang Wu, Male, Class 4] を入力し、[OK] をクリックしてメッセージを送信します。

3.2 送信先 ApsaraMQ for RocketMQ インスタンスのトピックでメッセージが正しく分割されていることを確認

  1. Message Queue for Apache RocketMQ コンソールイベントストリームを作成したときに設定した Sink (送信先) インスタンスのトピック target-topic を見つけます。トピック名をクリックし、次に [メッセージ照会] タブを選択します。

  2. [クエリメソッド][トピックによるクエリ] として選択し、[クエリ] をクリックします。 クエリ結果で、前のステップで送信されたメッセージが送信先に配信される際に 3 つのメッセージに分割されたことを確認できます。 各メッセージ行の [詳細] をクリックすると、3 つのメッセージである "data": "Zhang San, male, Class 4""data": "Li Si, female, Class 3"、および "data": "Wang Wu, male, Class 4" を表示できます。

    image

4. リソースのクリーンアップ

テストが完了したら、この機能を今後使用しない場合は、不要な課金を避けるために、作成したリソースをすみやかにリリースしてください。詳細については、「トピックの削除」、「ApsaraMQ for RocketMQ インスタンスの削除」、および「関数の削除」をご参照ください。