すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:JDBCコネクタを使用してApsaraDB RDS for MySQLとApsaraMQ for Kafkaの間でデータを同期する

最終更新日:Apr 16, 2025

このトピックでは、ApsaraMQ for Kafka と ApsaraDB RDS for MySQL の間でデータを同期するための JDBC コネクタを作成する方法について説明します。

前提条件

手順 1: テーブルを作成する

  1. ApsaraDB RDSコンソール にログインして、ApsaraDB RDS for MySQLインスタンスを作成します。詳細については、「ApsaraDB RDS for MySQLインスタンスを作成する」をご参照ください。

    ApsaraDB RDS for MySQL インスタンスを作成する際は、このトピックの「前提条件」セクションで作成した ApsaraMQ for Kafka インスタンスがデプロイされている VPC を選択し、VPC の CIDR ブロックをホワイトリストに追加します。加入白名单

  2. [インスタンス] ページで、作成した ApsaraDB RDS for MySQL インスタンスをクリックします。次に、インスタンス詳細ページの左側のナビゲーションペインで、以下の操作を実行します。

    1. [アカウント] をクリックします。表示されるページで、[アカウントの作成] をクリックしてアカウントを作成します。詳細については、「データベースとアカウントを作成する」をご参照ください。既存のアカウントを使用することもできます。

    2. [データベース] をクリックします。表示されるページで、[データベースの作成] をクリックしてデータベースを作成します。詳細については、「データベースとアカウントを作成する」をご参照ください。既存のデータベースを使用することもできます。

    3. [データベース接続] をクリックして、内部エンドポイントとポート番号を表示し、記録します。

      内网地址

  3. インスタンス詳細ページで、[データベースにログオン] をクリックして、Data Management (DMS) コンソールに移動します。次に、管理するデータベースをクリックし、SQL ステートメントを使用してテーブルを作成します。次のサンプルコマンドは、列名が idnumber であるテーブルを作成する方法の例を示しています。詳細については、「SQLコマンド」をご参照ください。

    CREATE TABLE sql_table(id INT ,number INT);
    重要

    テーブルを作成する際は、いずれかの列をプライマリキーとして指定し、その列をインクリメンタルとして指定します。詳細については、「スキーマのクエリと変更」をご参照ください。

手順 2: コネクタを作成する

ソースコネクタ

  1. JDBCコネクタ ファイルをダウンロードし、作成した OSS バケットにアップロードします。詳細については、「OSSコンソールを使用して始める」をご参照ください。

    重要

    JDBC コネクタファイルをダウンロードする際は、Java 8 と互換性のあるバージョンを選択してください。

  2. ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、[コネクタエコシステム統合] > タスクリスト を選択します。

  4. タスクリスト ページで、タスクリストの作成 をクリックします。

  5. タスクの作成 ページで、[タスク名] パラメータを設定し、画面上の指示に従って他のパラメータを設定します。次のセクションでは、パラメータについて説明します。

    • タスクの作成

      1. Source (ソース) 手順で、データプロバイダー パラメータを [apache Kafka Connect] に設定し、次へ をクリックします。

      2. コネクタの設定 手順で、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

        サブセクション

        パラメータ

        説明

        Kafka Connectプラグイン

        バケット

        JDBCコネクタファイルをアップロードした OSS バケットを選択します。

        ファイル

        OSS バケットにアップロードした JDBC コネクタファイルを選択します。

        Message Queue for Apache Kafkaリソース

        Message Queue for Apache Kafkaパラメータ

        [ソースコネクト] を選択します。

        Message Queue for Apache Kafkaインスタンス

        このトピックの「前提条件」セクションで作成した ApsaraMQ for Kafka インスタンスを選択します。

        VPC

        作成した VPC を選択します。

        vSwitch

        作成した vSwitch を選択します。

        セキュリティグループ

        セキュリティグループを選択します。

        Kafka Connect

        ZIPパッケージ内の .properties ファイルを解析する

        [.properties ファイルを作成] を選択します。次に、ソースコネクタの構成を含む .properties ファイルを ZIP パッケージから選択します。 .properties ファイルのパスは /etc/source-xxx.properties です。

        コードエディタで関連フィールドの値を変更します。 展開してフィールドの説明を表示

        フィールド

        説明

        connector.class

        JDBCコネクタファイルの名前。このフィールドにはデフォルト値を使用します。

        tasks.max

        タスクの最大数。このパラメータの値は、「手順 1: テーブルを作成する」で作成されたテーブルの数より大きくすることはできません。

        connection.url

        データベースへの接続に使用する URL。URL の形式は、データベースの種類によって異なります。この例では、ApsaraDB RDS for MySQL インスタンスが作成されます。したがって、このフィールドを設定する際は、「手順 1: テーブルを作成する」で取得した内部エンドポイントとポート番号を、ホストとポートのフィールドに入力します。

        • MySQL: jdbc:mysql://<host>:<port>/<database>

        • PostgreSQL: jdbc:postgresql://<host>:<port>/<database>

        • IBM DB2: jdbc:db2://<host>:<port>/<database>

        • IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>

        • MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>

        • Oracle: jdbc:oracle:thin://<host>:<port>/<service> または jdbc:oracle:thin:<host>:<port>:<SID>

        incrementing.column.name

        [手順 1: テーブルを作成する] で設定した増分列の名前を入力します。

        topic.prefix

        デスティネーション トピックのプレフィックス。デスティネーション トピックの名前は、<topic.prefix><tableName> 形式です。データを同期する前に、上記の形式に基づいてトピックを作成する必要があります。

        connection.user

        データベースにログオンするために使用するユーザー名です。

        connection.password

        データベースにログオンするために使用するパスワード。

        table.whitelist

        データベース テーブルです。複数のデータベース テーブルはカンマ (,) で区切ります。

        サンプルコードを表示するには展開します

        name=test-source-mysql
        connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
        
        # タスクの最大数を指定するために、このパラメーターを使用できます。タスクは SAE インスタンス全体に均等に分散されます。
        # tasks.max パラメーターの値は、ソーステーブルの数を超えることはできません。
        tasks.max=1
        
        # データベースへの接続に使用される URL。URL の形式は、データベースの種類によって異なります。
        # MySQL: jdbc:mysql://<host>:<port>/<database>
        # PostgreSQL: jdbc:postgresql://<host>:<port>/<database>
        # IBM DB2: jdbc:db2://<host>:<port>/<database>
        # IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>
        # MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>
        # Oracle: jdbc:oracle:thin://<host>:<port>/<service> or jdbc:oracle:thin:<host>:<port>:<SID>
        connection.url=jdbc:mysql://rm-******.mysql.rds.aliyuncs.com:3306/test_database
        
        # テーブルのモードを更新します。incrementing 値は、各テーブルの新しい行をチェックするために増分カラムが使用されることを指定します。
        # 注: 既存のカラムの変更または削除はチェックされません。
        mode=incrementing
        
        # 増分カラムの名前。
        incrementing.column.name=id
        # 宛先トピックの名前は、${topic.prefix}${tableName} 形式です。
        # この例では、事前に mysql-topic-source_table という名前のトピックを作成する必要があります。そうしないと、ランタイムエラーが報告されます。
        topic.prefix=mysql-topic-
        
        # データベースへのアクセスに使用されるアカウント。
        connection.user=root
        # データベースへのアクセスに使用されるパスワード。
        connection.password=123456
        
        # ソーステーブルを指定します。
        table.whitelist=source_table
        
        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=true
        
        key.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true

        JDBC ソースコネクタの作成に使用されるすべてのパラメーターについては、JDBC ソースコネクタ構成プロパティ をご参照ください。

      3. インスタンス設定 ステップで、パラメーターを設定し、次へ をクリックします。次の表にパラメーターを示します。

        サブセクション

        パラメーター

        説明

        ワーカータイプ

        ワーカータイプ

        ワーカータイプを選択します。

        ワーカーの最小数

        このパラメーターを 1 に設定します。

        ワーカーの最大数

        このパラメーターを 1 に設定します。

        ワーカー設定

        Apache Kafka Connector ワーカーの依存関係を自動的に作成する

        このオプションを選択することをお勧めします。このオプションを選択すると、システムは、選択した ApsaraMQ for Kafka インスタンスで Kafka Connect を実行するために必要な内部トピックとコンシューマーグループを作成し、コードエディターの対応するパラメーターに情報を同期します。次のセクションでは、設定項目について説明します。

        • オフセットトピック: オフセットデータを格納するために使用されるトピック。トピックの名前は connect-eb-offset-<Task name> 形式です。

        • 設定トピック: コネクタとタスクの設定データを格納するために使用されるトピック。トピックの名前は connect-eb-config-<Task name> 形式です。

        • ステータストピック: コネクタとタスクのステータスデータを格納するために使用されるトピック。トピックの名前は connect-eb-status-<Task name> 形式です。

        • Kafka Connect コンシューマーグループ: Kafka Connect ワーカーが内部トピックのメッセージを消費するために使用するコンシューマーグループ。コンシューマーグループの名前は connect-eb-cluster-<Task name> 形式です。

        • Kafka ソースコネクタコンシューマーグループ: ソース トピックのデータを消費するために使用されるコンシューマーグループ。このコンシューマーグループは、シンクコネクタに対してのみ有効です。コンシューマーグループの名前は connector-eb-cluster-<task name>-<connector name> 形式です。

      4. 実行設定 セクションで、[ログ配信] パラメーターを [ログサービスにデータを送信] または [apsaramq For Kafka にデータを送信] に設定し、[ロールの承認] サブセクションの [ロール] ドロップダウンリストから Kafka Connect が依存するロールを選択し、[保存] をクリックします。

        重要

        AliyunSAEFullAccess 権限ポリシーがアタッチされているロールを選択することをお勧めします。そうでない場合、タスクの実行に失敗する可能性があります。

    • タスクのプロパティ

      タスクの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。

    タスクの状態が [実行中] になると、コネクタは予期したとおりに動作を開始します。

シンク コネクタ

  1. JDBCコネクタ ファイルをダウンロードし、作成したOSSバケットにアップロードします。詳細については、「OSSコンソールを使用して開始する」をご参照ください。

    重要

    JDBCコネクタファイルをダウンロードする際は、Java 8と互換性のあるバージョンを選択してください。

  2. ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理するApsaraMQ for Kafkaインスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、[コネクタエコシステム統合] > タスクリスト を選択します。

  4. タスクリスト ページで、タスクリストの作成 をクリックします。

  5. タスクの作成 ページで、[タスク名] パラメータを設定し、画面の指示に従って他のパラメータを設定します。次のセクションでは、パラメータについて説明します。

    • タスクの作成

      1. Source (ソース) 手順で、データプロバイダー パラメータを [apache Kafka Connect] に設定し、次へ をクリックします。

      2. コネクタの設定 手順で、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

        サブセクション

        パラメータ

        説明

        Kafka Connectプラグイン

        バケット

        JDBCコネクタファイルをアップロードしたOSSバケットを選択します。

        ファイル

        OSSバケットにアップロードしたJDBCコネクタファイルを選択します。

        Message Queue for Apache Kafkaリソース

        Message Queue for Apache Kafkaパラメータ

        Sink Connectを選択します。

        Message Queue for Apache Kafkaインスタンス

        このトピックの「前提条件」セクションで作成したApsaraMQ for Kafkaインスタンスを選択します。

        VPC

        作成したVPCを選択します。

        vSwitch

        作成したvSwitchを選択します。

        セキュリティグループ

        セキュリティグループを選択します。

        Kafka Connect

        ZIPパッケージ内の.propertiesファイルを解析する

        [.propertiesファイルの作成] を選択します。次に、シンクコネクタの構成を含む.propertiesファイルをZIPパッケージから選択します。 .propertiesファイルのパスは /etc/sink-xxx.properties です。

        コードエディタで関連フィールドの値を変更します。 展開してフィールドの説明を表示

        フィールド

        説明

        connector.class

        コネクタパッケージの名前。このフィールドにはデフォルト値を使用します。

        tasks.max

        タスクの最大数。

        topics

        ソーストピックの名前。

        connection.url

        データベースへの接続に使用するURL。 URLの形式は、データベースの種類によって異なります。この例では、ApsaraDB RDS for MySQLインスタンスが作成されます。したがって、このフィールドを設定する場合は、手順 1:テーブルを作成する で取得した内部エンドポイントとポート番号をホストとポートのフィールドに入力します。

        • MySQL: jdbc:mysql://<host>:<port>/<database>

        • PostgreSQL: jdbc:postgresql://<host>:<port>/<database>

        • IBM DB2: jdbc:db2://<host>:<port>/<database>

        • IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>

        • MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>

        • Oracle: jdbc:oracle:thin://<host>:<port>/<service> or jdbc:oracle:thin:<host>:<port>:<SID>

        pk.fields

        プライマリキーの名前。複数のプライマリキーはコンマ (,) で区切ります。

        connection.user

        データベースにログオンするために使用するユーザー名。

        connection.password

        データベースにログオンするために使用するパスワード。

        table.name.format

        デスティネーショントテーブルの名前。

        展開してサンプルコードを表示

        name=test-sink-mysql
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        
        // タスクの最大数を指定するために、このパラメータを使用できます。タスクはSAEインスタンス全体に均等に分散されます。
        tasks.max=2
        
        // ソーストピックの名前。
        topics=sink-topic
        
        // データベースへの接続に使用するURL。URLの形式は、データベースの種類によって異なります。
        // MySQL: jdbc:mysql://<host>:<port>/<database>
        // PostgreSQL: jdbc:postgresql://<host>:<port>/<database>
        // IBM DB2: jdbc:db2://<host>:<port>/<database>
        // IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>
        // MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>
        // Oracle: jdbc:oracle:thin://<host>:<port>/<service> or jdbc:oracle:thin:<host>:<port>:<SID>
        connection.url=jdbc:mysql://rm-******.mysql.rds.aliyuncs.com:3306/test_database
        
        // データベースにアクセスするために使用するアカウント。
        connection.user=root
        // データベースにアクセスするために使用するパスワード。
        connection.password=123456
        
        insert.mode=upsert
        // デスティネーショントテーブルを自動的に作成するかどうかを指定します。
        auto.create=false
        
        // record_valueは、ApsaraMQ for Kafkaのメッセージの値フィールドからデスティネーショントテーブルにデータが同期されることを指定します。
        pk.mode=record_value
        // プライマリキーの名前。複数のプライマリキーはコンマ (,) で区切ります。
        pk.fields=id
        
        // デスティネーショントテーブルの名前を指定します。
        table.name.format=sink_table
        
        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=true
        
        key.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        
        // フォールトトレランスポリシー。値allは、エラーが発生した場合でもコネクタが実行を継続することを指定します。値noneは、エラーが発生した場合にコネクタが実行を停止し、システムが例外をスローすることを指定します。
        errors.tolerance=none

        JDBCシンクコネクタの作成に使用されるすべてのパラメータについては、「JDBC Sink Connector Configuration Properties」をご参照ください。

      3. インスタンス設定 手順で、パラメータを設定し、次へ をクリックします。次の表にパラメータを示します。

        サブセクション

        パラメータ

        説明

        ワーカータイプ

        ワーカータイプ

        ワーカータイプを選択します。

        ワーカーの最小数

        このパラメータを 1 に設定します。

        ワーカーの最大数

        このパラメータを 1 に設定します。このフィールドの値は、tasks.max フィールドの値より大きくすることはできません。

        ワーカー構成

        Apache Kafka Connector Workerの依存関係を自動的に作成する

        このオプションを選択することをお勧めします。このオプションを選択すると、選択したApsaraMQ for KafkaインスタンスでKafka Connectを実行するために必要な内部トピックとコンシューマーグループが作成され、情報がコードエディタの対応するパラメータに同期されます。次のセクションでは、構成項目について説明します。

        • オフセットトピック: オフセットデータを格納するために使用されるトピック。トピックの名前は connect-eb-offset-<タスク名> 形式です。

        • 構成トピック: コネクタとタスクの構成データを格納するために使用されるトピック。トピックの名前は connect-eb-config-<タスク名> 形式です。

        • ステータストピック: コネクタとタスクのステータスデータを格納するために使用されるトピック。トピックの名前は connect-eb-status-<タスク名> 形式です。

        • Kafka Connectコンシューマーグループ: Kafka Connectワーカーが内部トピックのメッセージを消費するために使用するコンシューマーグループ。コンシューマーグループの名前は connect-eb-cluster-<タスク名> 形式です。

        • Kafkaソースコネクタコンシューマーグループ: ソーストピックのデータを消費するために使用されるコンシューマーグループ。このコンシューマーグループは、シンクコネクタに対してのみ有効です。コンシューマーグループの名前は connector-eb-cluster-<タスク名>-<コネクタ名> 形式です。

      4. [実行構成] セクションで、[ログ配信] パラメータを [log Serviceにデータを送信] または [apsaramq For Kafkaにデータを送信] に設定し、[ロール] ドロップダウンリストからKafka Connectが依存するロールを選択し、[ロール認証] サブセクションで [保存] をクリックします。

        重要

        AliyunSAEFullAccess権限ポリシーがアタッチされているロールを選択することをお勧めします。そうでない場合、タスクが実行に失敗する可能性があります。

    • タスクプロパティ

      タスクの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。

    タスクのステータスが [実行中] になると、コネクタは想定どおりに動作を開始します。

ステップ 3: コネクターをテストする

ソースコネクター

  1. DMS コンソールで、ステップ 1: テーブルを作成するで作成したデータテーブルにデータレコードを挿入します。id が 12、number が 20 のデータレコードを挿入する方法の例を、次のサンプルコマンドに示します。

    INSERT INTO sql_table(id, number) VALUES(12,20);
  2. ApsaraMQ for Kafka コンソール にログオンします。[インスタンス] ページで、管理するインスタンスをクリックします。

  3. [インスタンスの詳細] ページの左側のナビゲーションペインで、[トピック] をクリックします。表示されるページで、管理するトピックをクリックします。次に、[メッセージクエリ] タブをクリックして、挿入されたメッセージデータを表示します。メッセージ値の例を、次のサンプルコードに示します。

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":12,"number":20}}

シンクコネクター

  1. ApsaraMQ for Kafka コンソール にログオンします。[インスタンス] ページで、管理するインスタンスをクリックします。

  2. [インスタンスの詳細] ページの左側のナビゲーションペインで、[トピック] をクリックします。表示されるページで、管理するトピックの名前をクリックします。

  3. [トピックの詳細] ページの右上隅にある [メッセージの送信] をクリックします。

  4. [メッセージの送受信を開始] パネルで、メッセージの内容を指定します。たとえば、id が 13、number が 14 のデータレコードを追加する場合、次のメッセージの内容を入力します。

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":13,"number":14}}
  5. DMS コンソールで、データがデスティネーションテーブルに書き込まれているかどうかを確認します。

    接收数据

一般的なエラーとトラブルシューティング

エラー 1:すべてのタスクが実行に失敗する

エラーメッセージ:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決策:メッセージ入力タスクの詳細ページで、[基本情報] セクションの [診断] をクリックして、コネクタ監視ページに移動します。 コネクタ監視ページで、タスクの失敗の詳細を確認できます。

エラー 2:Kafka Connect が予期せず終了する

エラーメッセージ:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解決策:Kafka Connect のステータス更新が遅延している可能性があります。 ページを更新することをお勧めします。 Kafka Connect がまだ失敗する場合は、次の操作を実行して問題をトラブルシューティングできます。

  1. メッセージ入力タスクの詳細ページの [ワーカー情報] セクションで、[SAE アプリケーション] の右側のインスタンス名をクリックして、アプリケーション詳細ページに移動します。

  2. [基本情報] ページで、[インスタンスデプロイ情報] タブをクリックします。

  3. [アクション] 列の [webshell] をクリックして、Kafka Connect の実行環境にログオンします。实例部署信息

    • vi /home/admin/connector-bootstrap.log コマンドを実行して、コネクタの起動ログを表示し、ログにエラーメッセージが存在するかどうかを確認します。

    • vi /opt/kafka/logs/connect.log コマンドを実行して、コネクタの実行ログを表示し、ERROR または WARN フィールドにエラーメッセージが存在するかどうかを確認します。

エラーメッセージに基づいて問題をトラブルシューティングした後、対応するタスクを再起動できます。

エラー 3:コネクタパラメータの検証が失敗する

エラーメッセージ:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解決策:エラーメッセージに基づいて値が無効なパラメータを見つけ、パラメータを更新します。 エラーメッセージに基づいてパラメータが見つからない場合は、Kafka Connect の実行環境にログオンし、次のコマンドを実行できます。 Kafka Connect の実行環境へのログオン方法については、このトピックのエラー 2 を参照してください。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

各コネクタパラメータの検証結果は、レスポンスで返されます。 パラメータの値が無効な場合、パラメータの errors フィールドは空ではありません。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}