すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:ApsaraMQ for RocketMQ ソースコネクタの作成

最終更新日:Mar 07, 2025

このトピックでは、ApsaraMQ for Kafka コンソールでソースコネクタを作成し、ApsaraMQ for RocketMQ から ApsaraMQ for Kafka にデータを同期する方法について説明します。

前提条件

ソースコネクタを作成する

  1. ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、Task Name パラメーターと Description パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Rocketmq] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        リージョン

        ApsaraMQ for RocketMQ インスタンスが存在するリージョン。

        中国 (杭州)

        バージョン

        ApsaraMQ for RocketMQ インスタンスのバージョン。このインスタンスにメッセージをルーティングします。有効な値:

        • RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x。

        • RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x。

        RocketMQ 5.x

        RocketMQ インスタンス

        ルーティングするメッセージが生成される ApsaraMQ for RocketMQ インスタンス。

        rmq-cn-****

        Topic

        ルーティングするメッセージが生成される ApsaraMQ for RocketMQ インスタンス上の Topic。

        topic

        Tag

        ApsaraMQ for RocketMQ インスタンス内のメッセージをフィルタリングするために使用されるタグ。

        test_tag

        グループ ID

        ApsaraMQ for RocketMQ インスタンス上のコンシューマーグループの名前。有効な値:

        • クイック作成: システムは GID_EVENTBRIDGE_xxx 形式で名前が付けられたコンシューマーグループを自動的に作成します。この値を選択することをお勧めします。

        • 既存のグループを使用: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの送受信に影響します。

        クイック作成

        コンシューマーオフセット

        • 最新のオフセット: メッセージは最新のオフセットから消費されます。

        • 最も古いオフセット: メッセージは最も古いオフセットから消費されます。

        • タイムスタンプ: メッセージは特定のタイムスタンプから消費されます。

        最新のオフセット

        消費日時

        メッセージの消費を開始する日時。コンシューマーオフセット パラメーターを タイムスタンプ に設定した場合にのみ、このパラメーターが必要です。

        2024-06-18 15:28:29

        データ形式

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコーディングに特別な要件がない場合は、値として Json を指定します。

        • Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。

        • テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        • バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        Json

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数を呼び出す時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディタでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。

        パラメーター

        説明

        インスタンス ID

        作成した ApsaraMQ for Kafka インスタンスの ID。

        test

        Topic

        作成した ApsaraMQ for Kafka インスタンス上の Topic。

        test

        確認モード (ACK)

        メッセージを受信した後、ApsaraMQ for Kafka インスタンスがクライアントに確認応答 (ACK) を送信するモード。有効な値:

        • なし

        • リーダーのみ

        • すべて

        なし

        メッセージ本文 (値)

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをターゲットにルーティングします。

        • 完全なデータ

        • データ抽出

        • 固定値

        • テンプレート

        データ抽出

        $.data.value

        メッセージキー

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをターゲットにルーティングします。

        • データ抽出

        • 固定値

        • テンプレート

        データ抽出

        $.data.key
    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。

  5. タスクリスト ページに戻り、作成した ApsaraMQ for RocketMQ ソースコネクタを見つけ、操作する 列の 有効化する をクリックします。

  6. ヒント メッセージで、OK をクリックします。

    ソースコネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進捗状況を確認できます。

その他の操作

タスクリスト ページで、管理するソースコネクタを見つけ、操作する 列でその他の操作を実行します。

  • コネクタの詳細を表示する: [操作] 列の 詳細 をクリックします。[タスクの詳細] ページで、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示します。

  • コネクタの構成を変更する: [操作] 列の 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更します。

  • コネクタを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。

  • コネクタを削除する: [操作] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。