すべてのプロダクト
Search
ドキュメントセンター

Function Compute:Function Compute を使用した RocketMQ のメッセージデータのクレンジング

最終更新日:Jun 22, 2026

Function Compute (FC) が提供するデータクレンジングテンプレートを使用してメッセージデータを処理したり、ビジネス要件に基づいてテンプレートコードを修正してカスタムのクレンジングニーズに対応したりできます。このトピックでは、ApsaraMQ for RocketMQ のデータに対するコンテンツ分割を例に、メッセージ処理テンプレートの種類とその使用方法について説明します。

概要

データクレンジング機能は、FC に基づく基本的なオペレーター機能を提供します。ApsaraMQ for RocketMQ でメッセージデータクレンジングタスクを作成した後、Function Compute にログインしてコードと関数の設定を修正できます。

オペレーター

説明

メッセージフィルタリング

正規表現に基づいてメッセージ本文を照合し、一致したメッセージを指定の送信先に送信します。詳細については、「イベントストリームのイベントパターン」をご参照ください。

メッセージ変換

文字列一致条件に基づいてメッセージ本文を変換し、変換後のメッセージを指定の送信先に送信します。例えば、文字の大文字と小文字の変換を実行できます。変換されたメッセージは、指定された送信先に送信できます。詳細については、「イベントストリームでのイベント変換」をご参照ください。

コンテンツ分割

正規表現に基づいてメッセージ本文を分割し、分割されたメッセージを指定の送信先に送信します。

動的ルーティング

正規表現に基づいてメッセージ本文を照合し、一致したメッセージを指定の送信先に、一致しなかったメッセージをデフォルトの送信先に送信します。

コンテンツエンリッチメント

ソースに基づいてメッセージ本文をエンリッチします。例えば、メッセージの元のコンテンツに AccountID が含まれている場合、AccountID を使用してデータベースをクエリし、顧客のリージョンを取得します。システムは顧客のリージョンをソースメッセージの本文に挿入し、そのメッセージ本文を指定の送信先サービスに送信します。

コンテンツマッピング

正規表現に基づいてメッセージ本文をマッピングします。例えば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりできます。

コンテンツ分割

例えば、元の学生リストメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を 3 つの個別のメッセージに分割し、これらの 3 つのメッセージをそれぞれの送信先サービスに送信する必要があるとします。これを実装するには、コンテンツ分割オペレーターを使用します。分割されたメッセージは次のとおりです。

message:
    [Jack, Male, Class 4]
message:
    [Alice, Female, Class 3]
message:
    [John, Male, Class 4]

動的ルーティング

次のメッセージには、3 つのブランドの歯磨き粉に関する情報が含まれています。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

このリストは、カスタムの動的ルールに基づいて送信先 Topic に送信する必要があります。ルールは次のとおりです。

  • BrandA で始まるメッセージを BrandA-item-topic と BrandA-discount-topic に送信します。

  • BrandB で始まるメッセージを BrandB-item-topic と BrandB-discount-topic に送信します。

  • その他のメッセージを Unknown-brand-topic に送信します。

次のサンプルコードは、ルールの JSON 形式を示しています。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

コンテンツエンリッチメント

この例では、CIDR ブロックがエンリッチされます。次のサンプルコードは、サービスのアクセスログの例です。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

次のサンプルコードは、IP アドレスのソースをクエリし、マッピング関係を MySQL データベースに保存する方法の例です。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );

次のサンプルコードは、処理されたメッセージの例です。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

コンテンツマッピング

次のメッセージには、ある会社の従業員の登録情報が含まれています。

Zhang San, Employee ID 1, 131 1111 1111
Li Si, Employee ID 2, 132 2222 2222
Wang Wu, Employee ID 3, 133 3333 3333

上記のメッセージに含まれる従業員の名前、ID、電話番号は機密情報です。そのため、メッセージを送信先サービスに送信する前に、名前、ID、電話番号をマスクする必要があります。次のサンプルコードに例を示します。

Ja*, 従業員 ID *, ***********
Ma*, 従業員 ID *, ***********
Dav*, 従業員 ID *, *********** 

操作手順

1. ApsaraMQ for RocketMQ インスタンスの Topic の作成

  1. ApsaraMQ for RocketMQ コンソールにログインします。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。上部のメニューバーでリージョンを選択し、[インスタンスの作成] をクリックします。

  2. [RocketMQ インスタンスの作成] パネルで、[インスタンスバージョン] (例:[4.0 シリーズ]) を選択します。[インスタンスタイプ][標準インスタンス] を選択します。インスタンス名 (例:test) と説明を入力し、[OK] をクリックします。

  3. [インスタンス] ページで、対象のインスタンスをクリックします。インスタンス詳細ページの左側のナビゲーションウィンドウで [Topics] をクリックし、[Topic の作成] をクリックします。

  4. [Topic の作成] パネルで、source-topictarget-topic という名前の 2 つの Topic を作成します。[説明] を入力します。[メッセージタイプ][通常メッセージ] を選択し、[OK] をクリックします。

    説明

    少なくとも 2 つの Topic を作成する必要があります。1 つは元のメッセージを送信するイベントソースとして、もう 1 つはクレンジングされたデータを受信するイベントターゲットとして使用します。2 つの Topic は、同じ RocketMQ インスタンスに属していても、異なる RocketMQ インスタンスに属していてもかまいません。

2. イベントストリームの作成

  1. EventBridge コンソールにログインします。左側のナビゲーションウィンドウで [イベントストリーム] をクリックします。上部のメニューバーでリージョンを選択し、[イベントストリームの作成] をクリックします。

  2. [イベントストリームの作成] ページで、[ソース][フィルタリング][変換][シンク] の各ステップでイベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定します。その後、[保存] をクリックします。

    ソースシンク

    • [ソース] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には source-topic を選択します。

    • [シンク] では、ApsaraMQ for RocketMQ インスタンス test を選択します。Topic には target-topic を選択します。

    他の設定項目はデフォルト値のままにします。

    [フィルタリング]

    このステップはオプションです。デフォルト設定のままにして、[次へ] をクリックします。

    [変換]

    • [Alibaba Cloud サービスを選択]:Function Compute。

    • [関数テンプレートの作成] を選択します:イベントストリームとともに、新しい FC 関数 EventStreaming_Transform_Customized_**** が作成されます。

    • [関数テンプレート]:この例では、[コンテンツ分割] テンプレートを使用します。利用可能な他のテンプレートには、[コンテンツマッピング][コンテンツエンリッチメント][動的ルーティング] があります。要件に応じてテンプレートを選択してください。これらのテンプレートは、直接使用したりカスタマイズしたりできる基本的なデータ処理ロジックを提供します。

    関数名は EventStreaming_Transform_Split_ の形式で自動的に生成され、サフィックスが付きます。テンプレートコードは、UTF-8 を使用してイベントをデコードし、その後 ast.literal_evaljson.loads を使用してメッセージ配列を解析します。設定が有効な場合、ページの上部に緑色の [有効な設定] ラベルが表示されます。

3. テストと検証

3.1 ソース RocketMQ インスタンスの Topic で元のメッセージを送信

  1. Message Queue for Apache RocketMQ コンソールにログインし、イベントストリームを作成したときに設定した [ソース] インスタンスの Topic source-topic を見つけ、右側の [操作] 列で [クイック体験] をクリックします。

  2. [メッセージの送受信を開始] パネルで、元のメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を入力し、[OK] をクリックしてメッセージを送信します。

3.2 送信先 RocketMQ インスタンスの Topic でメッセージが正しく分割されていることを確認

  1. Message Queue for Apache RocketMQ コンソールで、イベントストリームを作成したときに設定した [シンク] インスタンスの Topic target-topic を見つけ、Topic 名をクリックし、[メッセージクエリ] タブを選択します。

  2. [クエリ方法][Topic によるクエリ] を選択し、[検索] をクリックします。クエリ結果には、元のメッセージがシンクで 3 つの個別のメッセージに分割されたことが示されます。各メッセージの行にある [詳細] をクリックします。3 つのメッセージは "data": "Jack, Male, Class 4" "data": "Alice, Female, Class 3""data": "John, Male, Class 4" です。

4. リソースのクリーンアップ

テスト後、この機能を短期間使用しない場合は、不要なコストを避けるために、作成したリソースを速やかにリリースしてください。詳細については、「Topic の削除」、「RocketMQ インスタンスの削除」、および「関数の削除」をご参照ください。