このトピックでは、ApsaraMQ for RabbitMQ から ApsaraMQ for Kafka にデータを同期するためのRabbitMQコネクタを作成する方法について説明します。
前提条件
EventBridgeがアクティブ化されており、必要な権限がResource Access Management (RAM) ユーザーに付与されていること。詳細については、「EventBridgeをアクティブ化し、RAMユーザーに権限を付与する」をご参照ください。
ApsaraMQ for RabbitMQリソースが作成されていること。詳細については、「リソースの作成」をご参照ください。
オブジェクトストレージサービス (OSS) がアクティブ化されており、バケットが作成されていること。詳細については、「OSSコンソールを使用して開始する」をご参照ください。
Serverless App Engine (SAE) がアクティブ化されていること。詳細については、「SAEを使い始める」をご参照ください。
仮想プライベートクラウド (VPC) とvSwitchが作成されていること。詳細については、「IPv4 CIDRブロックを持つVPCを作成する」をご参照ください。
ApsaraMQ for Kafka インスタンスが購入およびデプロイされていること。詳細については、「インスタンスの購入とデプロイ」をご参照ください。
ステップ 1:ApsaraMQ for RabbitMQリソースを作成する
ApsaraMQ for RabbitMQコンソール にログインし、インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。
作成したインスタンスをクリックします。[インスタンスの詳細] ページで、次の手順を実行してリソースを作成します。
左側のナビゲーションペインで、[静的アカウント] をクリックします。表示されるページで、[ユーザー名/パスワードの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。
ユーザー名とパスワードを作成したら、保存してください。
左側のナビゲーションペインで、[vhost] をクリックし、次に [vhostの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。
左側のナビゲーションペインで、[キュー] をクリックします。表示されるページで、[vhost] の横にある [変更] をクリックします。次に、ドロップダウンリストから作成した vhost を選択し、[キューの作成] をクリックします。詳細については、「ステップ 2:リソースを作成する」をご参照ください。
ステップ 2:コネクタを作成する
RabbitMQコネクタ ファイルをダウンロードし、作成したOSSバケットにアップロードします。詳細については、「OSSコンソールを使用して開始する」をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションペインで、
を選択します。タスクリスト ページで、タスクリストの作成 をクリックします。
タスクの作成 ページで、[タスク名] パラメータを設定し、画面の指示に従って他のパラメータを設定します。次のセクションでは、パラメータについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメータを [apache Kafka Connect] に設定し、次へ をクリックします。
コネクタの設定 ステップで、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。
サブセクション
パラメータ
説明
Kafka Connectプラグイン
[バケット]
RabbitMQコネクタファイルをアップロードしたOSSバケットを選択します。
[ファイル]
OSSバケットにアップロードしたRabbitMQコネクタファイルを選択します。
Message Queue for Apache Kafkaリソース
[message Queue For Apache Kafkaパラメータ]
[ソースコネクト] を選択します。
[message Queue For Apache Kafkaインスタンス]
このトピックの「前提条件」セクションで作成したApsaraMQ for Kafkaインスタンスを選択します。
[VPC]
デフォルトでは、ApsaraMQ for Kafkaインスタンスをデプロイしたときに選択したVPCが指定されています。このパラメータの値は変更できません。
[vswitch]
デフォルトでは、ApsaraMQ for Kafkaインスタンスをデプロイしたときに選択したvSwitch IDが指定されています。このパラメータの値は変更できません。
[セキュリティグループ]
セキュリティグループを選択します。
Kafka Connect
[ZIPパッケージ内の .properties ファイルを解析する]
[.properties ファイルを作成する] を選択します。次に、ソースコネクタの構成を含む .properties ファイルをZIPパッケージから選択します。パスは /etc/source-xxx.properties です。コードエディターで次のフィールドの値を更新します。
connector.class:RabbitMQコネクタファイルの名前。このフィールドにはデフォルト値を使用します。
tasks.max:タスクの最大数。
rabbitmq.host: ApsaraMQ for RabbitMQインスタンスのVPCエンドポイント。エンドポイントに関する情報は、[インスタンスの詳細] ページの [エンドポイント情報] セクションで取得できます。
rabbitmq.username:ユーザー名。ステップ 1:ApsaraMQ for RabbitMQリソースを作成する で ApsaraMQ for RabbitMQ インスタンスに作成したユーザー名を入力します。
rabbitmq.password:パスワード。ステップ 1:RabbitMQリソースを作成する で ApsaraMQ for RabbitMQ インスタンスに作成したパスワードを入力します。
rabbitmq.virtual.host:vhost。ステップ 1:ApsaraMQ for RabbitMQリソースを作成する で作成した vhost を入力します。
kafka.topic:データを同期する ApsaraMQ for Kafka トピック。データを同期する前に、トピックを作成する必要があります。
rabbitmq.queue:キュー。ステップ 1:ApsaraMQ for RabbitMQリソースを作成する で作成したキューを入力します。
サンプルコード:
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector name=rabbitmq-source-connector # ApsaraMQ for RabbitMQインスタンスに接続するために使用されるVPCエンドポイント。 rabbitmq.host=xxx # ApsaraMQ for RabbitMQインスタンスに作成した静的パスワード。 rabbitmq.password=xxx # ApsaraMQ for RabbitMQインスタンスに作成した静的ユーザー名。 rabbitmq.username=xxx # ApsaraMQ for RabbitMQインスタンスに作成したvhost。 rabbitmq.virtual.host=xxx # ターゲットのApsaraMQ for Kafkaトピック。 kafka.topic=xxx # ApsaraMQ for RabbitMQインスタンスに作成したキュー。 rabbitmq.queue=xxx tasks.max=4
インスタンス設定 ステップで、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。
サブセクション
パラメータ
説明
ワーカータイプ
[ワーカータイプ]
ワーカータイプを選択します。
[ワーカーの最小数]
ワーカーの最小数を指定します。
[ワーカーの最大数]
ワーカーの最大数を指定します。このパラメータの値は、tasks.maxパラメータの値より大きくすることはできません。
[水平スケーリングのしきい値 (%)]
CPU使用率が指定された値よりも大きい場合、または小さい場合、自動スケーリングがトリガーされます。このパラメータは、[ワーカーの最小数] パラメータと [ワーカーの最大数] パラメータの値が異なる場合にのみ必須です。
ワーカーの構成
[apache Kafkaコネクタワーカーの依存関係を自動的に作成する]
このオプションを選択することをお勧めします。このオプションを選択すると、システムはKafka Connectを実行するために必要な内部トピックとコンシューマーグループを選択したApsaraMQ for Kafkaインスタンスに作成し、リソース情報をコードエディターに入力します。次の項目では、構成項目について説明します。
オフセットトピック:オフセットデータを格納するために使用されるトピック。トピックの名前は
connect-eb-offset-<タスク名>
形式です。構成トピック:コネクタとタスクの構成データを格納するために使用されるトピック。トピックの名前は
connect-eb-config-<タスク名>
形式です。ステータストピック:コネクタとタスクのステータスデータを格納するために使用されるトピック。トピックの名前は
connect-eb-status-<タスク名>
形式です。Kafka Connectコンシューマーグループ:Kafka Connectワーカーが内部トピックのメッセージを消費するために使用するコンシューマーグループ。コンシューマーグループの名前は
connect-eb-cluster-<タスク名>
形式です。Kafkaソースコネクタコンシューマーグループ:ソーストピックのデータを消費するために使用されるコンシューマーグループ。このコンシューマーグループは、RabbitMQシンクコネクタによってのみ使用できます。コンシューマーグループの名前は
connector-eb-cluster-<タスク名>-<コネクタ名>
形式です。
[実行構成] セクションで、[ログ配信] パラメータを [log Serviceにデータを送信する] または [apsaramq For Kafkaにデータを送信する] に設定し、[ロールの承認] サブセクションの [ロール] ドロップダウンリストからKafka Connectが依存するロールを選択し、[保存] をクリックします。
重要AliyunSAEFullAccess権限ポリシーがアタッチされているロールを選択することをお勧めします。そうしないと、タスクが実行に失敗する可能性があります。
タスクのプロパティ
タスクの再試行ポリシーと配信不能キューを設定します。詳細については、「再試行ポリシーと配信不能キュー」をご参照ください。
タスクのステータスが [実行中] になると、コネクタは想定どおりに動作を開始します。
ステップ 3:コネクタをテストする
ApsaraMQ for RabbitMQ コンソール にログインします。左側のナビゲーションペインで、インスタンスリスト をクリックします。
インスタンスリスト ページの上部ナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。次に、インスタンスリストで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、[キュー] をクリックします。管理するキューを見つけ、[アクション] 列の [詳細] をクリックします。
[キューの詳細] ページの [宛先としてバインド] タブで、[バインディングの追加] をクリックします。
[宛先としてバインド] パネルで、[ソース交換] パラメータに [amq.direct] を選択し、[OK] をクリックします。
[宛先としてバインド] タブで、[amq.direct] 交換を見つけ、[アクション] 列の [メッセージの送信] をクリックして、ターゲットの ApsaraMQ for Kafka トピックにメッセージを送信します。詳細については、「キューにメッセージを送信する」をご参照ください。
ApsaraMQ for Kafkaコンソール にログインします。[インスタンス] ページで、管理するインスタンスをクリックします。
表示されるページで、管理するトピックをクリックし、[メッセージクエリ] タブをクリックして、挿入されたメッセージデータを表示します。
一般的なエラーとトラブルシューティング
エラー 1:すべてのタスクが実行に失敗する
エラーメッセージ:
コネクタ mongo-source の下のすべてのタスクが失敗しました。タスクのエラートレースを確認してください。
解決策:[メッセージ流入タスクの詳細] ページで、[基本情報] セクションの [診断] をクリックして、[コネクタの監視ページ] に移動します。[コネクタの監視] ページで、タスクの失敗の詳細を確認できます。
エラー 2:Kafka Connectが予期せず終了する
エラーメッセージ:
Kafka connectが終了しました! saeアプリケーションのエラーログ /opt/kafka/logs/connect.log を確認して、kafka connectが終了した理由を特定し、有効な引数でイベントストリーミングを更新して解決してください。
解決策:Kafka Connectのステータス更新が遅延している可能性があります。ページを更新することをお勧めします。 Kafka Connectがまだ失敗する場合は、次の操作を実行して問題のトラブルシューティングを行うことができます。
[メッセージ流入タスクの詳細] ページの [ワーカー情報] セクションで、[SAEアプリケーション] の右側のインスタンス名をクリックして、[アプリケーションの詳細] ページに移動します。
[基本情報] ページで、[インスタンスデプロイ情報] タブをクリックします。
[アクション] 列の [webshell] をクリックして、Kafka Connectの実行環境にログインします。
vi /home/admin/connector-bootstrap.log
コマンドを実行して、コネクタの起動ログを表示し、ログにエラーメッセージが存在するかどうかを確認します。vi /opt/kafka/logs/connect.log
コマンドを実行して、コネクタの実行ログを表示し、ERROR フィールドまたは WARN フィールドにエラーメッセージが存在するかどうかを確認します。
エラーメッセージに基づいて問題のトラブルシューティングを行った後、対応するタスクを再起動できます。
エラー 3:コネクタパラメータの検証に失敗する
エラーメッセージ:
コネクタ xxx の起動または更新に失敗しました。エラーコード=400。エラーメッセージ=コネクタの構成が無効であり、次の 1 つのエラーが含まれています。
値は never、initial_only、when_needed、initial、schema_only、schema_only_recovery のいずれかである必要があります
エンドポイント `/connector-plugins/{connectorType}/config/validate` で上記のエラーリストを確認することもできます
解決策:エラーメッセージに基づいて値が無効なパラメータを見つけ、パラメータを更新します。エラーメッセージに基づいてパラメータが見つからない場合は、Kafka Connectの実行環境にログインし、次のコマンドを実行できます。 Kafka Connectの実行環境へのログイン方法については、このトピックのエラー 2 を参照してください。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
各コネクタパラメータの検証結果は、レスポンスで返されます。パラメータの値が無効な場合、パラメータの errors フィールドは空ではありません。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}