IoT デバイスまたはアプリケーションが MQTT を介してメッセージをパブリッシュすると、分析、ストレージ、ストリーム処理などの目的で、下流サービスが Apache Kafka 上で同一のデータを必要とすることがあります。メッセージ流出タスクを使用すると、EventBridge を経由して ApsaraMQ for MQTT のトピックと ApsaraMQ for Kafka のトピックを接続し、カスタムブリッジコードを記述することなく、メッセージを自動的に転送できます。
仕組み
メッセージ流出タスクは、以下の 4 つのステップでデータを処理します。
取り込み — MQTT クライアントが ApsaraMQ for MQTT インスタンス上のトピックにメッセージをパブリッシュします。
フィルター処理 — EventBridge が各メッセージに対してイベントパターンを評価し、該当するメッセージをフィルターします。
変換 — EventBridge がペイロードを分割、マッピング、エンリッチメント、または動的ルートによって再構成します。
配信 — EventBridge が処理済みのメッセージを指定された Kafka トピックに書き込みます。メッセージのキーおよび値は JSONPath を使用して抽出されます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
ApsaraMQ for MQTT インスタンスが 実行中 の状態であることです。詳細については、「リソースの作成」をご参照ください。
ApsaraMQ for Kafka インスタンスが 実行中 の状態であることです。詳細については、「ApsaraMQ for Kafka の使い始め」をご参照ください。
EventBridge が有効化されており、Resource Access Management (RAM) ユーザーに必要な権限が付与されていることです。詳細については、「EventBridge の有効化と RAM ユーザーへの権限付与」をご参照ください。
メッセージ流出タスクの作成
ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションウィンドウで、メッセージ統合 > タスク を選択します。
上部のナビゲーションバーからリージョンを選択します。タスク ページで、タスクの作成 をクリックします。
タスクの作成 ページで、タスク名 および 説明 を入力し、以下の設定を行います。設定完了後、保存 をクリックします。
ステップ 1:ソースの設定
ルーティング対象のメッセージを生成する MQTT インスタンスおよびトピックを選択します。
ソース ステップで、データプロバイダー を Message Queue for MQTT に設定し、以下のパラメーターを構成します。その後、次のステップ をクリックします。
| パラメーター | 説明 | 例 |
|---|---|---|
| リージョン | ApsaraMQ for MQTT インスタンスのリージョンです。インスタンス作成時に選択したリージョンに基づいて自動入力されます。 | 中国 (杭州) |
| MQTT インスタンス | ルーティング対象のメッセージを生成するインスタンスです。 | post-cn-jajh8i\*\*\*\* |
| MQTT トピック | ルーティング対象のメッセージを生成するトピックです。 | test-topic |
| データ形式 | 配信前のバイナリペイロードのエンコーディング方式です。JSON(デフォルト):UTF-8 でエンコードされた JSON 形式。テキスト:UTF-8 文字列としてエンコードされます。バイナリ:Base64 文字列としてエンコードされます。 | JSON |
| バッチプッシュ | 1 回の配信で複数のイベントを集約します。「メッセージ数」しきい値または「間隔」しきい値のいずれかが最初に達成された時点でプッシュがトリガーされます。 | -- |
| メッセージ | 関数呼び出し 1 回あたりに送信可能な最大メッセージ数です。バックログ内のメッセージ数が指定値に達した場合にのみリクエストが送信されます。有効な値:1~10,000。 | 100 |
| 間隔(単位:秒) | 関数が呼び出される時間間隔です。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値:0~15。0 を指定すると、集約完了直後に即時配信されます。 | 3 |
ステップ 2:メッセージのフィルター処理
シンクへ送信されるメッセージを定義します。
「[フィルター処理]」ステップで、[パターン コンテンツ] エディターにイベントパターンを定義して、どのメッセージを続行させるかを選択します。パターン構文については、「イベントパターン」をご参照ください。
ステップ 3:メッセージの変換
配信前のペイロードを再構成します。
「[変換]」ステップで、メッセージを分割、マップ、エンリッチ、または動的にルーティングするためのデータクレンジングルールを設定します。詳細については、「データクレンジング」をご参照ください。
ステップ 4:シンクの設定
処理済みのメッセージを Kafka トピックにマップします。
Sink (ターゲット) ステップで、サービスタイプ を Message Queue for Apache Kafka に設定し、以下のパラメーターを構成します。
| パラメーター | 説明 | 例 |
|---|---|---|
| インスタンス ID | メッセージを受信する ApsaraMQ for Kafka インスタンスです。 | test |
| Topic | メッセージを書き込む Kafka トピックです。 | test |
| 確認モード (ACK) | Kafka ブローカーがメッセージ受信をどのように確認応答するかを指定します。 | -- |
| メッセージ本文 (値) | イベントペイロードからメッセージ本文を抽出する JSONPath 式です。 | $.data.value |
| メッセージキー | イベントペイロードからメッセージキーを抽出する JSONPath 式です。 | $.data.key |
ステップ 5:タスクのプロパティ設定
配信失敗時のタスクの動作を定義します。
失敗した配信のリトライポリシーとエラー処理方法を設定します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。
タスクの有効化
新しく作成されたタスクは、デフォルトで無効化されています。有効化するには、以下の手順を実行します。
タスク ページで、対象のタスクを見つけ、操作 列の 有効化 をクリックします。
注意事項 ダイアログボックスで、OK をクリックします。
タスクの起動には 30~60 秒かかります。Status 列で進行状況を確認できます。
タスクの管理
タスク ページで、対象のタスクを見つけ、操作 列を使用して管理します。
| 操作 | 手順 |
|---|---|
| 詳細の表示 | 詳細 をクリックして、基本情報、プロパティ、監視メトリクスを表示します。 |
| タスクの編集 | 編集 をクリックして、タスクの編集 パネルでタスクの詳細およびプロパティを変更します。 |
| 一時停止または再開 | 一時停止 をクリックしてタスクを停止するか、有効化 をクリックして再開します。確認のため、OK をクリックします。 |
| タスクの削除 | 削除 をクリックし、確認のため OK をクリックします。 |
関連トピック
イベントパターン — 詳細なメッセージ選択のためのフィルターパターンを作成します。
データクレンジング -- ペイロードの分割やエンリッチメントなどの変換オプションを確認します。
リトライポリシーとデッドレターキュー -- 失敗した配信のエラー処理とデッドレターキューを設定します。