データクレンジング機能は、コンテンツ分割、動的ルーティング、コンテンツエンリッチメント、コンテンツマッピングテンプレートなど、メッセージ処理のための共通テンプレートを提供します。テンプレート内のコードを使用してメッセージを処理できます。ビジネス要件に基づいてテンプレート内のコードを変更することもできます。
背景情報
データクレンジング機能は、Function Computeに基づく基本的なオペレーター機能を提供します。データクレンジング機能は、次のサービスでサポートされています。ApsaraMQ for RocketMQ、ApsaraMQ for Kafka、ApsaraMQ for MQTT、ApsaraMQ for RabbitMQ、およびSimple Message Queue (formerly MNS)です。データクレンジングタスクを作成した後、Function Computeコンソールにログインして、カスタムコードを記述し、対応する関数の設定を変更できます。
オペレーター | 説明 |
コンテンツ分割 | 正規表現に基づいてメッセージコンテンツを分割し、分割されたメッセージを1つずつ宛先に送信します。 |
動的ルーティング | 正規表現に基づいてメッセージコンテンツを照合します。コンテンツが一致するメッセージは、対応する宛先サービスにルーティングされます。コンテンツが一致しないメッセージは、デフォルトの宛先サービスにルーティングされます。 |
コンテンツエンリッチメント | エンリッチメントソースに基づいてメッセージコンテンツをエンリッチします。メッセージの元のコンテンツにアカウント ID が含まれている場合、アカウント ID を使用してデータベースをクエリし、クライアントリージョンを取得します。次に、データベースとクライアントリージョンの情報がソースメッセージの本文に入力され、宛先サービスに送信されます。 |
コンテンツマッピング | 正規表現に基づいてメッセージコンテンツをマッピングします。たとえば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりします。 |
このトピックでは、ApsaraMQ for Kafka でデータクレンジング機能を使用する方法について説明します。
シナリオ
コンテンツ分割
次のメッセージには、生徒のリストが含まれています。
message:
[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]メッセージを指定された宛先サービスに送信する前に、メッセージを次のメッセージに分割します。各メッセージには、1 人の生徒の情報が含まれています。
message:
[Jack, Male, Class 4]
message:
[Alice, Female, Class 3]
message:
[John, Male, Class 4]次の図はプロセスを示しています。
動的ルーティング
次のメッセージには、歯磨き粉のリストが含まれています。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]カスタム動的ルールに基づいて、リストを指定された宛先トピックに送信する必要があります。ルールは以下のとおりです。
BrandA で始まるメッセージを BrandA-item-topic および BrandA-discount-topic トピックに送信します。
BrandB で始まるメッセージを BrandB-item-topic および BrandB-discount-topic トピックに送信します。
その他のメッセージを Unknown-brand-topic トピックに送信します。
次のサンプルコードは、ルールのJSON形式を示しています。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}次の図はプロセスを示しています。
コンテンツエンリッチメント
この例では、IPアドレスセグメントがエンリッチされます。次のコードスニペットは、サービスのアクセスログを示しています。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}IPアドレスのソースをクエリし、マッピング関係をMySQLデータベースに格納する必要があります。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL, // IPアドレス
-> `Region` VARCHAR(256) NOT NULL, // リージョン
-> `ISP` VARCHAR(256) NOT NULL, // ISP
-> PRIMARY KEY (`IP`) // プライマリキー
-> );次のコードスニペットは、処理されたメッセージを示しています。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}次の図はプロセスを示しています。
コンテンツマッピング
次のメッセージには、会社の従業員の登録情報が含まれています。情報には、従業員 ID や携帯電話番号などの機密コンテンツが含まれています。
James, Employee ID 1, 131 1111 1111
Mary, Employee ID 2, 132 2222 2222
David, Employee ID 3, 133 3333 3333上記のメッセージの機密コンテンツは、宛先サービスに送信される前にマスクする必要があります。次のサンプルコードは例を示しています。
Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, *********** 次の図はプロセスを示しています。
手順
ApsaraMQ for Kafka コンソールにログインします。左側のナビゲーションペインで、 を選択します。[タスク] ページで、タスクリストの作成 をクリックします。
[ソース]、[フィルタリング]、[変換]、[シンク] の手順で、[タスクの作成] ページで、イベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定します。

① [ソース] と ④ [シンク]
異なる ApsaraMQ for Kafka インスタンスを選択します。
② [フィルタリング]
一致ルールを指定します。この手順はオプションです。このパラメーターを空のままにすると、すべてのイベントが一致とみなされます。詳細については、「イベントパターン」をご参照ください。
③ [変換]
Function Compute が提供するテンプレートを選択します。有効な値:[コンテンツ分割]、[コンテンツマッピング]、[コンテンツエンリッチメント]、[動的ルーティング]。ビジネス要件に基づいて上記のテンプレートのいずれかを選択できます。テンプレートは、データ処理の基本ロジックを提供します。テンプレート内のコードを使用してメッセージを処理できます。ビジネス要件に基づいてテンプレート内のコードを変更することもできます。
この例では、[コンテンツ分割] テンプレートが選択されています。
