すべてのプロダクト
Search
ドキュメントセンター

Function Compute:Function Compute を使用した RocketMQ のメッセージデータのクレンジング

最終更新日:Dec 19, 2025

Function Compute が提供するデータクレンジングテンプレートを使用してメッセージデータを処理したり、ビジネス要件に基づいてテンプレートコードを変更して、カスタムのクレンジングニーズに対応したりできます。このトピックでは、ApsaraMQ for RocketMQ のデータのコンテンツ分割を例に、メッセージ処理テンプレートの種類とその使用方法について説明します。

概要

データクレンジング機能は、Function Compute に基づく基本的なオペレーター機能を提供します。ApsaraMQ for RocketMQ でメッセージデータクレンジングタスクを作成した後、Function Compute にログインして、コードと関数の設定を変更できます。

オペレーター

説明

メッセージフィルタリング

正規表現に基づいてメッセージの内容を照合し、一致したメッセージを指定の宛先に送信します。詳細については、「イベントパターン」をご参照ください。

メッセージ変換

文字列の一致条件に基づいてメッセージの内容を変換し、変換されたメッセージを指定の宛先に送信します。例えば、文字の大文字と小文字の変換を実行できます。変換されたメッセージは、指定された宛先に送信できます。詳細については、「イベント変換」をご参照ください。

コンテンツ分割

正規表現に基づいてメッセージの内容を分割し、分割されたメッセージを指定の宛先に送信します。

動的ルーティング

正規表現に基づいてメッセージの内容を照合し、一致したメッセージを指定の宛先に、一致しなかったメッセージをデフォルトの宛先に送信します。

コンテンツエンリッチメント

ソースに基づいてメッセージの内容をエンリッチします。例えば、メッセージの元の内容に AccountID が含まれている場合、AccountID を使用してデータベースをクエリし、顧客のリージョンを取得します。システムは顧客のリージョンをソースメッセージの本文に挿入し、メッセージ本文を指定の宛先サービスに送信します。

コンテンツマッピング

正規表現に基づいてメッセージの内容をマッピングします。例えば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりできます。

コンテンツ分割

例えば、元の学生リストメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を 3 つの個別のメッセージに分割し、これらの 3 つのメッセージをそれぞれの宛先サービスに送信する必要があります。これを実装するには、コンテンツ分割オペレーターを使用します。分割されたメッセージは次のとおりです。

message:
    [Jack, Male, Class 4]
message:
    [Alice, Female, Class 3]
message:
    [John, Male, Class 4]

次の図にプロセスを示します。

image

動的ルーティング

次のメッセージには、3 つのブランドの歯磨き粉に関する情報が含まれています。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

このリストは、カスタムの動的ルールに基づいて宛先 Topic に送信する必要があります。ルールは次のとおりです。

  • BrandA で始まるメッセージを BrandA-item-topic と BrandA-discount-topic の Topic に送信します。

  • BrandB で始まるメッセージを BrandB-item-topic と BrandB-discount-topic の Topic に送信します。

  • その他のメッセージを Unknown-brand-topic の Topic に送信します。

次のサンプルコードは、ルールの JSON 形式を示しています。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

次の図にプロセスを示します。

image

コンテンツエンリッチメント

この例では、CIDR ブロックがエンリッチされます。次のサンプルコードは、サービスのアクセスログの例です。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

次のサンプルコードは、IP アドレスのソースをクエリし、マッピング関係を MySQL データベースに格納する方法の例です。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );

次のサンプルコードは、処理されたメッセージの例です。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

次の図にプロセスを示します。

image

コンテンツマッピング

次のメッセージには、会社の従業員の登録情報が含まれています。

Zhang San, 従業員ID 1, 131 1111 1111
Li Si, 従業員ID 2, 132 2222 2222
Wang Wu, 従業員ID 3, 133 3333 3333

上記のメッセージに含まれる従業員の名前、ID、電話番号は機密情報です。そのため、メッセージを宛先サービスに送信する前に、名前、ID、電話番号をマスクする必要があります。次のサンプルコードに例を示します。

Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, *********** 

次の図にプロセスを示します。

image

操作手順

1. ApsaraMQ for RocketMQ インスタンスの Topic の作成

  1. ApsaraMQ for RocketMQ コンソールにログインします。左側のナビゲーションウィンドウで、[Instances] をクリックします。上部のナビゲーションバーでリージョンを選択し、[Create Instance] をクリックします。

  2. [Create RocketMQ Instance] パネルで、[Instance Version] (例:[4.0 Series]) を選択し、[Instance Type][Standard Instance] を選択し、インスタンス名 (例:test) とインスタンスの説明を入力して、[OK] をクリックします。

  3. [Instances] ページで、対象のインスタンスをクリックします。インスタンス詳細ページの左側のナビゲーションウィンドウで [Topics] をクリックし、[Create Topic] をクリックします。

  4. [Create Topic] パネルで、Topic 名 (例:source-topic および target-topic) を設定し、[Description] を入力し、[Message Type][Normal Message] を選択して、[OK] をクリックします。

    説明

    少なくとも 2 つの Topic を作成する必要があります。1 つは元のメッセージを送信するイベントソースとして、もう 1 つはクレンジングされたデータを受信するイベントターゲットとして使用します。2 つの Topic は、同じ RocketMQ インスタンスに属することも、異なる RocketMQ インスタンスに属することもできます。

2. イベントストリームの作成

  1. EventBridge コンソールにログインします。左側のナビゲーションウィンドウで、[Event Streams] をクリックします。上部のナビゲーションバーでリージョンを選択し、[Create Event Stream] をクリックします。

  2. [Create Event Stream] ページで、[Source][Filtering][Transformation][Sink] の設定ウィザードでイベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定し、[Save] をクリックします。

    image

    [Source] と ④ [Sink]

    • [Source] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には source-topic を選択します。

    • [Sink] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には target-topic を選択します。

    その他の設定項目はデフォルト値のままにします。

    [Filtering]

    これはオプションの設定です。デフォルト値のままにして、[Next] をクリックします。

    [Transformation]

    • [Select Alibaba Cloud Service]: Function Compute。

    • [Create Function Template] を選択します。イベントストリームを作成すると、新しい FC 関数 EventStreaming_Transform_Customized_**** が作成されます。

    • [Function Template]: この例では、コンテンツ分割テンプレートが選択されています。オプションには、[Content Splitting][Content Mapping][Content Enrichment][Dynamic Routing] があります。ビジネス要件に基づいて、これらのテンプレートのいずれかを選択できます。テンプレートは、直接使用したり、カスタム調整したりできる基本的なデータ処理ロジックを提供します。

    image

3. テストと検証

3.1 ソース RocketMQ インスタンスの Topic での元のメッセージの送信

  1. ApsaraMQ for RocketMQ コンソールにログインします。イベントストリームの作成時に設定した [Source] インスタンスの Topic source-topic を見つけます。[Actions] 列で、[Quick Start] をクリックします。

  2. [Start Message Production And Consumption] パネルで、元のメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を入力し、[OK] をクリックしてメッセージを送信します。

3.2 宛先 RocketMQ インスタンスの Topic でメッセージが正しく分割されていることの確認

  1. ApsaraMQ for RocketMQ コンソールで、イベントストリームの作成時に設定した [Sink] インスタンスの Topic target-topic を見つけます。Topic 名をクリックし、[Message Query] タブを選択します。

  2. [Query Method][Query By Topic] を選択し、[Search] をクリックします。クエリ結果では、前のステップで送信されたメッセージが、宛先インスタンスに送信される際に 3 つのメッセージに分割されていることがわかります。各メッセージ行の [Details] をクリックすると、3 つのメッセージが "data": "Jack, Male, Class 4" "data": "Alice, Female, Class 3""data": "John, Male, Class 4" であることがわかります。

    image

4. リソースのクリーンアップ

テスト後、この機能を短期間使用しない場合は、不要なコストを避けるために、作成したリソースを速やかにリリースしてください。詳細については、「Topic の削除」、「RocketMQ インスタンスの削除」、および「関数の削除」をご参照ください。