Function Compute が提供するデータクレンジングテンプレートを使用してメッセージデータを処理したり、ビジネス要件に基づいてテンプレートコードを変更して、カスタムのクレンジングニーズに対応したりできます。このトピックでは、ApsaraMQ for RocketMQ のデータのコンテンツ分割を例に、メッセージ処理テンプレートの種類とその使用方法について説明します。
概要
データクレンジング機能は、Function Compute に基づく基本的なオペレーター機能を提供します。ApsaraMQ for RocketMQ でメッセージデータクレンジングタスクを作成した後、Function Compute にログインして、コードと関数の設定を変更できます。
オペレーター | 説明 |
メッセージフィルタリング | 正規表現に基づいてメッセージの内容を照合し、一致したメッセージを指定の宛先に送信します。詳細については、「イベントパターン」をご参照ください。 |
メッセージ変換 | 文字列の一致条件に基づいてメッセージの内容を変換し、変換されたメッセージを指定の宛先に送信します。例えば、文字の大文字と小文字の変換を実行できます。変換されたメッセージは、指定された宛先に送信できます。詳細については、「イベント変換」をご参照ください。 |
正規表現に基づいてメッセージの内容を分割し、分割されたメッセージを指定の宛先に送信します。 | |
正規表現に基づいてメッセージの内容を照合し、一致したメッセージを指定の宛先に、一致しなかったメッセージをデフォルトの宛先に送信します。 | |
ソースに基づいてメッセージの内容をエンリッチします。例えば、メッセージの元の内容に AccountID が含まれている場合、AccountID を使用してデータベースをクエリし、顧客のリージョンを取得します。システムは顧客のリージョンをソースメッセージの本文に挿入し、メッセージ本文を指定の宛先サービスに送信します。 | |
正規表現に基づいてメッセージの内容をマッピングします。例えば、システムはメッセージ内の機密フィールドをマスクしたり、メッセージサイズを最小サイズに縮小したりできます。 |
例
コンテンツ分割
例えば、元の学生リストメッセージ [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] を 3 つの個別のメッセージに分割し、これらの 3 つのメッセージをそれぞれの宛先サービスに送信する必要があります。これを実装するには、コンテンツ分割オペレーターを使用します。分割されたメッセージは次のとおりです。
message:
[Jack, Male, Class 4]
message:
[Alice, Female, Class 3]
message:
[John, Male, Class 4]次の図にプロセスを示します。
動的ルーティング
次のメッセージには、3 つのブランドの歯磨き粉に関する情報が含まれています。
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]このリストは、カスタムの動的ルールに基づいて宛先 Topic に送信する必要があります。ルールは次のとおりです。
BrandA で始まるメッセージを BrandA-item-topic と BrandA-discount-topic の Topic に送信します。
BrandB で始まるメッセージを BrandB-item-topic と BrandB-discount-topic の Topic に送信します。
その他のメッセージを Unknown-brand-topic の Topic に送信します。
次のサンプルコードは、ルールの JSON 形式を示しています。
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}次の図にプロセスを示します。
コンテンツエンリッチメント
この例では、CIDR ブロックがエンリッチされます。次のサンプルコードは、サービスのアクセスログの例です。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}次のサンプルコードは、IP アドレスのソースをクエリし、マッピング関係を MySQL データベースに格納する方法の例です。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );次のサンプルコードは、処理されたメッセージの例です。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}次の図にプロセスを示します。
コンテンツマッピング
次のメッセージには、会社の従業員の登録情報が含まれています。
Zhang San, 従業員ID 1, 131 1111 1111
Li Si, 従業員ID 2, 132 2222 2222
Wang Wu, 従業員ID 3, 133 3333 3333上記のメッセージに含まれる従業員の名前、ID、電話番号は機密情報です。そのため、メッセージを宛先サービスに送信する前に、名前、ID、電話番号をマスクする必要があります。次のサンプルコードに例を示します。
Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, *********** 次の図にプロセスを示します。
操作手順
1. ApsaraMQ for RocketMQ インスタンスの Topic の作成
ApsaraMQ for RocketMQ コンソールにログインします。左側のナビゲーションウィンドウで、[Instances] をクリックします。上部のナビゲーションバーでリージョンを選択し、[Create Instance] をクリックします。
[Create RocketMQ Instance] パネルで、[Instance Version] (例:[4.0 Series]) を選択し、[Instance Type] で [Standard Instance] を選択し、インスタンス名 (例:
test) とインスタンスの説明を入力して、[OK] をクリックします。[Instances] ページで、対象のインスタンスをクリックします。インスタンス詳細ページの左側のナビゲーションウィンドウで [Topics] をクリックし、[Create Topic] をクリックします。
[Create Topic] パネルで、Topic 名 (例:
source-topicおよびtarget-topic) を設定し、[Description] を入力し、[Message Type] で [Normal Message] を選択して、[OK] をクリックします。説明少なくとも 2 つの Topic を作成する必要があります。1 つは元のメッセージを送信するイベントソースとして、もう 1 つはクレンジングされたデータを受信するイベントターゲットとして使用します。2 つの Topic は、同じ RocketMQ インスタンスに属することも、異なる RocketMQ インスタンスに属することもできます。
2. イベントストリームの作成
EventBridge コンソールにログインします。左側のナビゲーションウィンドウで、[Event Streams] をクリックします。上部のナビゲーションバーでリージョンを選択し、[Create Event Stream] をクリックします。
[Create Event Stream] ページで、[Source]、[Filtering]、[Transformation]、[Sink] の設定ウィザードでイベントソース、フィルタリングルール、データクレンジングテンプレート、イベントターゲットを設定し、[Save] をクリックします。

① [Source] と ④ [Sink]
[Source] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には
source-topicを選択します。[Sink] では、ApsaraMQ for RocketMQ インスタンス test を選択し、Topic には
target-topicを選択します。
その他の設定項目はデフォルト値のままにします。
② [Filtering]
これはオプションの設定です。デフォルト値のままにして、[Next] をクリックします。
③ [Transformation]
[Select Alibaba Cloud Service]: Function Compute。
[Create Function Template] を選択します。イベントストリームを作成すると、新しい FC 関数
EventStreaming_Transform_Customized_****が作成されます。[Function Template]: この例では、コンテンツ分割テンプレートが選択されています。オプションには、[Content Splitting]、[Content Mapping]、[Content Enrichment]、[Dynamic Routing] があります。ビジネス要件に基づいて、これらのテンプレートのいずれかを選択できます。テンプレートは、直接使用したり、カスタム調整したりできる基本的なデータ処理ロジックを提供します。

3. テストと検証
3.1 ソース RocketMQ インスタンスの Topic での元のメッセージの送信
ApsaraMQ for RocketMQ コンソールにログインします。イベントストリームの作成時に設定した [Source] インスタンスの Topic
source-topicを見つけます。[Actions] 列で、[Quick Start] をクリックします。[Start Message Production And Consumption] パネルで、元のメッセージ
[Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4]を入力し、[OK] をクリックしてメッセージを送信します。
3.2 宛先 RocketMQ インスタンスの Topic でメッセージが正しく分割されていることの確認
ApsaraMQ for RocketMQ コンソールで、イベントストリームの作成時に設定した [Sink] インスタンスの Topic
target-topicを見つけます。Topic 名をクリックし、[Message Query] タブを選択します。[Query Method] で [Query By Topic] を選択し、[Search] をクリックします。クエリ結果では、前のステップで送信されたメッセージが、宛先インスタンスに送信される際に 3 つのメッセージに分割されていることがわかります。各メッセージ行の [Details] をクリックすると、3 つのメッセージが
"data": "Jack, Male, Class 4"、"data": "Alice, Female, Class 3"、"data": "John, Male, Class 4"であることがわかります。
4. リソースのクリーンアップ
テスト後、この機能を短期間使用しない場合は、不要なコストを避けるために、作成したリソースを速やかにリリースしてください。詳細については、「Topic の削除」、「RocketMQ インスタンスの削除」、および「関数の削除」をご参照ください。