Function Compute (FC) が提供するデータクレンジングテンプレートを使用してメッセージデータを処理したり、ビジネス要件に基づいてテンプレートコードを修正してカスタムのクレンジングニーズに対応したりできます。このトピックでは、ApsaraMQ for RocketMQ のデータに対するコンテンツ分割を例に、メッセージ処理テンプレートの種類とその使用方法について説明します。
概要
データクレンジング機能は、FC に基づく基本的なオペレーター機能を提供します。ApsaraMQ for RocketMQ でメッセージデータクレンジングタスクを作成した後、Function Compute にログインしてコードと関数の設定を修正できます。
オペレーター | 説明 |
メッセージフィルタリング | 正規表現に基づいてメッセージ本文を照合し、一致したメッセージを指定の送信先に送信します。詳細については、「イベントストリームのイベントパターン」をご参照ください。 |
メッセージ変換 | 文字列一致条件に基づいてメッセージ本文を変換し、変換後のメッセージを指定の送信先に送信します。例えば、文字の大文字と小文字の変換を実行できます。変換されたメッセージは、指定された送信先に送信できます。詳細については、「イベントストリームでのイベント変換」をご参照ください。 |
正規表現に基づいてメッセージ本文を分割し、分割されたメッセージを指定の送信先に送信します。 | |
正規表現に基づいてメッセージ本文を照合し、一致したメッセージを指定の送信先に、一致しなかったメッセージをデフォルトの送信先に送信します。 | |
ソースに基づいてメッセージ本文をエンリッチします。例えば、メッセージの元のコンテンツに AccountID が含まれている場合、AccountID を使用してデータベースをクエリし、顧客のリージョンを取得します。システムは顧客のリージョンをソースメッセージの本文に挿入し、そのメッセージ本文を指定の送信先サービスに送信します。 | |
正規表現に基づいてメッセージ本文をマッピングします。例えば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりできます。 |
例
コンテンツ分割
例えば、元の学生リストメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を 3 つの個別のメッセージに分割し、これらの 3 つのメッセージをそれぞれの送信先サービスに送信する必要があるとします。これを実装するには、コンテンツ分割オペレーターを使用します。分割されたメッセージは次のとおりです。
message:
[Jack, Male, Class 4]
message:
[Alice, Female, Class 3]
message:
[John, Male, Class 4]
動的ルーティング
次のメッセージには、3 つのブランドの歯磨き粉に関する情報が含まれています。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]このリストは、カスタムの動的ルールに基づいて送信先 Topic に送信する必要があります。ルールは次のとおりです。
BrandA で始まるメッセージを BrandA-item-topic と BrandA-discount-topic に送信します。
BrandB で始まるメッセージを BrandB-item-topic と BrandB-discount-topic に送信します。
その他のメッセージを Unknown-brand-topic に送信します。
次のサンプルコードは、ルールの JSON 形式を示しています。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}コンテンツエンリッチメント
この例では、CIDR ブロックがエンリッチされます。次のサンプルコードは、サービスのアクセスログの例です。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}
次のサンプルコードは、IP アドレスのソースをクエリし、マッピング関係を MySQL データベースに保存する方法の例です。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );
次のサンプルコードは、処理されたメッセージの例です。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}
コンテンツマッピング
次のメッセージには、ある会社の従業員の登録情報が含まれています。
Zhang San, Employee ID 1, 131 1111 1111
Li Si, Employee ID 2, 132 2222 2222
Wang Wu, Employee ID 3, 133 3333 3333
上記のメッセージに含まれる従業員の名前、ID、電話番号は機密情報です。そのため、メッセージを送信先サービスに送信する前に、名前、ID、電話番号をマスクする必要があります。次のサンプルコードに例を示します。
Ja*, 従業員 ID *, ***********
Ma*, 従業員 ID *, ***********
Dav*, 従業員 ID *, ***********
操作手順
1. ApsaraMQ for RocketMQ インスタンスの Topic の作成
-
ApsaraMQ for RocketMQ コンソールにログインします。左側のナビゲーションウィンドウで、[インスタンス] をクリックします。上部のメニューバーでリージョンを選択し、[インスタンスの作成] をクリックします。
-
[RocketMQ インスタンスの作成] パネルで、[インスタンスバージョン] (例:[4.0 シリーズ]) を選択します。[インスタンスタイプ] で [標準インスタンス] を選択します。インスタンス名 (例:
test) と説明を入力し、[OK] をクリックします。 -
[インスタンス] ページで、対象のインスタンスをクリックします。インスタンス詳細ページの左側のナビゲーションウィンドウで [Topics] をクリックし、[Topic の作成] をクリックします。
-
[Topic の作成] パネルで、
source-topicとtarget-topicという名前の 2 つの Topic を作成します。[説明] を入力します。[メッセージタイプ] で [通常メッセージ] を選択し、[OK] をクリックします。説明少なくとも 2 つの Topic を作成する必要があります。1 つは元のメッセージを送信するイベントソースとして、もう 1 つはクレンジングされたデータを受信するイベントターゲットとして使用します。2 つの Topic は、同じ RocketMQ インスタンスに属していても、異なる RocketMQ インスタンスに属していてもかまいません。
2. イベントストリームの作成
-
EventBridge コンソールにログインします。左側のナビゲーションウィンドウで [イベントストリーム] をクリックします。上部のメニューバーでリージョンを選択し、[イベントストリームの作成] をクリックします。
-
[イベントストリームの作成] ページで、[ソース]、[フィルタリング]、[変換]、[シンク] の各ステップでイベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定します。その後、[保存] をクリックします。
ソースとシンク
-
[ソース] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には
source-topicを選択します。 -
[シンク] では、ApsaraMQ for RocketMQ インスタンス test を選択します。Topic には
target-topicを選択します。
他の設定項目はデフォルト値のままにします。
[フィルタリング]
このステップはオプションです。デフォルト設定のままにして、[次へ] をクリックします。
③ [変換]
-
[Alibaba Cloud サービスを選択]:Function Compute。
-
[関数テンプレートの作成] を選択します:イベントストリームとともに、新しい FC 関数
EventStreaming_Transform_Customized_****が作成されます。 -
[関数テンプレート]:この例では、[コンテンツ分割] テンプレートを使用します。利用可能な他のテンプレートには、[コンテンツマッピング]、[コンテンツエンリッチメント]、[動的ルーティング] があります。要件に応じてテンプレートを選択してください。これらのテンプレートは、直接使用したりカスタマイズしたりできる基本的なデータ処理ロジックを提供します。
関数名は
EventStreaming_Transform_Split_の形式で自動的に生成され、サフィックスが付きます。テンプレートコードは、UTF-8 を使用してイベントをデコードし、その後ast.literal_evalとjson.loadsを使用してメッセージ配列を解析します。設定が有効な場合、ページの上部に緑色の [有効な設定] ラベルが表示されます。 -
3. テストと検証
3.1 ソース RocketMQ インスタンスの Topic で元のメッセージを送信
-
Message Queue for Apache RocketMQ コンソールにログインし、イベントストリームを作成したときに設定した [ソース] インスタンスの Topic
source-topicを見つけ、右側の [操作] 列で [クイック体験] をクリックします。 -
[メッセージの送受信を開始] パネルで、元のメッセージ
[Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4]を入力し、[OK] をクリックしてメッセージを送信します。
3.2 送信先 RocketMQ インスタンスの Topic でメッセージが正しく分割されていることを確認
-
Message Queue for Apache RocketMQ コンソールで、イベントストリームを作成したときに設定した [シンク] インスタンスの Topic
target-topicを見つけ、Topic 名をクリックし、[メッセージクエリ] タブを選択します。 -
[クエリ方法] で [Topic によるクエリ] を選択し、[検索] をクリックします。クエリ結果には、元のメッセージがシンクで 3 つの個別のメッセージに分割されたことが示されます。各メッセージの行にある [詳細] をクリックします。3 つのメッセージは
"data": "Jack, Male, Class 4"、"data": "Alice, Female, Class 3"、"data": "John, Male, Class 4"です。
4. リソースのクリーンアップ
テスト後、この機能を短期間使用しない場合は、不要なコストを避けるために、作成したリソースを速やかにリリースしてください。詳細については、「Topic の削除」、「RocketMQ インスタンスの削除」、および「関数の削除」をご参照ください。