このトピックでは、ApsaraMQ for RocketMQコンソールでメッセージインフロータスクを作成して、ApsaraMQ for KafkaからApsaraMQ for RocketMQにデータを同期する方法について説明します。
前提条件
ApsaraMQ for RocketMQインスタンスが購入され、デプロイされます。 インスタンスのステータスが [実行中] であることを確認します。 詳細については、「リソースの作成」をご参照ください。
ApsaraMQ for Kafkaインスタンスが購入され、デプロイされます。 インスタンスのステータスが [実行中] であることを確認します。 詳細については、「手順3: リソースの作成」をご参照ください。
メッセージ流入タスクの作成
ApsaraMQ for RocketMQ コンソールにログインします。 左側のナビゲーションウィンドウで、 を選択します。
上部のナビゲーションバーで、中国 (杭州) などのリージョンを選択します。 タスクリスト ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 および 説明 パラメーターを設定します。 次に、画面の指示に従って他のパラメーターを設定します。 次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [Message Queue for Apache Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。 [次のステップ] をクリックします。 下表に、各パラメーターを説明します。
パラメーター
説明
例
リージョン
ソースApsaraMQ for Kafkaインスタンスが存在するリージョン。
中国 (北京)
Apache Kafkaインスタンスのメッセージキュー
ルーティングするメッセージが生成されるApsaraMQ for Kafkaインスタンス。
MQ_INST_115964845466 ****_ByBeUp3p
トピック
ルーティングするメッセージが生成されるApsaraMQ for Kafkaインスタンスのトピック。
topic
グループID
ソースインスタンスのコンシューマーグループの名前。 メッセージルーティングソースを作成するには、別のコンシューマグループを使用する必要があります。 ApsaraMQ for Kafkaと別の既存のメッセージングサービスに同じコンシューマーグループを使用しないでください。 そうしないと、既存のメッセージングサービスを使用してメッセージの送受信に失敗する可能性があります。
GID_http_1
消費者オフセット
メッセージが消費されるオフセット。
最新のオフセット
最も早いオフセット
最新のオフセット
ネットワーク設定
メッセージをルーティングするネットワークのタイプ。
基本ネットワーク
インターネット
基本ネットワーク
[VPC]
ApsaraMQ for Kafkaインスタンスがデプロイされている仮想プライベートクラウド (VPC) のID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。
vpc-bp17fapfdj0dwzjkd ****
vSwitch
ApsaraMQ for Kafkaインスタンスが関連付けられているvSwitchのID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。
vsw-bp1gbjhj53hdjdkg ****
[セキュリティグループ]
ApsaraMQ for Kafkaインスタンスが属するセキュリティグループのID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。
alikafka_pre-cn-7mz2 ****
データ形式 (Body)
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。 複数のデータ形式がサポートされています。 エンコードに関する特別な要件がない場合は、値としてJSONを指定します。
JSON: バイナリデータは、UTF-8エンコードに基づいてJSON形式のデータにエンコードされ、ペイロードに入れられます。 デフォルト値です。
Text: バイナリデータは、UTF-8エンコードに基づいて文字列にエンコードされ、ペイロードに入れられます。
バイナリ: バイナリデータは、Base64エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
ジェソン
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。 リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。 有効な値: 1 ~ 10000
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。 システムは、集計されたメッセージを指定された時間間隔でFunction Computeに送信します。 有効な値: 0 ~ 15。 単位は秒です。 値0は、メッセージが集約の直後に送信されることを示す。
3
Filtering (フィルタリング) ステップで、データをフィルタリングするデータパターンを定義します。 詳細については、「メッセージのフィルタリング」をご参照ください。
Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するためのデータクレンジング方法を指定します。 詳細については、「Function Computeを使用したメッセージクレンジングの実行」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [Message Queue for Apache RocketMQ] に設定し、画面上の指示に従って他のパラメーターを設定します。 下表に、各パラメーターを説明します。
パラメーター
説明
例
バージョン
メッセージをルーティングするApsaraMQ for RocketMQインスタンスのバージョン。 有効な値:
RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x。
RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x。
RocketMQ 5.x
インスタンス ID
メッセージをルーティングするApsaraMQ for RocketMQインスタンス。
rmq-cn-****
Topic
メッセージのルーティング先のApsaraMQ for RocketMQインスタンスに関するトピック。
topic
メッセージ本文
完全なデータ
データ抽出
固定値
テンプレート
データ抽出
$.data.body
カスタムプロパティ (Properties)
指定なし
データ抽出
テンプレート
テンプレート
パラメータ:
{ "userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY" }
テンプレート:
{ "EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}" }
メッセージインデックス (Keys)
指定なし
データ抽出
固定値
テンプレート
データ抽出
$.data.systemProperties.KEYS
メッセージタグ (Tags)
指定なし
データ抽出
固定値
テンプレート
データ抽出
$.data.systemProperties.TAGS
タスクプロパティ
イベントのプッシュに失敗した場合に使用する再試行ポリシーと、障害の処理に使用するメソッドを設定します。 詳細については、「ポリシーと無効キューの再試行」をご参照ください。
[保存] をクリックします。 [タスク] ページで、作成したタスクを見つけます。 [ステータス] 列のステータスが [開始] から [実行中] に変わると、タスクが作成されます。
その他の操作
タスクリスト ページで、操作する 列で管理およびその他の操作を実行するタスクを見つけます。
タスクの詳細を表示する: [操作] 列の 詳細 をクリックします。 [タスクの詳細] ページで、タスクの基本情報、プロパティ、およびモニタリングメトリックを表示します。
タスクの設定を変更する: [操作] 列の 編集する をクリックします。 [タスクの編集] パネルで、タスクの詳細とプロパティを変更します。
タスクの有効化または無効化: [操作] 列の 有効化する または 無効化 をクリックします。 ヒント メッセージで、OK をクリックします。
タスクを削除する: [操作] 列の 削除する をクリックします。 ヒント メッセージで、OK をクリックします。