すべてのプロダクト
Search
ドキュメントセンター

EventBridge:RabbitMQコネクタを使用してApsaraMQ for RabbitMQからApsaraMQ for Kafkaにデータを同期する

最終更新日:Mar 21, 2025

このトピックでは、ApsaraMQ for RabbitMQ から ApsaraMQ for Kafka にデータを同期するためのRabbitMQコネクタを作成する方法について説明します。

前提条件

ステップ 1:ApsaraMQ for RabbitMQリソースを作成する

  1. ApsaraMQ for RabbitMQコンソール にログインし、インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。

  2. 作成したインスタンスをクリックします。[インスタンスの詳細] ページで、次の手順を実行してリソースを作成します。

    1. 左側のナビゲーションペインで、[静的アカウント] をクリックします。表示されるページで、[ユーザー名/パスワードの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。

      ユーザー名とパスワードを作成したら、保存してください。image..png

    2. 左側のナビゲーションペインで、[vhost] をクリックし、次に [vhostの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。

    3. 左側のナビゲーションペインで、[キュー] をクリックします。表示されるページで、[vhost] の横にある [変更] をクリックします。次に、ドロップダウンリストから作成した vhost を選択し、[キューの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。

ステップ 2:コネクタを作成する

  1. RabbitMQコネクタ ファイルをダウンロードし、作成したOSSバケットにアップロードします。詳細については、「OSSコンソールを使用して開始する」をご参照ください。

  2. ApsaraMQ for Kafka コンソール にログインします。リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、[コネクタエコシステム統合] > タスクリスト を選択します。

  4. タスクリスト ページで、タスクリストの作成 をクリックします。

  5. タスクの作成 ページで、[タスク名] パラメータを設定し、画面の指示に従って他のパラメータを設定します。次のセクションでは、パラメータについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメータを [apache Kafka Connect] に設定し、次へ をクリックします。

      2. コネクタの設定 ステップで、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

        サブセクション

        パラメータ

        説明

        Kafka Connectプラグイン

        [バケット]

        RabbitMQコネクタファイルをアップロードしたOSSバケットを選択します。

        [ファイル]

        OSSバケットにアップロードしたRabbitMQコネクタファイルを選択します。

        Message Queue for Apache Kafkaリソース

        [message Queue For Apache Kafkaパラメータ]

        [ソースコネクト] を選択します。

        [message Queue For Apache Kafkaインスタンス]

        このトピックの「前提条件」セクションで作成したApsaraMQ for Kafkaインスタンスを選択します。

        [VPC]

        デフォルトでは、ApsaraMQ for Kafkaインスタンスをデプロイしたときに選択したVPCが指定されています。このパラメータの値は変更できません。

        [vswitch]

        デフォルトでは、ApsaraMQ for Kafkaインスタンスをデプロイしたときに選択したvSwitch IDが指定されています。このパラメータの値は変更できません。

        [セキュリティグループ]

        セキュリティグループを選択します。

        Kafka Connect

        [ZIPパッケージ内の .properties ファイルを解析する]

        [.properties ファイルを作成する] を選択します。次に、ソースコネクタの構成を含む .properties ファイルをZIPパッケージから選択します。パスは /etc/source-xxx.properties です。コードエディターで次のフィールドの値を更新します。

        サンプルコード:

        connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector
        name=rabbitmq-source-connector
        # ApsaraMQ for RabbitMQインスタンスに接続するために使用されるVPCエンドポイント。
        rabbitmq.host=xxx
        # ApsaraMQ for RabbitMQインスタンスに作成した静的パスワード。
        rabbitmq.password=xxx
        # ApsaraMQ for RabbitMQインスタンスに作成した静的ユーザー名。
        rabbitmq.username=xxx
        # ApsaraMQ for RabbitMQインスタンスに作成したvhost。
        rabbitmq.virtual.host=xxx
        # ターゲットのApsaraMQ for Kafkaトピック。
        kafka.topic=xxx
        # ApsaraMQ for RabbitMQインスタンスに作成したキュー。
        rabbitmq.queue=xxx
        tasks.max=4
      3. インスタンス設定 ステップで、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

        サブセクション

        パラメータ

        説明

        ワーカータイプ

        [ワーカータイプ]

        ワーカータイプを選択します。

        [ワーカーの最小数]

        ワーカーの最小数を指定します。

        [ワーカーの最大数]

        ワーカーの最大数を指定します。このパラメータの値は、tasks.maxパラメータの値より大きくすることはできません。

        [水平スケーリングのしきい値 (%)]

        CPU使用率が指定された値よりも大きい場合、または小さい場合、自動スケーリングがトリガーされます。このパラメータは、[ワーカーの最小数] パラメータと [ワーカーの最大数] パラメータの値が異なる場合にのみ必須です。

        ワーカーの構成

        [apache Kafkaコネクタワーカーの依存関係を自動的に作成する]

        このオプションを選択することをお勧めします。このオプションを選択すると、システムはKafka Connectを実行するために必要な内部トピックとコンシューマーグループを選択したApsaraMQ for Kafkaインスタンスに作成し、リソース情報をコードエディターに入力します。次の項目では、構成項目について説明します。

        • オフセットトピック:オフセットデータを格納するために使用されるトピック。トピックの名前は connect-eb-offset-<タスク名> 形式です。

        • 構成トピック:コネクタとタスクの構成データを格納するために使用されるトピック。トピックの名前は connect-eb-config-<タスク名> 形式です。

        • ステータストピック:コネクタとタスクのステータスデータを格納するために使用されるトピック。トピックの名前は connect-eb-status-<タスク名> 形式です。

        • Kafka Connectコンシューマーグループ:Kafka Connectワーカーが内部トピックのメッセージを消費するために使用するコンシューマーグループ。コンシューマーグループの名前は connect-eb-cluster-<タスク名> 形式です。

        • Kafkaソースコネクタコンシューマーグループ:ソーストピックのデータを消費するために使用されるコンシューマーグループ。このコンシューマーグループは、RabbitMQシンクコネクタによってのみ使用できます。コンシューマーグループの名前は connector-eb-cluster-<タスク名>-<コネクタ名> 形式です。

      4. [実行構成] セクションで、[ログ配信] パラメータを [log Serviceにデータを送信する] または [apsaramq For Kafkaにデータを送信する] に設定し、[ロールの承認] サブセクションの [ロール] ドロップダウンリストからKafka Connectが依存するロールを選択し、[保存] をクリックします。

        重要

        AliyunSAEFullAccess権限ポリシーがアタッチされているロールを選択することをお勧めします。そうしないと、タスクが実行に失敗する可能性があります。

    • タスクのプロパティ

      タスクの再試行ポリシーと配信不能キューを設定します。詳細については、「再試行ポリシーと配信不能キュー」をご参照ください。

    タスクのステータスが [実行中] になると、コネクタは想定どおりに動作を開始します。

ステップ 3:コネクタをテストする

  1. ApsaraMQ for RabbitMQ コンソール にログインします。左側のナビゲーションペインで、インスタンスリスト をクリックします。

  2. インスタンスリスト ページの上部ナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。次に、インスタンスリストで、管理するインスタンスの名前をクリックします。

  3. 左側のナビゲーションペインで、[キュー] をクリックします。管理するキューを見つけ、[アクション] 列の [詳細] をクリックします。

  4. [キューの詳細] ページの [宛先としてバインド] タブで、[バインディングの追加] をクリックします。

  5. [宛先としてバインド] パネルで、[ソース交換] パラメータに [amq.direct] を選択し、[OK] をクリックします。

  6. [宛先としてバインド] タブで、[amq.direct] 交換を見つけ、[アクション] 列の [メッセージの送信] をクリックして、ターゲットの ApsaraMQ for Kafka トピックにメッセージを送信します。詳細については、「キューにメッセージを送信する」をご参照ください。image..png

  7. ApsaraMQ for Kafkaコンソール にログインします。[インスタンス] ページで、管理するインスタンスをクリックします。

  8. 表示されるページで、管理するトピックをクリックし、[メッセージクエリ] タブをクリックして、挿入されたメッセージデータを表示します。image..png

一般的なエラーとトラブルシューティング

エラー 1:すべてのタスクが実行に失敗する

エラーメッセージ:

コネクタ mongo-source の下のすべてのタスクが失敗しました。タスクのエラートレースを確認してください。

解決策:[メッセージ流入タスクの詳細] ページで、[基本情報] セクションの [診断] をクリックして、[コネクタの監視ページ] に移動します。[コネクタの監視] ページで、タスクの失敗の詳細を確認できます。

エラー 2:Kafka Connectが予期せず終了する

エラーメッセージ:

Kafka connectが終了しました! saeアプリケーションのエラーログ /opt/kafka/logs/connect.log を確認して、kafka connectが終了した理由を特定し、有効な引数でイベントストリーミングを更新して解決してください。

解決策:Kafka Connectのステータス更新が遅延している可能性があります。ページを更新することをお勧めします。 Kafka Connectがまだ失敗する場合は、次の操作を実行して問題のトラブルシューティングを行うことができます。

  1. [メッセージ流入タスクの詳細] ページの [ワーカー情報] セクションで、[SAEアプリケーション] の右側のインスタンス名をクリックして、[アプリケーションの詳細] ページに移動します。

  2. [基本情報] ページで、[インスタンスデプロイ情報] タブをクリックします。

  3. [アクション] 列の [webshell] をクリックして、Kafka Connectの実行環境にログインします。实例部署信息

    • vi /home/admin/connector-bootstrap.log コマンドを実行して、コネクタの起動ログを表示し、ログにエラーメッセージが存在するかどうかを確認します。

    • vi /opt/kafka/logs/connect.log コマンドを実行して、コネクタの実行ログを表示し、ERROR フィールドまたは WARN フィールドにエラーメッセージが存在するかどうかを確認します。

エラーメッセージに基づいて問題のトラブルシューティングを行った後、対応するタスクを再起動できます。

エラー 3:コネクタパラメータの検証に失敗する

エラーメッセージ:

コネクタ xxx の起動または更新に失敗しました。エラーコード=400。エラーメッセージ=コネクタの構成が無効であり、次の 1 つのエラーが含まれています。
値は never、initial_only、when_needed、initial、schema_only、schema_only_recovery のいずれかである必要があります
エンドポイント `/connector-plugins/{connectorType}/config/validate` で上記のエラーリストを確認することもできます

解決策:エラーメッセージに基づいて値が無効なパラメータを見つけ、パラメータを更新します。エラーメッセージに基づいてパラメータが見つからない場合は、Kafka Connectの実行環境にログインし、次のコマンドを実行できます。 Kafka Connectの実行環境へのログイン方法については、このトピックのエラー 2 を参照してください。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

各コネクタパラメータの検証結果は、レスポンスで返されます。パラメータの値が無効な場合、パラメータの errors フィールドは空ではありません。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}