すべてのプロダクト
Search
ドキュメントセンター

EventBridge:Debezium PostgreSQL Sourceコネクタを使用して、ApsaraDB RDS for PostgreSQLからApsaraMQ for Kafkaにデータを同期する

最終更新日:Mar 21, 2025

このトピックでは、Debezium PostgreSQL Sourceコネクタを作成して、ApsaraDB RDS for PostgreSQLから ApsaraMQ for Kafka にデータを同期する方法について説明します。

制限事項

Debezium PostgreSQL Sourceコネクタでは、ソースからChange Data Capture(CDC)データを使用するために、1つのタスクのみを構成できます。

前提条件

手順 1:テーブルを作成する

  1. ApsaraDB RDSコンソール にログインし、ApsaraDB RDS for PostgreSQLインスタンスを作成します。詳細については、「ApsaraDB RDS for PostgreSQLインスタンスを作成する」をご参照ください。

    ApsaraDB RDS for PostgreSQLインスタンスを作成する際は、「前提条件」セクションで作成したApsaraMQ for KafkaインスタンスがデプロイされているVPCを選択し、VPCのCIDRブロックをホワイトリストに追加します。加入白名单

  2. [インスタンス] ページで、作成したApsaraDB RDS for PostgreSQLインスタンスをクリックします。次に、インスタンスの詳細ページで次の操作を実行します。

    1. 新しいアカウントを作成します。詳細については、「データベースとアカウントを作成する」をご参照ください。既存のアカウントを使用することもできます。

    2. データベースを作成します。詳細については、「データベースとアカウントを作成する」をご参照ください。既存のデータベースを使用することもできます。

    3. [データベース接続] をクリックして、内部エンドポイントとポート番号を表示し、記録します。

      内网地址

    4. [パラメーター] をクリックし、wal_level[実行中のパラメーター値] 列で 変更の適用 パラメーターの値を logical に変更し、 をクリックします。

  3. インスタンスの詳細ページで、[データベースにログイン] をクリックして、Data Management(DMS)コンソールに移動します。次に、次の操作を実行します。

    1. 管理するデータベースを右クリックし、[モード管理] を選択し、[スキーマの作成] をクリックしてスキーマを作成します。

      説明

      コネクタの構成では、新しく作成したスキーマを使用する必要があります。information、pg_catalog、publicスキーマなどのシステムスキーマは使用できません。

    2. 新しいスキーマで、SQLステートメントを使用してテーブルを作成します。次のサンプルコマンドは、列名が idnumber であるテーブルを作成する方法の例を示しています。詳細については、「SQLコマンド」をご参照ください。

      CREATE TABLE sql_table(id INT ,number INT);
    3. 次のコマンドを実行して、wal2jsonプラグインを作成および初期化し、データサブスクリプションを有効にします。

      SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

手順 2:コネクタを作成する

  1. Debezium PostgreSQL CDC Sourceコネクタ ファイルをダウンロードし、作成したOSSバケットにアップロードします。詳細については、「OSSコンソールを使用して開始する」をご参照ください。

    重要

    Debezium PostgreSQL CDC Sourceコネクタファイルをダウンロードする際は、Java 8と互換性のあるバージョンを選択してください。

  2. ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、[コネクタエコシステム統合] > タスクリスト を選択します。

  4. タスクリスト ページで、タスクリストの作成 をクリックします。

  5. タスクの作成 ページで、[タスク名] パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apache Kafka Connect] に設定し、次へ をクリックします。

      2. コネクタの設定 ステップで、パラメーターを構成し、次へ をクリックします。次の表にパラメーターを示します。

        サブセクション

        パラメーター

        説明

        Kafka Connectプラグイン

        バケット

        Debezium PostgreSQL CDC SourceコネクタファイルをアップロードしたOSSバケットを選択します。

        ファイル

        OSSバケットにアップロードしたDebezium PostgreSQL CDC Sourceコネクタファイルを選択します。

        Message Queue for Apache Kafkaリソース

        Message Queue for Apache Kafkaパラメーター

        Source Connectを選択します。

        Message Queue for Apache Kafkaインスタンス

        このトピックの「前提条件」セクションで作成したApsaraMQ for Kafkaインスタンスを選択します。

        VPC

        作成したVPCを選択します。

        vSwitch

        作成したvSwitchを選択します。

        セキュリティグループ

        セキュリティグループを選択します。

        Kafka Connect

        ZIPパッケージ内の .properties ファイルを解析する

        [.properties ファイルを作成する] を選択します。コードエディターで関連フィールドの値を変更します。

        展開してフィールドの説明を表示する

        フィールド

        説明

        connector.class

        Debezium PostgreSQL CDC Sourceコネクタファイルの名前。このフィールドにはデフォルト値を使用します。

        database.dbname

        ApsaraDB RDS for PostgreSQLデータベースの名前。

        database.hostname

        手順 1:テーブルを作成する で取得した内部エンドポイントを指定します。

        database.port

        手順 1:テーブルを作成する で取得したポート番号を指定します。

        database.user

        ApsaraDB RDS for PostgreSQLデータベースにログインするために使用するユーザー名。

        database.password

        ApsaraDB RDS for PostgreSQLデータベースにログインするために使用するパスワード。

        slot.name

        ApsaraDB RDS for PostgreSQLデータベースの論理レプリケーションストリームの名前。

        table.whitelist

        データベーステーブル。複数のテーブルはコンマ(,)で区切ります。テーブルの名前は <schemaName>.<tableName> 形式です。

        database.server.name

        デスティネーショントピック名のプレフィックス。デスティネーショントピックの名前は {database.server.name}.{schemaName}.{tableName} 形式です。

        重要

        データを同期する前に、上記の形式でトピックを作成する必要があります。

        展開してサンプルコードを表示する

        connector.class=io.debezium.connector.postgresql.PostgresConnector
        database.dbname=test_database
        database.hostname=pgm-xxx.pg.rds.aliyuncs.com
        database.password=xxx
        database.port=5432
        database.user=xxx
        name=debezium-psql-source
        // プラグインの名前。この例では、wal2jsonが使用されています。有効な値:decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2 json_rds_streaming。
        plugin.name=wal2json
        slot.drop_on_stop=true
        slot.name=test_slot
        // ソースデータベースのテーブル。複数のテーブルはコンマ(,)で区切ります。テーブルを指定するために使用されるルールは、<schemaName>.<tableName> 形式です。
        table.whitelist=test_schema.test_table
        // 注:メッセージを使用できるタスクは1つだけです。
        tasks.max=1
        
        // デスティネーショントピックのプレフィックス。デスティネーショントピックの名前は、<database.server.name>.<schemaName>.<tableName> 形式です。
        // この例では、schemaNameはkafka_connect_schema、tableNameはtable2_with_pkです。
        // table2_with_pkテーブルのCDCデータは、デスティネーショントピックtest-prefix.kafka_connect_schema.table2_with_pkに流れます。
        database.server.name=test-prefix
        
        // メッセージ値の形式を変換するために使用されるコンポーネント。
        value.converter=org.apache.kafka.connect.json.JsonConverter
        // メッセージ値に構造に関するスキーマ情報が含まれているかどうかを指定します。
        value.converter.schemas.enable=false

        Debezium PostgreSQL Sourceコネクタの作成に使用されるすべてのパラメーターについては、Debezium Connector for PostgreSQLの「コネクタプロパティ」セクションをご参照ください。

      3. インスタンス設定 ステップで、パラメーターを構成し、次へ をクリックします。次の表にパラメーターを示します。

        サブセクション

        パラメーター

        説明

        ワーカータイプ

        [ワーカータイプ]

        ワーカータイプを選択します。

        [ワーカーの最小数]

        このパラメーターを 1 に設定します。

        [ワーカーの最大数]

        このパラメーターを 1 に設定します。

        ワーカー構成

        Apache Kafkaコネクタワーカーの依存関係を自動的に作成する

        このオプションを選択することをお勧めします。このオプションを選択すると、システムは、選択したApsaraMQ for KafkaインスタンスでKafka Connectを実行するために必要な内部トピックとコンシューマーグループを作成し、情報をコードエディターの対応するパラメーターに同期します。次の項目では、コードエディターのパラメーターについて説明します。

        • オフセットトピック:オフセットデータを格納するために使用されるトピック。トピックの名前は connect-eb-offset-<タスク名> 形式です。

        • 構成トピック:コネクタとタスクの構成データを格納するために使用されるトピック。トピックの名前は connect-eb-config-<タスク名> 形式です。

        • ステータストピック:コネクタとタスクのステータスデータを格納するために使用されるトピック。トピックの名前は connect-eb-status-<タスク名> 形式です。

        • Kafka Connectコンシューマーグループ:Kafka Connectワーカーが内部トピックのメッセージを使用するために使用するコンシューマーグループ。コンシューマーグループの名前は connect-eb-cluster-<タスク名> 形式です。

        • Kafkaソースコネクタコンシューマーグループ:ソースApsaraMQ for Kafkaトピックのデータを使用するために使用されるコンシューマーグループ。このコンシューマーグループは、シンクコネクタのみが使用できます。コンシューマーグループの名前は connector-eb-cluster-<タスク名>-<コネクタ名> 形式です。

      4. 実行設定 セクションで、[ログ配信] パラメーターを [log Serviceにデータを配信する] または [apsaramq For Kafkaにデータを配信する] に設定し、[ロールの承認] サブセクションの [ロール] ドロップダウンリストからKafka Connectが依存するロールを選択し、[保存] をクリックします。

        重要

        AliyunSAEFullAccess権限ポリシーがアタッチされているロールを選択することをお勧めします。そうしないと、タスクが実行に失敗する可能性があります。

    • タスクプロパティ

      タスクの再試行ポリシーとデッドレターキューを構成します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。

    タスクのステータスが [実行中] になると、コネクタは想定どおりに動作を開始します。

手順 3:コネクタをテストする

  1. DMSコンソールで、手順 1:テーブルを作成する で作成したデータテーブルにデータレコードを挿入します。次のサンプルコマンドは、id が 123 、number が 20000 であるデータレコードを挿入する方法の例を示しています。

    INSERT INTO sql_table(id, number) VALUES(123,20000);
  2. ApsaraMQ for Kafka コンソール にログインします。[インスタンス] ページで、管理するインスタンスの名前をクリックします。

  3. [インスタンスの詳細] ページの左側のナビゲーションペインで、[トピック] をクリックします。表示されるページで、{database.server.name}.{schemaName}.{tableName} 形式で作成したトピックをクリックします。表示されるページで、[メッセージクエリ] タブをクリックして、挿入されたメッセージデータを表示します。次のサンプルコードは、メッセージ値の例を示しています。

    {"before":null,"after":{"id":123,"number":20000},"source":{"version":"0.9.2.Final","connector":"postgresql","name":"test-prefix","db":"wb","ts_usec":168386295815075****,"txId":10339,"lsn":412719****,"schema":"test_schema","table":"sql_table","snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":168386295****}

一般的なエラーとトラブルシューティング

エラー 1:すべてのタスクが実行に失敗する

エラーメッセージ:

コネクタ mongo-source の下のすべてのタスクが失敗しました。タスクのエラートレースを確認してください。

解決策:[メッセージ流入タスクの詳細] ページの [基本情報] セクションで、[診断] をクリックして、[コネクタの監視] ページに移動します。[コネクタの監視] ページで、タスクの失敗の詳細を確認できます。

エラー 2:Kafka Connectが予期せず終了する

エラーメッセージ:

Kafka connectが終了しました! saeアプリケーションのエラーログ /opt/kafka/logs/connect.log を確認して、kafka connectが終了した理由を特定し、有効な引数でイベントストリーミングを更新して解決してください。

解決策:Kafka Connectのステータス更新が遅延している可能性があります。ページを更新することをお勧めします。 Kafka Connectがまだ失敗する場合は、次の操作を実行して問題をトラブルシューティングできます。

  1. [メッセージ流入タスクの詳細] ページの [ワーカー情報] セクションで、[SAEアプリケーション] の右側のインスタンス名をクリックして、[アプリケーションの詳細] ページに移動します。

  2. [基本情報] ページで、[インスタンスデプロイ情報] タブをクリックします。

  3. [アクション] 列の [webshell] をクリックして、Kafka Connectの実行環境にログインします。实例部署信息

    • vi /home/admin/connector-bootstrap.log コマンドを実行して、コネクタの起動ログを表示し、ログにエラーメッセージが存在するかどうかを確認します。

    • vi /opt/kafka/logs/connect.log コマンドを実行して、コネクタの実行ログを表示し、ERROR フィールドまたは WARN フィールドにエラーメッセージが存在するかどうかを確認します。

エラーメッセージに基づいて問題をトラブルシューティングした後、対応するタスクを再起動できます。

エラー 3:コネクタパラメーターの検証が失敗する

エラーメッセージ:

コネクタ xxx の起動または更新に失敗しました。エラーコード=400。エラーメッセージ=コネクタ構成が無効であり、次の 1 つのエラーが含まれています。
値は never、initial_only、when_needed、initial、schema_only、schema_only_recovery のいずれかである必要があります
上記のエラーのリストは、エンドポイント `/connector-plugins/{connectorType}/config/validate` でも確認できます。

解決策:エラーメッセージに基づいて値が無効なパラメーターを見つけ、パラメーターを更新します。エラーメッセージに基づいてパラメーターが見つからない場合は、Kafka Connectの実行環境にログインし、次のコマンドを実行できます。 Kafka Connectの実行環境へのログイン方法については、このトピックのエラー 2 をご参照ください。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

各コネクタパラメーターの検証結果は、レスポンスで返されます。パラメーターの値が無効な場合、パラメーターの errors フィールドは空ではありません。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"  // 値は never、initial_only、when_needed、initial、schema_only、schema_only_recovery のいずれかである必要があります
    ],
    "visible":true
}