すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:ApsaraMQ for Kafka

最終更新日:Jan 14, 2025

このトピックでは、ApsaraMQ for MQTT コンソールでメッセージアウトフロータスクを作成し、ApsaraMQ for MQTT インスタンスから ApsaraMQ for Kafka インスタンスにデータを同期する方法について説明します。

前提条件

  • ApsaraMQ for MQTT インスタンスが購入され、デプロイされていること。インスタンスが [実行中] 状態であることを確認します。詳細については、「リソースの作成」をご参照ください。

  • ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。インスタンスが [実行中] 状態であることを確認します。詳細については、「プロセス」をご参照ください。

  • EventBridge がアクティブ化されており、必要な権限が Resource Access Management (RAM) ユーザーに付与されていること。詳細については、「EventBridge のアクティブ化と RAM ユーザーへの権限の付与」をご参照ください。

メッセージアウトフロータスクの作成

  1. ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションペインで、メッセージ統合 > タスクリスト を選択します。

  2. 上部のナビゲーションバーで、中国 (杭州) などのリージョンを選択します。タスクリスト ページで、[タスクの作成] をクリックします。

  3. タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [message Queue For MQTT] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        リージョン

        ApsaraMQ for MQTT インスタンスが存在するリージョン。インスタンスの作成時に選択したリージョンが自動的に入力されます。

        中国 (杭州)

        MQTT インスタンス

        ルーティングするメッセージが生成される ApsaraMQ for MQTT インスタンス。

        post-cn-jajh8i****

        MQTT Topic

        ルーティングするメッセージが生成される ApsaraMQ for MQTT インスタンスのトピック。

        test-topic

        [データ形式]

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として JSON を指定します。

        • JSON: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。

        • テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        • バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        JSON

        バッチプッシュ

        バッチプッシュ機能は、一度に複数のイベントを集約するのに役立ちます。この機能は、一括プッシュの件数 パラメーターまたは バッチプッシュ間隔 (単位:秒) パラメーターで指定された条件が満たされた場合にトリガーされます。

        たとえば、[メッセージ] パラメーターを 100 に、[間隔 (単位: 秒)] パラメーターを 15 に設定した場合、経過時間が 10 秒であっても、メッセージ数が 100 に達するとプッシュが実行されます。

        なし

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義して、リクエストをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、データクレンジング方法を指定して、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装します。詳細については、「データクレンジング」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [message Queue For Apache Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。次の表にパラメーターを示します。

        パラメーター

        説明

        インスタンス ID

        作成した ApsaraMQ for Kafka インスタンス。

        test

        Topic

        メッセージのルーティング先となる ApsaraMQ for Kafka インスタンスのトピック。

        test

        確認モード (ACK)

        ApsaraMQ for Kafka インスタンスがメッセージ受信後にクライアントに確認応答 (ACK) を送信するモード。

        なし

        メッセージ本文 (値)

        EventBridge は JSONPath を使用してイベントからデータを抽出し、イベントの指定されたコンテンツをイベントターゲットにルーティングします。

        データ抽出

        $.data.value

        メッセージキー

        EventBridge は JSONPath を使用してイベントからデータを抽出し、イベントの指定されたコンテンツをイベントターゲットにルーティングします。

        データ抽出

        $.data.key
    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用される再試行ポリシーと、障害の処理に使用される方法を設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。

  4. タスクリスト ページに戻り、作成したメッセージアウトフロータスクを見つけ、操作する 列の 有効化する をクリックします。

  5. ヒント メッセージで、OK をクリックします。

    タスクが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進捗状況を確認できます。

その他の操作

タスクリスト ページで、管理するメッセージアウトフロータスクを見つけ、操作する 列でその他の操作を実行します。

  • タスクの詳細を表示する: [アクション] 列の 詳細 をクリックします。[タスクの詳細] ページで、タスクの基本情報、プロパティ、および監視メトリクスを表示します。

  • タスクの設定を変更する: [アクション] 列の 編集する をクリックします。[タスクの編集] パネルで、タスクの詳細とプロパティを変更します。

  • タスクを有効または無効にする: [アクション] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。

  • タスクを削除する: [アクション] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。