コンシューマーがトピックをサブスクライブすると、ApsaraMQ for RocketMQブローカーはトピック内のすべてのメッセージをコンシューマーに配信します。 コンシューマーがトピックから特定のメッセージのみを受信する場合は、メッセージ属性とフィルター条件を設定できます。 このように、ApsaraMQ for RocketMQブローカーは、フィルター条件に一致する属性を持つメッセージのみをコンシューマーに配信します。 このトピックでは、メッセージフィルタリング機能の動作メカニズム、使用シナリオ、および制限について説明します。 このトピックでは、メッセージフィルタリングの設定方法についても説明し、メッセージフィルタリングのサンプルコードを提供します。
説明
メッセージ属性とフィルター条件を設定することで、メッセージフィルター機能を実装できます。 メッセージ属性は、プロデューサによってトピックに送信されたメッセージを分類するように構成され、フィルタ条件は、トピック内の特定の属性を有するメッセージをフィルタするように構成される。 これにより、ApsaraMQ for RocketMQブローカーは、プロデューサからのメッセージをフィルタリングし、指定されたフィルタ条件を満たすメッセージのみをコンシューマに配信できます。
コンシューマーがフィルター条件を指定せずにトピックをサブスクライブした場合、コンシューマーは、プロデューサーから送信されたメッセージに属性が設定されているかどうかに関係なく、トピック内のすべてのメッセージを受信します。
次の表に、ApsaraMQ for RocketMQでサポートされているフィルタリング方法を示します。
移動方法 | 説明 | シナリオ | インスタンス制限 | プロトコル制限 |
タグベースのフィルタリング (デフォルト) |
プロデューサによって送信されたメッセージのタグが、消費者がサブスクライブするメッセージの指定されたタグと一致する場合、ブローカは、メッセージを消費者に配信する。 | この方法は、単純なフィルタリングシナリオに適しています。 メッセージに追加できるタグは1つだけです。 このメソッドを使用して、単一の属性でメッセージを分類およびフィルタリングできます。 | なし。 | なし。 |
属性ベースのSQLフィルタリング |
フィルタ条件を満たすメッセージが消費者に配信されます。 | この方法は、複雑なフィルタリングシナリオに適しています。 メッセージに複数のカスタム属性を設定できます。 この方法では、カスタムSQL式を使用して複数の属性でメッセージをフィルタリングできます。 | Enterprise Platinum Editionインスタンスのみがこの方法をサポートしています。 | ApsaraMQ for RocketMQ TCPクライアントSDKのみがこの方法をサポートしています。 |
タグベースのフィルタリング
タグは、トピック内のメッセージを分類するラベルです。 ApsaraMQ for RocketMQプロデューサーがメッセージを送信する前に、メッセージにタグを追加できます。 次いで、消費者は、追加されたタグに基づいてメッセージをサブスクライブすることができる。
サンプルシナリオ
eコマースのトランザクションシナリオでは、次のメッセージが生成されます。
注文メッセージ
支払いメッセージ
物流メッセージ
メッセージは、次のダウンストリームシステムがサブスクライブしているTrade_topicという名前のトピックに送信されます。
支払いシステム: 支払いメッセージのみを購読します。
物流システム: 物流メッセージのみを購読します。
トランザクション成功率分析システム: 注文と支払いのメッセージを購読します。
リアルタイムコンピューティングシステム: すべてのメッセージをサブスクライブします。
次の図は、フィルタリングプロセスを示しています。
設定方法
ApsaraMQ for RocketMQでは、タグを使用してメッセージをフィルタリングするためのコードをクライアントSDKで定義できます。 プロデューサーがメッセージを送信する前に、メッセージにタグを追加し、コンシューマーがサブスクライブするメッセージのタグを指定する必要があります。 SDKの詳細については、「概要」をご参照ください。 次のセクションでは、プロデューサーとコンシューマーのコードを定義する方法について説明します。
メッセージを送信する
メッセージを送信する前に、メッセージごとにタグを指定します。
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());すべてのメッセージを購読する
トピック内のすべてのメッセージをサブスクライブする場合は、アスタリスク (*) を使用してすべてのタグを指定します。
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });特定のタイプのメッセージを購読する
コンシューマーがトピック内の特定のタイプのメッセージをサブスクライブする場合は、対応するタグを指定します。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });複数のタイプのメッセージを購読する
消費者がトピック内の複数のタイプのメッセージを購読したい場合は、対応するタグを指定します。 複数のタグは2本の縦棒 (| |) で区切ります。
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });間違った例
コンシューマーにトピックへの複数のサブスクリプションがあり、各サブスクリプションに異なるタグがある場合、コンシューマーは、タグが最新のサブスクリプションで指定されたタグと一致するメッセージのみを受信します。
// In the following code, a consumer can receive messages with TagB in MQ_TOPIC but cannot receive messages with TagA. consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
属性ベースのSQLフィルタリング
属性ベースのSQLフィルタリングメソッドを使用するには、次の手順を実行します。プロデューサーがメッセージを送信する前にメッセージのカスタム属性を設定し、SQL構文を使用してフィルター式を定義し、コンシューマーがサブスクライブするメッセージの属性を指定します。 ApsaraMQ for RocketMQは、カスタム属性がフィルター式の計算結果と一致するメッセージをフィルタリングし、そのメッセージをコンシューマーに配信します。
タグは、特殊なタイプのメッセージ属性である。 属性ベースのSQLフィルタリング方法は、タグベースのフィルタリング方法と互換性があります。 SQL式を使用して、メッセージのフィルタリングに使用するタグを指定できます。 SQL構文では、タグ属性はTAGSで表されます。
制限事項
属性ベースのSQLフィルタリングメソッドを使用してメッセージをフィルタリングする場合、次の制限事項に注意してください。
Enterprise Platinum Editionインスタンスのみがこの方法をサポートしています。
TCPクライアントのみがこのメソッドをサポートしています。
ブローカーが属性ベースのSQLフィルターメソッドをサポートしておらず、コンシューマーのフィルター式を定義した場合、コンシューマーの起動時にエラーが報告されるか、コンシューマーがメッセージを受信できない場合があります。
サンプルシナリオ
次の項目では、eコマースのトランザクションシナリオで生成されるメッセージについて説明します。 メッセージは、注文メッセージと物流メッセージとに分類される。 リージョン属性は、物流メッセージに対して指定される。 リージョン属性の値は、杭州と上海です。
注文メッセージ
物流メッセージ
リージョン属性の値が杭州であるロジスティクスメッセージ
リージョン属性の値が上海である物流メッセージ
メッセージは、次のダウンストリームシステムがサブスクライブしているTrade_topicという名前のトピックに送信されます。
ロジスティクスシステム1: リージョン属性値が杭州であるロジスティクスメッセージのみをサブスクライブします。
物流システム2: すべての物流メッセージを購読する。
注文追跡システム: 注文メッセージのみを購読します。
リアルタイムコンピューティングシステム: すべてのメッセージをサブスクライブします。
次の図は、フィルタリングプロセスを示しています。
設定方法
ApsaraMQ for RocketMQでは、クライアントSDKでコードを定義し、SQL式を使用してメッセージをフィルタリングできます。 メッセージを送信するにはプロデューサーコードでカスタムメッセージ属性を構成し、メッセージをサブスクライブするにはコンシューマーコードでSQL構文を使用してフィルター式を定義する必要があります。
メッセージ属性には次の制限があります。
プロデューサがメッセージを送信する前に、プロデューサは各メッセージのカスタム属性を指定することができる。 各属性は、カスタムのキーと値のペアです。
属性のキーには、英数字、およびアンダースコア (_) を使用できます。
属性のキーは、文字またはアンダースコア (_) で始まる必要があります。
メッセージごとに複数の属性を指定できます。
SDKの詳細については、「概要」をご参照ください。 次のセクションでは、プロデューサーとコンシューマーのコードを定義する方法について説明します。
プロデューサー
カスタムメッセージ属性を設定します。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes()); // Configure Attribute A and set the attribute value to 1. msg.putUserProperties("A", "1");消費者
カスタム属性に基づいてメッセージをフィルタリングするSQL構文を使用して、フィルタ式を定義します。
重要カスタム属性に基づいてメッセージをフィルタリングするには、フィルタ式にロジックを定義して、メッセージ属性が存在するかどうかを確認する必要があります。 属性が存在しない場合、フィルター式の計算結果はNULLになり、メッセージはコンシューマーに配信されません。
// Subscribe to messages that have Attribute A and whose attribute value is 1. consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
次の表に、フィルター式の定義に使用できるさまざまな種類のSQL構文を示します。
構文 | 説明 | 例 |
IS NULL | 属性が存在しないことを指定します。 |
|
IS NOT NULL | 属性が存在することを指定します。 |
|
| 数値を比較します。 構文を使用して文字列を比較することはできません。 それ以外の場合、コンシューマーの起動時にエラーが報告されます。 説明 数値に変換できる文字列は数値と見なされます。 |
|
xxxとxxxの間 | 数値を比較します。 構文を使用して文字列を比較することはできません。 それ以外の場合、コンシューマーの起動時にエラーが報告されます。 構文は>= xxx AND <= xxxと同等で、属性の値が2つの数値の間、または2つの数値のいずれかに等しいことを指定します。 |
|
xxxとxxxの間ではありません | 数値を比較します。 構文を使用して文字列を比較することはできません。 それ以外の場合、コンシューマーの起動時にエラーが報告されます。 構文は <xxx OR > xxxと同等で、属性の値が左側の数値より小さいか、右側の数値より大きいことを指定します。 |
|
IN (xxx, xxx) | 属性の値をセットに含めることを指定します。 セット内の要素は文字列のみです。 |
|
| 等しい演算子と等しくない演算子。 演算子を使用して、数値と文字列を比較できます。 |
|
| 論理AND演算子と論理OR演算子。 演算子を使用して、単純な論理関数を組み合わせることができます。 各論理関数は括弧で囲む必要があります。 |
|
カスタムメッセージ属性を設定し、SQLフィルター式を定義することで、属性ベースのSQLフィルターを実装できます。 フィルター式は有効な結果を生成しない場合があります。 ApsaraMQ for RocketMQブローカーは、次のロジックに基づいてメッセージを処理します。
フィルター式の計算中にエラーが報告された場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。 たとえば、数値と非数値が比較されるときに例外が発生します。
フィルター式の計算結果がNULLの場合、または値がブール値でない場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。 ブール値は、真値または偽値を表す。 例えば、コンシューマがメッセージを購読するとき、コンシューマは、フィルタ条件としてプロデューサによって指定されていない属性を使用する。 この場合、フィルタ式の計算結果はNULLとなる。
カスタムメッセージ属性の値が浮動小数点数であるが、フィルター式で使用される属性値が整数である場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。
サンプルコード
メッセージを送信する
メッセージのタグとカスタム属性を設定します。
Producer producer = ONSFactory.createProducer(properties); // Set the value of Tag to tagA. Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes()); // Set the value of the custom attribute region to hangzhou. msg.putUserProperties("region", "hangzhou"); // Set the value of the custom attribute price to 50. msg.putUserProperties("price", "50"); SendResult sendResult = producer.send(msg);カスタム属性に基づいてメッセージをサブスクライブします。
Consumer consumer = ONSFactory.createConsumer(properties); // Subscribe to only messages whose value of the custom attribute region is hangzhou. If you do not configure this attribute for a message or the attribute value of the message is not hangzhou, the message is not delivered to the consumer. consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });期待される結果: この例で送信されるメッセージにはカスタム属性領域があり、属性値はhangzhouです。 メッセージはフィルタ条件を満たし、消費者に配信されます。
タグとカスタム属性に基づいてメッセージをサブスクライブします。
Consumer consumer = ONSFactory.createConsumer(properties); // Subscribe only to messages that have tagA and whose value of the custom attribute price is greater than 30. consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });期待される結果: この例で送信されるメッセージにはtagAとカスタム属性価格があり、カスタム属性価格の値は30を超えています。 メッセージはフィルタ条件を満たし、消費者に配信されます。
複数のカスタム属性に基づいてメッセージを購読します。
Consumer consumer = ONSFactory.createConsumer(properties); // Subscribe only to messages whose value of the custom attribute region is hangzhou and the value of the custom attribute price is less than 20. consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });期待される結果: メッセージがフィルター条件を満たしていないため、メッセージはコンシューマーに配信されません。 コンシューマは、カスタム属性価格の値が20未満であるメッセージをサブスクライブしますが、プロデューサでメッセージに指定されたカスタム属性価格の値は50です。
トピック内のすべてのメッセージを購読します。
Consumer consumer = ONSFactory.createConsumer(properties); // To subscribe to all messages in the topic, set the value of the SQL expression to TRUE. consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });期待される結果: トピック内のすべてのメッセージがコンシューマーに配信されます。
間違った例
プロデューサーがメッセージを送信する前に、カスタム属性はメッセージに設定されておらず、カスタム属性が存在するかどうかを確認するためのロジックはフィルター式に定義されていません。 カスタム属性は、式のフィルター条件として直接使用されます。 この場合、メッセージは消費者に配信されません。
Consumer consumer = ONSFactory.createConsumer(properties); // The message attribute product is not configured during message sending. The filtering fails and the message is not delivered to the consumer. consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
関連ドキュメント
同じグループIDを使用するコンシューマインスタンスは、同じトピックをサブスクライブする必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。
トピックとタグを使用して、さまざまなサービスのメッセージを分類できます。 詳細については、「トピックとタグのベストプラクティス」をご参照ください。