すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:データクレンジング

最終更新日:Jan 25, 2026

データクレンジング機能は、コンテンツ分割、動的ルーティング、コンテンツエンリッチメント、コンテンツマッピングテンプレートなど、メッセージ処理のための共通テンプレートを提供します。テンプレート内のコードを使用してメッセージを処理できます。ビジネス要件に基づいてテンプレート内のコードを変更することもできます。

背景情報

データクレンジング機能は、Function Computeに基づく基本的なオペレーター機能を提供します。データクレンジング機能は、次のサービスでサポートされています。ApsaraMQ for RocketMQApsaraMQ for KafkaApsaraMQ for MQTTApsaraMQ for RabbitMQ、およびSimple Message Queue (formerly MNS)です。データクレンジングタスクを作成した後、Function Computeコンソールにログインして、カスタムコードを記述し、対応する関数の設定を変更できます。

オペレーター

説明

コンテンツ分割

正規表現に基づいてメッセージコンテンツを分割し、分割されたメッセージを1つずつ宛先に送信します。

動的ルーティング

正規表現に基づいてメッセージコンテンツを照合します。コンテンツが一致するメッセージは、対応する宛先サービスにルーティングされます。コンテンツが一致しないメッセージは、デフォルトの宛先サービスにルーティングされます。

コンテンツエンリッチメント

エンリッチメントソースに基づいてメッセージコンテンツをエンリッチします。メッセージの元のコンテンツにアカウント ID が含まれている場合、アカウント ID を使用してデータベースをクエリし、クライアントリージョンを取得します。次に、データベースとクライアントリージョンの情報がソースメッセージの本文に入力され、宛先サービスに送信されます。

コンテンツマッピング

正規表現に基づいてメッセージコンテンツをマッピングします。たとえば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりします。

このトピックでは、ApsaraMQ for Kafka でデータクレンジング機能を使用する方法について説明します。

シナリオ

コンテンツ分割

次のメッセージには、生徒のリストが含まれています。

message:
[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]

メッセージを指定された宛先サービスに送信する前に、メッセージを次のメッセージに分割します。各メッセージには、1 人の生徒の情報が含まれています。

message:
    [Jack, Male, Class 4]
message:
    [Alice, Female, Class 3]
message:
    [John, Male, Class 4]

次の図はプロセスを示しています。

動的ルーティング

次のメッセージには、歯磨き粉のリストが含まれています。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

カスタム動的ルールに基づいて、リストを指定された宛先トピックに送信する必要があります。ルールは以下のとおりです。

  • BrandA で始まるメッセージを BrandA-item-topic および BrandA-discount-topic トピックに送信します。

  • BrandB で始まるメッセージを BrandB-item-topic および BrandB-discount-topic トピックに送信します。

  • その他のメッセージを Unknown-brand-topic トピックに送信します。

次のサンプルコードは、ルールのJSON形式を示しています。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

次の図はプロセスを示しています。

コンテンツエンリッチメント

この例では、IPアドレスセグメントがエンリッチされます。次のコードスニペットは、サービスのアクセスログを示しています。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

IPアドレスのソースをクエリし、マッピング関係をMySQLデータベースに格納する必要があります。

CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,  // IPアドレス
    ->     `Region` VARCHAR(256) NOT NULL, // リージョン
    ->      `ISP` VARCHAR(256) NOT NULL,   // ISP
    ->      PRIMARY KEY (`IP`)           // プライマリキー
    -> );

次のコードスニペットは、処理されたメッセージを示しています。

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}

次の図はプロセスを示しています。

コンテンツマッピング

次のメッセージには、会社の従業員の登録情報が含まれています。情報には、従業員 ID や携帯電話番号などの機密コンテンツが含まれています。

James, Employee ID 1, 131 1111 1111
Mary, Employee ID 2, 132 2222 2222
David, Employee ID 3, 133 3333 3333

上記のメッセージの機密コンテンツは、宛先サービスに送信される前にマスクする必要があります。次のサンプルコードは例を示しています。

Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, *********** 

次の図はプロセスを示しています。

手順

  1. ApsaraMQ for Kafka コンソールにログインします。左側のナビゲーションペインで、[コネクターエコシステム統合] > [タスク] を選択します。[タスク] ページで、タスクリストの作成 をクリックします。

  2. [ソース][フィルタリング][変換][シンク] の手順で、[タスクの作成] ページで、イベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定します。

    image

    [ソース] と ④ [シンク]

    異なる ApsaraMQ for Kafka インスタンスを選択します。

    [フィルタリング]

    一致ルールを指定します。この手順はオプションです。このパラメーターを空のままにすると、すべてのイベントが一致とみなされます。詳細については、「イベントパターン」をご参照ください。

    [変換]

    Function Compute が提供するテンプレートを選択します。有効な値:[コンテンツ分割][コンテンツマッピング][コンテンツエンリッチメント][動的ルーティング]。ビジネス要件に基づいて上記のテンプレートのいずれかを選択できます。テンプレートは、データ処理の基本ロジックを提供します。テンプレート内のコードを使用してメッセージを処理できます。ビジネス要件に基づいてテンプレート内のコードを変更することもできます。

    この例では、[コンテンツ分割] テンプレートが選択されています。

    image