すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:ApsaraMQ for Kafka

最終更新日:Jul 24, 2024

このトピックでは、ApsaraMQ for RocketMQコンソールでメッセージインフロータスクを作成して、ApsaraMQ for KafkaからApsaraMQ for RocketMQにデータを同期する方法について説明します。

前提条件

  • ApsaraMQ for RocketMQインスタンスが購入され、デプロイされます。 インスタンスのステータスが [実行中] であることを確認します。 詳細については、「リソースの作成」をご参照ください。

  • ApsaraMQ for Kafkaインスタンスが購入され、デプロイされます。 インスタンスのステータスが [実行中] であることを確認します。 詳細については、「手順3: リソースの作成」をご参照ください。

メッセージ流入タスクの作成

  1. ApsaraMQ for RocketMQ コンソールにログインします。 左側のナビゲーションウィンドウで、メッセージ統合 > タスクリスト を選択します。

  2. 上部のナビゲーションバーで、中国 (杭州) などのリージョンを選択します。 タスクリスト ページで、[タスクの作成] をクリックします。

  3. タスクの作成 ページで、タスク名 および 説明 パラメーターを設定します。 次に、画面の指示に従って他のパラメーターを設定します。 次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [Message Queue for Apache Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。 [次のステップ] をクリックします。 下表に、各パラメーターを説明します。

        パラメーター

        説明

        リージョン

        ソースApsaraMQ for Kafkaインスタンスが存在するリージョン。

        中国 (北京)

        Apache Kafkaインスタンスのメッセージキュー

        ルーティングするメッセージが生成されるApsaraMQ for Kafkaインスタンス。

        MQ_INST_115964845466 ****_ByBeUp3p

        トピック

        ルーティングするメッセージが生成されるApsaraMQ for Kafkaインスタンスのトピック。

        topic

        グループID

        ソースインスタンスのコンシューマーグループの名前。 メッセージルーティングソースを作成するには、別のコンシューマグループを使用する必要があります。 ApsaraMQ for Kafkaと別の既存のメッセージングサービスに同じコンシューマーグループを使用しないでください。 そうしないと、既存のメッセージングサービスを使用してメッセージの送受信に失敗する可能性があります。

        GID_http_1

        消費者オフセット

        メッセージが消費されるオフセット。

        • 最新のオフセット

        • 最も早いオフセット

        最新のオフセット

        ネットワーク設定

        メッセージをルーティングするネットワークのタイプ。

        • 基本ネットワーク

        • インターネット

        基本ネットワーク

        [VPC]

        ApsaraMQ for Kafkaインスタンスがデプロイされている仮想プライベートクラウド (VPC) のID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。

        vpc-bp17fapfdj0dwzjkd ****

        vSwitch

        ApsaraMQ for Kafkaインスタンスが関連付けられているvSwitchのID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。

        vsw-bp1gbjhj53hdjdkg ****

        [セキュリティグループ]

        ApsaraMQ for Kafkaインスタンスが属するセキュリティグループのID。 このパラメーターは、[ネットワーク設定] パラメーターを [インターネット] に設定した場合にのみ必要です。

        alikafka_pre-cn-7mz2 ****

        データ形式 (Body)

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。 複数のデータ形式がサポートされています。 エンコードに関する特別な要件がない場合は、値としてJSONを指定します。

        • JSON: バイナリデータは、UTF-8エンコードに基づいてJSON形式のデータにエンコードされ、ペイロードに入れられます。 デフォルト値です。

        • Text: バイナリデータは、UTF-8エンコードに基づいて文字列にエンコードされ、ペイロードに入れられます。

        • バイナリ: バイナリデータは、Base64エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        ジェソン

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。 リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。 有効な値: 1 ~ 10000

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。 システムは、集計されたメッセージを指定された時間間隔でFunction Computeに送信します。 有効な値: 0 ~ 15。 単位は秒です。 値0は、メッセージが集約の直後に送信されることを示す。

        3

      2. Filtering (フィルタリング) ステップで、データをフィルタリングするデータパターンを定義します。 詳細については、「メッセージのフィルタリング」をご参照ください。

      3. Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するためのデータクレンジング方法を指定します。 詳細については、「Function Computeを使用したメッセージクレンジングの実行」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [Message Queue for Apache RocketMQ] に設定し、画面上の指示に従って他のパラメーターを設定します。 下表に、各パラメーターを説明します。

        パラメーター

        説明

        バージョン

        メッセージをルーティングするApsaraMQ for RocketMQインスタンスのバージョン。 有効な値:

        • RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x。

        • RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x。

        RocketMQ 5.x

        インスタンス ID

        メッセージをルーティングするApsaraMQ for RocketMQインスタンス。

        rmq-cn-****

        Topic

        メッセージのルーティング先のApsaraMQ for RocketMQインスタンスに関するトピック。

        topic

        メッセージ本文

        • 完全なデータ

        • データ抽出

        • 固定値

        • テンプレート

        データ抽出

        $.data.body

        カスタムプロパティ (Properties)

        • 指定なし

        • データ抽出

        • テンプレート

        テンプレート

        パラメータ:

        {
          "userProperties":"$.data.userProperties",
          "msgId":"$.data.systemProperties.UNIQ_KEY"
        }

        テンプレート:

        {
          "EB_SYS_EMBED_OBJECT":"${userProperties}",
          "UNIQ_KEY":"${msgId}"
        }

        メッセージインデックス (Keys)

        • 指定なし

        • データ抽出

        • 固定値

        • テンプレート

        データ抽出

        $.data.systemProperties.KEYS

        メッセージタグ (Tags)

        • 指定なし

        • データ抽出

        • 固定値

        • テンプレート

        データ抽出

        $.data.systemProperties.TAGS
    • タスクプロパティ

      イベントのプッシュに失敗した場合に使用する再試行ポリシーと、障害の処理に使用するメソッドを設定します。 詳細については、「ポリシーと無効キューの再試行」をご参照ください。

  4. [保存] をクリックします。 [タスク] ページで、作成したタスクを見つけます。 [ステータス] 列のステータスが [開始] から [実行中] に変わると、タスクが作成されます。

その他の操作

タスクリスト ページで、操作する 列で管理およびその他の操作を実行するタスクを見つけます。

  • タスクの詳細を表示する: [操作] 列の 詳細 をクリックします。 [タスクの詳細] ページで、タスクの基本情報、プロパティ、およびモニタリングメトリックを表示します。

  • タスクの設定を変更する: [操作] 列の 編集する をクリックします。 [タスクの編集] パネルで、タスクの詳細とプロパティを変更します。

  • タスクの有効化または無効化: [操作] 列の 有効化する または 無効化 をクリックします。 ヒント メッセージで、OK をクリックします。

  • タスクを削除する: [操作] 列の 削除する をクリックします。 ヒント メッセージで、OK をクリックします。