すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:MQTT メッセージを ApsaraMQ for Kafka にルーティング

最終更新日:Mar 11, 2026

IoT デバイスまたはアプリケーションが MQTT を介してメッセージをパブリッシュすると、分析、ストレージ、ストリーム処理などの目的で、下流サービスが Apache Kafka 上で同一のデータを必要とすることがあります。メッセージ流出タスクを使用すると、EventBridge を経由して ApsaraMQ for MQTT のトピックと ApsaraMQ for Kafka のトピックを接続し、カスタムブリッジコードを記述することなく、メッセージを自動的に転送できます。

仕組み

メッセージ流出タスクは、以下の 4 つのステップでデータを処理します。

  1. 取り込み — MQTT クライアントが ApsaraMQ for MQTT インスタンス上のトピックにメッセージをパブリッシュします。

  2. フィルター処理 — EventBridge が各メッセージに対してイベントパターンを評価し、該当するメッセージをフィルターします。

  3. 変換 — EventBridge がペイロードを分割、マッピング、エンリッチメント、または動的ルートによって再構成します。

  4. 配信 — EventBridge が処理済みのメッセージを指定された Kafka トピックに書き込みます。メッセージのキーおよび値は JSONPath を使用して抽出されます。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • ApsaraMQ for MQTT インスタンスが 実行中 の状態であることです。詳細については、「リソースの作成」をご参照ください。

  • ApsaraMQ for Kafka インスタンスが 実行中 の状態であることです。詳細については、「ApsaraMQ for Kafka の使い始め」をご参照ください。

  • EventBridge が有効化されており、Resource Access Management (RAM) ユーザーに必要な権限が付与されていることです。詳細については、「EventBridge の有効化と RAM ユーザーへの権限付与」をご参照ください。

メッセージ流出タスクの作成

  1. ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションウィンドウで、メッセージ統合 > タスク を選択します。

  2. 上部のナビゲーションバーからリージョンを選択します。タスク ページで、タスクの作成 をクリックします。

  3. タスクの作成 ページで、タスク名 および 説明 を入力し、以下の設定を行います。設定完了後、保存 をクリックします。

ステップ 1:ソースの設定

ルーティング対象のメッセージを生成する MQTT インスタンスおよびトピックを選択します。

ソース ステップで、データプロバイダーMessage Queue for MQTT に設定し、以下のパラメーターを構成します。その後、次のステップ をクリックします。

パラメーター説明
リージョンApsaraMQ for MQTT インスタンスのリージョンです。インスタンス作成時に選択したリージョンに基づいて自動入力されます。中国 (杭州)
MQTT インスタンスルーティング対象のメッセージを生成するインスタンスです。post-cn-jajh8i\*\*\*\*
MQTT トピックルーティング対象のメッセージを生成するトピックです。test-topic
データ形式配信前のバイナリペイロードのエンコーディング方式です。JSON(デフォルト):UTF-8 でエンコードされた JSON 形式。テキスト:UTF-8 文字列としてエンコードされます。バイナリ:Base64 文字列としてエンコードされます。JSON
バッチプッシュ1 回の配信で複数のイベントを集約します。「メッセージ数」しきい値または「間隔」しきい値のいずれかが最初に達成された時点でプッシュがトリガーされます。--
メッセージ関数呼び出し 1 回あたりに送信可能な最大メッセージ数です。バックログ内のメッセージ数が指定値に達した場合にのみリクエストが送信されます。有効な値:1~10,000。100
間隔(単位:秒)関数が呼び出される時間間隔です。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値:0~15。0 を指定すると、集約完了直後に即時配信されます。3

ステップ 2:メッセージのフィルター処理

シンクへ送信されるメッセージを定義します。

[フィルター処理]」ステップで、[パターン コンテンツ] エディターにイベントパターンを定義して、どのメッセージを続行させるかを選択します。パターン構文については、「イベントパターン」をご参照ください。

ステップ 3:メッセージの変換

配信前のペイロードを再構成します。

[変換]」ステップで、メッセージを分割、マップ、エンリッチ、または動的にルーティングするためのデータクレンジングルールを設定します。詳細については、「データクレンジング」をご参照ください。

ステップ 4:シンクの設定

処理済みのメッセージを Kafka トピックにマップします。

Sink (ターゲット) ステップで、サービスタイプMessage Queue for Apache Kafka に設定し、以下のパラメーターを構成します。

パラメーター説明
インスタンス IDメッセージを受信する ApsaraMQ for Kafka インスタンスです。test
Topicメッセージを書き込む Kafka トピックです。test
確認モード (ACK)Kafka ブローカーがメッセージ受信をどのように確認応答するかを指定します。--
メッセージ本文 (値)イベントペイロードからメッセージ本文を抽出する JSONPath 式です。$.data.value
メッセージキーイベントペイロードからメッセージキーを抽出する JSONPath 式です。$.data.key

ステップ 5:タスクのプロパティ設定

配信失敗時のタスクの動作を定義します。

失敗した配信のリトライポリシーとエラー処理方法を設定します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。

タスクの有効化

新しく作成されたタスクは、デフォルトで無効化されています。有効化するには、以下の手順を実行します。

  1. タスク ページで、対象のタスクを見つけ、操作 列の 有効化 をクリックします。

  2. 注意事項 ダイアログボックスで、OK をクリックします。

タスクの起動には 30~60 秒かかります。Status 列で進行状況を確認できます。

タスクの管理

タスク ページで、対象のタスクを見つけ、操作 列を使用して管理します。

操作手順
詳細の表示詳細 をクリックして、基本情報、プロパティ、監視メトリクスを表示します。
タスクの編集編集 をクリックして、タスクの編集 パネルでタスクの詳細およびプロパティを変更します。
一時停止または再開一時停止 をクリックしてタスクを停止するか、有効化 をクリックして再開します。確認のため、OK をクリックします。
タスクの削除削除 をクリックし、確認のため OK をクリックします。

関連トピック