このトピックでは、ApsaraMQ for MQTT コンソールでメッセージアウトフロータスクを作成し、ApsaraMQ for MQTT インスタンスから ApsaraMQ for Kafka インスタンスにデータを同期する方法について説明します。
前提条件
ApsaraMQ for MQTT インスタンスが購入され、デプロイされていること。インスタンスが [実行中] 状態であることを確認します。詳細については、「リソースの作成」をご参照ください。
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。インスタンスが [実行中] 状態であることを確認します。詳細については、「プロセス」をご参照ください。
EventBridge がアクティブ化されており、必要な権限が Resource Access Management (RAM) ユーザーに付与されていること。詳細については、「EventBridge のアクティブ化と RAM ユーザーへの権限の付与」をご参照ください。
メッセージアウトフロータスクの作成
ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションペインで、 を選択します。
上部のナビゲーションバーで、中国 (杭州) などのリージョンを選択します。タスクリスト ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [message Queue For MQTT] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
リージョン
ApsaraMQ for MQTT インスタンスが存在するリージョン。インスタンスの作成時に選択したリージョンが自動的に入力されます。
中国 (杭州)
MQTT インスタンス
ルーティングするメッセージが生成される ApsaraMQ for MQTT インスタンス。
post-cn-jajh8i****
MQTT Topic
ルーティングするメッセージが生成される ApsaraMQ for MQTT インスタンスのトピック。
test-topic
[データ形式]
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として JSON を指定します。
JSON: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。
テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
JSON
バッチプッシュ
バッチプッシュ機能は、一度に複数のイベントを集約するのに役立ちます。この機能は、一括プッシュの件数 パラメーターまたは バッチプッシュ間隔 (単位:秒) パラメーターで指定された条件が満たされた場合にトリガーされます。
たとえば、[メッセージ] パラメーターを 100 に、[間隔 (単位: 秒)] パラメーターを 15 に設定した場合、経過時間が 10 秒であっても、メッセージ数が 100 に達するとプッシュが実行されます。
なし
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義して、リクエストをフィルタリングします。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、データクレンジング方法を指定して、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装します。詳細については、「データクレンジング」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [message Queue For Apache Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。次の表にパラメーターを示します。
パラメーター
説明
例
インスタンス ID
作成した ApsaraMQ for Kafka インスタンス。
test
Topic
メッセージのルーティング先となる ApsaraMQ for Kafka インスタンスのトピック。
test
確認モード (ACK)
ApsaraMQ for Kafka インスタンスがメッセージ受信後にクライアントに確認応答 (ACK) を送信するモード。
なし
メッセージ本文 (値)
EventBridge は JSONPath を使用してイベントからデータを抽出し、イベントの指定されたコンテンツをイベントターゲットにルーティングします。
データ抽出
$.data.value
メッセージキー
EventBridge は JSONPath を使用してイベントからデータを抽出し、イベントの指定されたコンテンツをイベントターゲットにルーティングします。
データ抽出
$.data.key
タスクのプロパティ
イベントのプッシュに失敗した場合に使用される再試行ポリシーと、障害の処理に使用される方法を設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。
タスクリスト ページに戻り、作成したメッセージアウトフロータスクを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
タスクが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進捗状況を確認できます。
その他の操作
タスクリスト ページで、管理するメッセージアウトフロータスクを見つけ、操作する 列でその他の操作を実行します。
タスクの詳細を表示する: [アクション] 列の 詳細 をクリックします。[タスクの詳細] ページで、タスクの基本情報、プロパティ、および監視メトリクスを表示します。
タスクの設定を変更する: [アクション] 列の 編集する をクリックします。[タスクの編集] パネルで、タスクの詳細とプロパティを変更します。
タスクを有効または無効にする: [アクション] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。
タスクを削除する: [アクション] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。