
EventBridge: Synchronize data from ApsaraDB RDS for PostgreSQL to ApsaraMQ for Kafka by using the Debezium PostgreSQL Source connector

Last Updated: Mar 11, 2026

When your application needs to react to database changes in real time (for example, to update search indexes, invalidate caches, or feed analytics pipelines), you need a reliable Change Data Capture (CDC) pipeline. The Debezium PostgreSQL Source connector on EventBridge captures row-level changes from an ApsaraDB RDS for PostgreSQL database and streams them to an ApsaraMQ for Kafka topic, so downstream consumers receive every insert, update, and delete as it happens.

This topic walks you through the end-to-end setup: preparing the PostgreSQL source, deploying the connector, and verifying that CDC events reach Kafka.

Limitations

The Debezium PostgreSQL Source connector supports only one task per source (tasks.max=1). You cannot run concurrent tasks to consume CDC data from the same source.

Prerequisites

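Before you begin, make sure that you have:

  • An ApsaraMQ for Kafka instance deployed in a virtual private cloud (VPC), and a vSwitch in that VPC.

  • An Object Storage Service (OSS) bucket to store the connector file.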

Step 1: Prepare the PostgreSQL database

This step covers creating an ApsaraDB RDS for PostgreSQL instance, enabling logical replication, and setting up the schema, table, and replication slot that the connector requires.

Create an ApsaraDB RDS for PostgreSQL instance

  1. Log on to the ApsaraDB RDS console and create an ApsaraDB RDS for PostgreSQL instance. For more information, see Create an ApsaraDB RDS for PostgreSQL instance. Select the same VPC as your ApsaraMQ for Kafka instance and add the VPC CIDR block to the whitelist.


Create a database account and database

  1. On the Instances page, click the instance you created, and then complete the following tasks:

    1. Create a database account. For more information, see Create a database and an account on an ApsaraDB RDS for PostgreSQL instance. You can also use an existing account.

    2. Create a database. For more information, see Create a database and an account on an ApsaraDB RDS for PostgreSQL instance. You can also use an existing database.

    3. Click Database Connection and note the internal endpoint and port number. You need these values when you configure the connector.


Enable logical replication

  1. On the instance details page, click Parameters, change the value of the wal_level parameter to logical in the Running Parameter Value column, and then click Apply Changes.

    Note

    This change enables Write-Ahead Log (WAL) logical decoding, which the Debezium connector requires to capture row-level changes.

Create a schema, table, and replication slot

  1. On the instance details page, click Log On to Database to open the Data Management (DMS) console. Then complete the following tasks:

    1. Right-click the target database, select Mode Management, and then click Create Schema.

      Important

      Use a custom schema. The built-in schemas information_schema, pg_catalog, and public are not supported by this connector.

    2. In the new schema, run the following SQL statement to create a table. This example creates a table named sql_table with two integer columns:

      CREATE TABLE sql_table(id INT, number INT);

      For more information about PostgreSQL SQL commands, see SQL Commands.

    3. Run the following SQL statement to create a logical replication slot with the wal2json plug-in and enable data subscription:

      SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

Step 2: Upload the connector plug-in and create a task

Upload the connector file to OSS

  1. Download the Debezium PostgreSQL CDC Source Connector from Confluent Hub.

    Important

    Select a version compatible with Java 8.

  2. Upload the downloaded file to the OSS bucket you created. For more information, see Get started by using the OSS console.

Create the connector task

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where your ApsaraMQ for Kafka instance resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, configure Task Name and then complete the following sections.

Source configuration

In the Source step, set Data Provider to Apache Kafka Connect and click Next Step.

Connector configuration

In the Connector step, configure the following parameters and click Next Step.

| Subsection | Parameter | Description |
| --- | --- | --- |
| Kafka Connect Plug-in | Bucket | The OSS bucket that contains the connector file |
| | File | The Debezium PostgreSQL CDC Source Connector file |
| Message Queue for Apache Kafka Resources | Message Queue for Apache Kafka Parameters | Select Source Connect |
| | Message Queue for Apache Kafka Instance | The ApsaraMQ for Kafka instance you created |
| | VPC | The VPC you created |
| | vSwitch | The vSwitch you created |
| | Security Group | A security group in the same VPC |
| Kafka Connect | Parse .properties File in ZIP Package | Select Create .properties File |

After you select Create .properties File, a code editor opens with the connector properties. The following section explains each field.

Connector properties reference

Review and update the following fields in the code editor:

| Field | Required | Default | Description |
| --- | --- | --- | --- |
| connector.class | Yes | io.debezium.connector.postgresql.PostgresConnector | The fully qualified connector class. Do not change this value. |
| database.dbname | Yes | None | The name of your ApsaraDB RDS for PostgreSQL database. |
| database.hostname | Yes | None | The internal endpoint that you noted in Step 1. |
| database.port | Yes | 5432 | The port number that you noted in Step 1. |
| database.user | Yes | None | The database account username. |
| database.password | Yes | None | The database account password. |
| name | Yes | None | A name for this connector instance. Example: debezium-psql-source. |
| plugin.name | Yes | None | The PostgreSQL logical decoding plug-in. Valid values: decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming. |
| slot.drop_on_stop | No | false | Whether to drop the replication slot when the connector stops. Dropping the slot discards its position in the WAL, so changes made while the connector is stopped can be missed. Set to true for development environments only. |
| slot.name | Yes | None | The logical replication slot name. Must match the slot that you created in Step 1. Example: test_slot. |
| table.whitelist | Yes | None | The tables to monitor, in <schemaName>.<tableName> format. Separate multiple tables with commas. |
| tasks.max | Yes | None | Must be 1. Only one task can consume CDC data from a source. |
| database.server.name | Yes | None | A logical name that serves as the prefix for destination topic names. The destination topic is named <database.server.name>.<schemaName>.<tableName>. |
| value.converter | No | org.apache.kafka.connect.json.JsonConverter | The message value converter. |
| value.converter.schemas.enable | No | false | Whether to include schema information in message values. |
Important

Create the destination topic in ApsaraMQ for Kafka before you start the connector. The topic name must follow the format <database.server.name>.<schemaName>.<tableName>.
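The topic naming rule can be written as a small helper, which is handy for listing the topics to create before the connector starts. This is a minimal sketch, assuming the properties are already loaded into a dict and that table.whitelist holds plain table names (Debezium actually interprets each entry as a regular expression). The function name destination_topics is hypothetical and is not part of any EventBridge or Debezium API.

```python
def destination_topics(props: dict[str, str]) -> list[str]:
    """Return one destination topic name per entry in table.whitelist.

    Applies the rule <database.server.name>.<schemaName>.<tableName>.
    Hypothetical planning helper; assumes plain names, not regexes.
    """
    prefix = props["database.server.name"].strip()
    topics = []
    for entry in props["table.whitelist"].split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        schema, sep, table = entry.partition(".")
        if not sep or not schema or not table:
            raise ValueError(f"expected <schemaName>.<tableName>, got {entry!r}")
        topics.append(f"{prefix}.{schema}.{table}")
    return topics


if __name__ == "__main__":
    example = {
        "database.server.name": "test-prefix",
        "table.whitelist": "test_schema.sql_table",
    }
    print(destination_topics(example))  # ['test-prefix.test_schema.sql_table']
```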

The following example shows a complete .properties configuration:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.dbname=test_database
database.hostname=pgm-xxx.pg.rds.aliyuncs.com
database.password=<your-password>
database.port=5432
database.user=<your-username>
name=debezium-psql-source

# Logical decoding plug-in. Valid values: decoderbufs, wal2json,
# wal2json_rds, wal2json_streaming, wal2json_rds_streaming
plugin.name=wal2json
slot.drop_on_stop=true
slot.name=test_slot

# Tables to capture. Format: <schemaName>.<tableName>
table.whitelist=test_schema.sql_table

# Only one task can consume CDC data from a single source
tasks.max=1

# Destination topic prefix.
# Resulting topic name: test-prefix.test_schema.sql_table
database.server.name=test-prefix

# Message format
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Important

Do not store passwords in plain text in production configurations. Use a secrets management mechanism such as KIP-297 to externalize sensitive credentials.

For all available connector properties, see the Connector properties section in the Debezium documentation.
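The required fields, the tasks.max=1 rule, the plug-in list, and the custom-schema requirement can be checked locally before you paste the file into the console. The following is a minimal sketch, assuming simple key=value lines (no line continuations, escapes, or ":" separators) and a required-field list copied from the reference table above. It is not an EventBridge validation tool, and the function names are hypothetical.

```python
# Hypothetical pre-flight check for the connector .properties file.
# Assumes simple key=value lines; Java .properties escapes are not handled.
import sys

REQUIRED = [
    "connector.class", "database.dbname", "database.hostname", "database.port",
    "database.user", "database.password", "name", "plugin.name", "slot.name",
    "table.whitelist", "tasks.max", "database.server.name",
]
PLUGINS = {"decoderbufs", "wal2json", "wal2json_rds",
           "wal2json_streaming", "wal2json_rds_streaming"}
UNSUPPORTED_SCHEMAS = {"information_schema", "pg_catalog", "public"}


def parse_properties(text: str) -> dict[str, str]:
    """Parse key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check(props: dict[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means all checks passed."""
    problems = [f"missing required field: {k}" for k in REQUIRED if not props.get(k)]
    if props.get("tasks.max") and props["tasks.max"] != "1":
        problems.append("tasks.max must be 1")
    plugin = props.get("plugin.name")
    if plugin and plugin not in PLUGINS:
        problems.append(f"unsupported plugin.name: {plugin}")
    entries = (e.strip() for e in props.get("table.whitelist", "").split(","))
    for entry in filter(None, entries):
        if entry.partition(".")[0] in UNSUPPORTED_SCHEMAS:
            problems.append(f"table {entry} uses an unsupported schema")
    return problems


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python3 check_props.py connector.properties
    with open(sys.argv[1], encoding="utf-8") as f:
        issues = check(parse_properties(f.read()))
    print("\n".join(issues) or "OK")
    sys.exit(1 if issues else 0)
```

The script exits with a non-zero status when any problem is found, so it can also run as a step in a deployment script.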

Instance configuration

In the Instance step, configure the following parameters and click Next Step.

| Subsection | Parameter | Description |
| --- | --- | --- |
| Worker Type | Worker Type | Select a worker type |
| | Min. Number of Workers | Set to 1 |
| | Max. Number of Workers | Set to 1 |
| Worker Configurations | Automatically Create Dependencies for Apache Kafka Connector Worker | (Recommended) Select this option to automatically create the internal topics and consumer groups that Kafka Connect requires |

When you select Automatically Create Dependencies for Apache Kafka Connector Worker, the system creates the following resources in your ApsaraMQ for Kafka instance:

| Resource | Naming format | Purpose |
| --- | --- | --- |
| Offset topic | connect-eb-offset-<Task name> | Stores connector offset data |
| Config topic | connect-eb-config-<Task name> | Stores connector and task configuration |
| Status topic | connect-eb-status-<Task name> | Stores connector and task status |
| Kafka Connect consumer group | connect-eb-cluster-<Task name> | Used by Kafka Connect workers for internal topics |
| Connector consumer group | connector-eb-cluster-<Task name>-<Connector name> | Used by sink connectors only; not used by this source connector |
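If you need to locate these resources later (for example, to clean them up after you delete a task), their names follow directly from the naming formats in the table. The following is a minimal sketch that only applies those formats; the function name and the example task and connector names are hypothetical.

```python
def worker_resources(task_name: str, connector_name: str) -> dict[str, str]:
    """Build resource names from the naming formats in the table above.

    Hypothetical helper: it formats strings only and does not query or
    create anything in ApsaraMQ for Kafka.
    """
    return {
        "offset_topic": f"connect-eb-offset-{task_name}",
        "config_topic": f"connect-eb-config-{task_name}",
        "status_topic": f"connect-eb-status-{task_name}",
        "connect_consumer_group": f"connect-eb-cluster-{task_name}",
        # Used by sink connectors only; listed here for completeness.
        "connector_consumer_group": f"connector-eb-cluster-{task_name}-{connector_name}",
    }


if __name__ == "__main__":
    for kind, name in worker_resources("pg-cdc", "debezium-psql-source").items():
        print(f"{kind}: {name}")
```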

Running configurations and role authorization

In the Running Configurations section:

  1. Set Log Delivery to Deliver Data to Log Service or Deliver Data to ApsaraMQ for Kafka.

  2. In the Role Authorization subsection, select a role from the Role drop-down list.

    Important

    Select a role with the AliyunSAEFullAccess policy attached. Otherwise, the task may fail to start.

  3. Click Save.

Task property

Configure the retry policy and dead-letter queue for the task. For more information, see Retry policies and dead-letter queues.

When the task status changes to Running, the connector is working.

Step 3: Verify data synchronization

After the connector starts, insert a test record into your PostgreSQL table and verify that the CDC event appears in the corresponding Kafka topic.

Insert a test record

  1. In the DMS console, insert a record into the table you created in Step 1:

       INSERT INTO sql_table(id, number) VALUES(123, 20000);

Check the Kafka topic for CDC events

  1. Log on to the ApsaraMQ for Kafka console and click the instance name on the Instances page.

  2. In the left-side navigation pane, click Topics. Click the topic that matches the <database.server.name>.<schemaName>.<tableName> format, and then click the Message Query tab.

  3. Verify that the CDC event appears. The following example shows a typical INSERT event:

       {
         "before": null,
         "after": {
           "id": 123,
           "number": 20000
         },
         "source": {
           "version": "0.9.2.Final",
           "connector": "postgresql",
           "name": "test-prefix",
           "db": "wb",
           "ts_usec": 168386295815075****,
           "txId": 10339,
           "lsn": 412719****,
           "schema": "test_schema",
           "table": "sql_table",
           "snapshot": false,
           "last_snapshot_record": null
         },
         "op": "c",
         "ts_ms": 168386295****
       }

CDC event field reference

The following table describes the key fields in a CDC event:

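| Field | Description | Example values |
| --- | --- | --- |
| before | The row state before the change. null for INSERT operations. For UPDATE and DELETE events, the full old row is included only if the table's REPLICA IDENTITY is set to FULL; with the default setting, this field contains at most the primary key columns. | null, {"id": 123, "number": 10000} |
| after | The row state after the change. Contains the inserted or updated column values. null for DELETE operations. | {"id": 123, "number": 20000} |
| source | Metadata about the change event, including the database name, schema, table, and transaction ID. | See the example above |
| op | The operation type. | c (create/insert), u (update), d (delete), r (read, for rows captured by the initial snapshot) |
| ts_ms | The timestamp when the connector processed the event, in milliseconds since epoch. | 168386295**** |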
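A downstream consumer typically branches on op and reads the row from after (or from before for deletes). The following is a minimal sketch of that logic, assuming message values are plain JSON envelopes as produced with value.converter.schemas.enable=false. It does not handle the empty tombstone messages that Debezium may emit after a delete, and the function name is hypothetical.

```python
import json
from datetime import datetime, timezone

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_event(message: str) -> str:
    """Summarize one Debezium change event given its JSON message value."""
    event = json.loads(message)
    op = OPS.get(event["op"], f"unknown op {event['op']!r}")
    src = event.get("source", {})
    # DELETE events carry the old row in "before"; other operations use "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    # ts_ms is milliseconds since the epoch; render it as UTC.
    when = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
    return f"{when.isoformat()} {op} {src.get('schema')}.{src.get('table')}: {row}"


if __name__ == "__main__":
    sample = json.dumps({
        "before": None,
        "after": {"id": 123, "number": 20000},
        "source": {"schema": "test_schema", "table": "sql_table"},
        "op": "c",
        "ts_ms": 0,
    })
    print(describe_event(sample))
```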

Troubleshooting

All tasks fail to run

Error message:

All tasks under connector <connector name> failed, please check the error trace of the task.

Solution: On the Message Inflow Task Details page, click Diagnostics in the Basic Information section to open the Connector Monitoring page. Review the task failure details.

Kafka Connect unexpectedly exits

Error message:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

Solution: Refresh the page first. The status update may be delayed. If Kafka Connect remains in a failed state, access the SAE application logs:

  1. In the Worker Information section of the Message Inflow Task Details page, click the instance name next to SAE Application.

  2. On the Basic Information page, click the Instance Deployment Information tab.

  3. Click Webshell in the Actions column.


  4. Check the following log files:

    • Startup logs: /home/admin/connector-bootstrap.log

    • Runtime logs: /opt/kafka/logs/connect.log (look for ERROR or WARN entries)
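Looking for ERROR or WARN entries amounts to a line filter, equivalent to grepping the log for those words. The following is a minimal sketch, assuming Python 3 is available on the SAE instance (this topic does not confirm that); the script name scan_log.py is hypothetical.

```python
import sys
from typing import Iterable, Iterator

LEVELS = ("ERROR", "WARN")


def problem_lines(lines: Iterable[str]) -> Iterator[tuple[int, str]]:
    """Yield (line number, text) for every line that mentions ERROR or WARN."""
    for number, line in enumerate(lines, start=1):
        if any(level in line for level in LEVELS):
            yield number, line.rstrip("\n")


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python3 scan_log.py /opt/kafka/logs/connect.log
    with open(sys.argv[1], encoding="utf-8", errors="replace") as f:
        for number, text in problem_lines(f):
            print(f"{number}: {text}")
```

Line numbers are printed so that you can open the full log around each hit and read the surrounding stack trace.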

After you identify and fix the issue, restart the task.

Connector parameter validation fails

Error message:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

Solution: Identify the invalid parameter from the error message and correct its value.

If the error message does not specify which parameter failed, log on to the SAE application (see the previous section) and run the following command to validate all parameters:

curl -i -X PUT \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  -d @$CONNECTOR_PROPERTIES_MAPPING \
  http://localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate
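The curl command sends a JSON file whose path is held in $CONNECTOR_PROPERTIES_MAPPING. This topic does not state how that file is produced. If you only have the .properties text, the validate endpoint accepts a flat JSON object of the same keys, and the following minimal sketch converts one to the other. It assumes simple key=value lines, and the script and file names are hypothetical. If the body includes connector.class, Kafka Connect rejects the request when that class does not match the plug-in named in the URL.

```python
import json
import sys


def properties_to_validate_body(text: str) -> str:
    """Convert simple key=value .properties text into a flat JSON object."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep:
            config[key.strip()] = value.strip()
    return json.dumps(config, indent=2)


if __name__ == "__main__" and len(sys.argv) > 2:
    # Usage: python3 to_json.py connector.properties connector.json
    with open(sys.argv[1], encoding="utf-8") as src:
        body = properties_to_validate_body(src.read())
    with open(sys.argv[2], "w", encoding="utf-8") as dst:
        dst.write(body)
```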

The response contains a configs array with one entry per parameter. An entry whose errors array is non-empty identifies an invalid parameter. The following excerpt shows one such entry:

{
  "value": {
    "name": "snapshot.mode",
    "value": null,
    "recommended_values": [
      "never",
      "initial_only",
      "when_needed",
      "initial",
      "schema_only",
      "schema_only_recovery"
    ],
    "errors": [
      "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible": true
  }
}
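When many parameters are returned, filtering the response for non-empty errors arrays is quicker than reading it by eye. The following is a minimal sketch, assuming the standard Kafka Connect validate response, where each element of the top-level configs array holds a value object like the excerpt above. Run curl without -i so that the saved output contains only the JSON body. The function name is hypothetical.

```python
import json


def invalid_parameters(response_body: str) -> dict[str, list[str]]:
    """Map each parameter that has a non-empty errors array to its messages."""
    response = json.loads(response_body)
    failed = {}
    for entry in response.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            failed[value["name"]] = value["errors"]
    return failed


if __name__ == "__main__":
    sample = json.dumps({"configs": [
        {"value": {"name": "snapshot.mode", "value": None,
                   "errors": ["Value must be one of never, initial"]}},
        {"value": {"name": "tasks.max", "value": "1", "errors": []}},
    ]})
    for name, errors in invalid_parameters(sample).items():
        print(name, "->", "; ".join(errors))
```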

What to do next