メッセージデータクレンジング機能では、メッセージ分割、動的ルート、メッセージ本文の充実化など、一般的なメッセージ処理テンプレートを提供します。これらのテンプレートをそのまま利用してメッセージを処理することも、必要に応じてコードをカスタマイズすることも可能です。本トピックでは、ApsaraMQ for RocketMQ 向けのメッセージデータクレンジングテンプレートの種類と使用方法について説明します。
背景情報
メッセージデータクレンジングタスクは、基本的なオペレーター機能を提供し、その基盤ロジックとして Function Compute を利用します。まず ApsaraMQ for RocketMQ 向けのメッセージデータクレンジングタスクを作成した後、Function Computeコンソール にログインして、コードをカスタマイズしたり、対応する関数の設定を変更したりできます。
オペレーター | オペレーター機能 |
メッセージフィルタリング | 正規表現を用いてメッセージ本文を照合し、該当するメッセージをターゲットへ送信します。詳細については、「イベントパターン」をご参照ください。 |
メッセージ変換 | 文字列照合に基づきメッセージ本文を置換します(例:大文字と小文字の変換)。変換後のメッセージをターゲットへ送信します。詳細については、「イベント本文の変換」をご参照ください。 |
正規表現を用いてメッセージ本文を分割し、各分割結果を個別にターゲットへ送信します。 | |
正規表現を用いてメッセージ本文を照合し、該当するメッセージを対応するターゲットへルーティングします。該当しないメッセージはデフォルトターゲットへルーティングします。 | |
充実化ソースを用いてメッセージ本文を充実化します。たとえば、元のメッセージに AccountID が含まれている場合、その AccountID を用いてデータベースを照会し、顧客のリージョン情報を取得してメッセージ本文に追加したうえで、ターゲットサービスへ送信します。 | |
正規表現を用いてメッセージ本文をマッピングします(例:機密フィールドのマスク処理や、メッセージサイズの最小化)。 |
コンテンツ分割
例
以下のような学生一覧を例に説明します。
メッセージ:
[チョウ・サン、男性、17 歳、クラス 4;リ・シ、女性、17 歳、クラス 3;ワン・ウー、男性、17 歳、クラス 4]このメッセージを個別の学生レコードに分割し、それぞれを独立したメッセージとしてターゲットサービスへ送信できます。結果は以下のとおりです。
メッセージ:
[Zhao San, male, 17, Class 4]
メッセージ:
[Li Si, female, 17, Class 3]
メッセージ:
[Wang Wu, male, 17, Class 4]操作手順
まず、ApsaraMQ for RocketMQ コンソール にログインします。
左側のナビゲーションウィンドウで、 を選択します。その後、上部のメニューバーからリージョンを選択します。
メッセージシンク ページで、タスクの作成 をクリックします。
メッセージシンクの作成 パネルで、以下の項目を設定し、OK をクリックします。
主な設定項目について以下に説明します。その他の設定はすべてデフォルト値のままとします。
基本情報
設定項目
説明
タスク名
タスク名を入力します。
エグレス タイプ
RocketMQ を選択します。サポートされるメッセージサービスには、RocketMQ、RabbitMQ、Simple Message Queue (旧称:MNS)、および Kafka があります。
リソース構成
設定項目
説明
ソース
リージョン
中国 (杭州) を選択します。
バージョン
RocketMQ のバージョンを選択します。本例では RocketMQ 5.x を使用します。
RocketMQ インスタンス
メッセージを生成する RocketMQ インスタンスを選択します。
トピック
ソースインスタンスからトピックを選択します。
ターゲット
バージョン
メッセージを受信する RocketMQ のバージョンを選択します。本例では RocketMQ 5.x を使用します。
インスタンス ID
メッセージを受信する RocketMQ インスタンスのインスタンス ID を選択します。
トピック
ターゲットインスタンスからトピックを選択します。
データ処理
メッセージフィルタリング: フィルタリングなし を選択します。
メッセージ変換: カスタム設定 を選択します。メッセージ本文 (body) については、データクレンジング を選択し、関数テンプレートの作成 をクリックします。関数テンプレート では、コンテンツ分割 transform_split を選択し、必要に応じて関数コードを修正します。
タスク作成後、Function Computeコンソール にログインして、自動的に作成された サービス および 関数 を確認できます。
動的ルーティング
例
以下のような歯磨き粉在庫リストを例に説明します。
message:
[ブランドA, 歯磨き粉, $12.98, 100g
ブランドB, 歯磨き粉, $7.99, 80g
ブランドC, 歯磨き粉, $1.99, 100g]このリストをカスタムの動的ルールを用いてターゲットトピックへルーティングできます。ルールの内容は以下のとおりです。
メッセージが「ブランドA」で始まる場合、BrandA-item-topic および BrandA-discount-topic へ送信します。
メッセージが「ブランドB」で始まる場合、BrandB-item-topic および BrandB-discount-topic へ送信します。
その他のすべてのメッセージは、Unknown-brand-topic へ送信します。
このルールの JSON 表現は以下のとおりです。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}操作手順
詳細な手順については、「コンテンツ分割の操作手順」をご参照ください。関数テンプレート では、動的ルーティング dynamic_routing を選択します。
コンテンツ充実化
例
この例では、IP アドレスセグメントの充実化方法を説明します。あるサービスのアクセスログが以下のように記録されているとします。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}IP アドレスの発信元を特定する必要があります。このマッピング情報は MySQL データベースに格納できます。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );充実化後のメッセージは以下のとおりです。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "北京"
}操作手順
詳細な手順については、「コンテンツ分割の操作手順」をご参照ください。関数テンプレート では、コンテンツ充実化 transform_enrichment を選択します。
コンテンツマッピング
例
以下のような企業の従業員登録データを例に説明します。このデータには、従業員 ID や電話番号などの非公開情報が含まれています。
Zhao San, ID 1, 131 1111 1111
Li Si, ID 2, 132 2222 2222
Wang Wu, ID 3, 133 3333 3333メッセージ内の非公開情報をマスク処理し、ターゲットサービスへ送信できます。結果は以下のとおりです。
張*, 従業員ID *, *** **** ****
李*, 従業員ID *, *** **** ****
王*, 従業員ID *, *** **** ****操作手順
詳細な手順については、「コンテンツ分割の操作手順」をご参照ください。関数テンプレート では、コンテンツマッピング transform_projection を選択します。