
Data Transmission Service:Synchronize data from a PolarDB for PostgreSQL (Compatible with Oracle) cluster to a Message Queue for Apache Kafka instance

Last Updated: Nov 20, 2025

This topic describes how to use Data Transmission Service (DTS) to synchronize data from a PolarDB for PostgreSQL (Compatible with Oracle) cluster to a Message Queue for Apache Kafka instance.

Prerequisites

  • The wal_level parameter of the source PolarDB for PostgreSQL (Compatible with Oracle) cluster must be set to logical. This setting adds the information required for logical decoding to the write-ahead log (WAL). For more information, see Set cluster parameters.
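
    You can verify the setting on the source cluster after it takes effect. The following check is a minimal sketch, assuming the standard PostgreSQL SHOW command is available in the cluster:

      SHOW wal_level;
      -- Expected output: logical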

  • A destination Message Queue for Apache Kafka instance is created. The storage space of the destination instance must be larger than the storage space used by the source PolarDB for PostgreSQL (Compatible with Oracle) cluster.

    Note

    For the supported versions of the source and destination databases, see Synchronization Solution Overview.

  • A topic is created in the destination Message Queue for Apache Kafka instance to receive the synchronized data. For more information, see Step 1: Create a topic.

Usage notes

Source database limits

  • Bandwidth requirements: The server to which the source database belongs must have sufficient outbound bandwidth. Otherwise, the data synchronization speed is affected.

  • The tables to be synchronized must have PRIMARY KEY or UNIQUE constraints, and the constrained fields must be unique. Otherwise, the destination database may contain duplicate data records.

  • If you select tables as the objects to be synchronized and you need to modify the tables in the destination database, such as renaming tables or columns, up to 1,000 tables can be synchronized in a single data synchronization task. If you run a task to synchronize more than 1,000 tables, a request error occurs. In this case, we recommend that you configure multiple tasks to synchronize the tables in batches or configure a task to synchronize the entire database.

  • The write-ahead logging (WAL) feature must be enabled. If you perform only incremental data synchronization, the WAL logs of the source database must be retained for more than 24 hours. If you perform both full data synchronization and incremental data synchronization, the WAL logs must be retained for at least seven days. After full data synchronization is complete, you can reduce the retention period to more than 24 hours. If these requirements are not met, DTS may fail to obtain the WAL logs and the task may fail; in extreme cases, data inconsistency or loss may occur. In such scenarios, the Service Level Agreement (SLA) of DTS does not guarantee service reliability or performance.

  • If one or more long-running transactions exist in the source database and incremental data is synchronized in the data synchronization task, the WAL logs generated before the long-running transactions in the source database are committed may be accumulated. As a result, the disk space of the source database may be insufficient.
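
    You can check the source database for long-running transactions before and during synchronization. The following query is a sketch that assumes the standard pg_stat_activity view is available:

      -- List open transactions, longest first
      SELECT pid, now() - xact_start AS xact_duration, state, query
      FROM pg_stat_activity
      WHERE xact_start IS NOT NULL
      ORDER BY xact_duration DESC
      LIMIT 10;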

  • Limits on operations to be performed on the source database:

    • During schema synchronization and full data synchronization, do not perform DDL operations to change the schemas of databases or tables. Otherwise, the data synchronization task fails.

    • If you perform only full data synchronization, do not write data to the source database during data synchronization. Otherwise, data inconsistency may occur between the source and destination databases. To ensure data consistency, we recommend that you select schema synchronization, full data synchronization, and incremental data synchronization as the synchronization types.

    • If you need to perform a primary/secondary switchover on the source PolarDB for PostgreSQL (Compatible with Oracle) cluster, the logical replication slot failover feature must be enabled. This prevents logical subscriptions from being interrupted and ensures that your data synchronization task can run as expected. For more information, see Logical replication slot failover.

      Note

      If the source PolarDB for PostgreSQL (Compatible with Oracle) cluster does not support the logical replication slot failover feature (for example, if Database Engine is set to PolarDB for PostgreSQL (Compatible with Oracle) 2.0) and you trigger a primary/secondary switchover, the data synchronization task may fail and cannot be recovered.

    • DTS is subject to the limits of logical subscriptions in the source database. If a single data record to be synchronized exceeds 256 MB in size during incremental data changes, the running data synchronization instance fails and cannot be recovered. In this case, you must configure the task again.

Other limits

  • A single data synchronization task can synchronize data from only one database. To synchronize data from multiple databases, you must create a data synchronization task for each database.

  • DTS does not support synchronization of tables that use the TimescaleDB extension or tables that have cross-schema inheritance relationships.

  • DTS does not support synchronizing INDEX, PARTITION, VIEW, PROCEDURE, FUNCTION, TRIGGER, and FOREIGN KEY objects.

  • During data synchronization, DTS creates a replication slot whose name starts with the dts_sync_ prefix in the source database to replicate data. DTS uses this replication slot to obtain the incremental logs generated by the source database within the last 15 minutes.

    Note
    • DTS automatically deletes the replication slot after the instance is released. If you change the password of the database account or delete the IP address of the DTS server from the whitelist of the source database during data synchronization, the replication slot cannot be automatically deleted. In this case, you must manually delete the replication slot from the source database to prevent it from occupying disk space and making the source cluster unavailable.

    • When the synchronization task is released or fails, DTS automatically clears the replication slot. If a primary/secondary failover occurs on the source PolarDB for PostgreSQL (Compatible with Oracle) cluster, you must log on to the secondary database to manually clear the replication slot.

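    You can query the replication slots that DTS creates in the source database. The following statements are a sketch that assumes the standard pg_replication_slots view is available; dts_sync_xxx is a placeholder slot name:

      -- View DTS replication slots and their status
      SELECT slot_name, plugin, slot_type, active, restart_lsn
      FROM pg_replication_slots
      WHERE slot_name LIKE 'dts_sync_%';

      -- Manually delete a leftover slot (the slot must not be active)
      SELECT pg_drop_replication_slot('dts_sync_xxx');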

  • If the destination Kafka instance is scaled out or scaled in during data synchronization, you must restart the data synchronization instance.

  • In the following three scenarios, you must run the ALTER TABLE schema.table REPLICA IDENTITY FULL; command on the tables to be synchronized in the source database before you write data to them. This ensures data consistency. While this command is being executed, we recommend that you do not perform operations that acquire locks on the table. Otherwise, the table may remain locked. If you skip the related check item in the precheck, DTS automatically runs this command when the instance is initialized.

    • When the instance runs for the first time.

    • When you synchronize data at the schema level, and a new table is created in the schema to be synchronized or a table to be synchronized is rebuilt using the RENAME command.

    • When you use the feature of modifying synchronization objects.

    Note
    • In the command, replace schema and table with the schema name and table name of the data to be synchronized.

    • We recommend that you perform this operation during off-peak hours.
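
    For example, for a hypothetical table public.orders, the command and a follow-up check look as follows (relreplident is f when the replica identity is FULL):

      -- public.orders is a placeholder; substitute your schema and table
      ALTER TABLE public.orders REPLICA IDENTITY FULL;

      -- Verify the replica identity: 'f' means FULL
      SELECT relname, relreplident
      FROM pg_class
      WHERE oid = 'public.orders'::regclass;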

  • DTS creates the following temporary tables in the source database to obtain the DDL statements of incremental data, the schemas of incremental tables, and heartbeat information. Do not delete these temporary tables from the source database during data synchronization. Otherwise, the DTS task may become abnormal. The temporary tables are automatically deleted after the DTS instance is released.

    public.dts_pg_class, public.dts_pg_attribute, public.dts_pg_type, public.dts_pg_enum, public.dts_postgres_heartbeat, public.dts_ddl_command, public.dts_args_session, and public.aliyun_dts_instance.
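
    If you need to confirm which of these temporary tables exist, the following query is a sketch that assumes the standard pg_tables view is available:

      -- List the DTS temporary tables in the public schema
      SELECT tablename
      FROM pg_tables
      WHERE schemaname = 'public'
        AND (tablename LIKE 'dts_%' OR tablename LIKE 'aliyun_dts%');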

  • To ensure that the latency of incremental data synchronization is accurate, DTS adds a heartbeat table named dts_postgres_heartbeat to the source database.

  • Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination databases. We recommend that you synchronize data during off-peak hours. For example, you can synchronize data when the CPU utilization of the source and destination databases is less than 30%. During full data synchronization, DTS uses the read and write resources of the source and destination databases. This may increase the loads on the database servers.

  • DTS attempts to resume a synchronization task that failed within the last seven days. Before you switch your business to the destination instance, you must end or release the task, or revoke the write permissions of the account that DTS uses to access the destination instance. This prevents the data in the source from overwriting the data in the destination instance after the task is automatically resumed.

  • If an instance fails, the DTS technical support team attempts to recover it within 8 hours. During the recovery process, operations such as restarting the instance and adjusting parameters may be performed.

    Note

    When parameters are adjusted, only the parameters of the DTS instance are modified. The parameters of the database are not modified. The parameters that may be modified include but are not limited to those described in Modify instance parameters.

  • When you synchronize a partitioned table, you must include both the child partitions and the parent table as synchronization objects. Otherwise, data inconsistency may occur for the partitioned table.

    Note

    The parent table of a PostgreSQL partitioned table does not directly store data. All data is actually stored in the child partitions. The synchronization task must include both the parent table and all its child partitions. Otherwise, data in the child tables may not be synchronized, leading to data inconsistency between the destination and the source.
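
    To confirm that you have selected every child partition, you can list the children of the parent table. The following query is a sketch that assumes the standard pg_inherits catalog is available; public.orders is a placeholder parent table:

      -- List all child partitions of a parent table
      SELECT inhrelid::regclass AS child_partition
      FROM pg_inherits
      WHERE inhparent = 'public.orders'::regclass;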

Billing

  • Schema synchronization and full data synchronization: free of charge.

  • Incremental data synchronization: charged. For more information, see Billing overview.

SQL operations that support incremental synchronization

DML: INSERT, UPDATE, DELETE

DDL:

  • CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, TRUNCATE TABLE

  • CREATE VIEW, ALTER VIEW, DROP VIEW

  • CREATE PROCEDURE, ALTER PROCEDURE, DROP PROCEDURE

  • CREATE FUNCTION, DROP FUNCTION

  • CREATE INDEX, DROP INDEX

Note

DDL statements are not synchronized in the following scenarios:

  • Additional information such as CASCADE and RESTRICT in DDL statements is not synchronized.

  • If a transaction contains both DML and DDL statements, the DDL statements are not synchronized.

  • If only partial DDL statements of a transaction are included in the data synchronization task, the DDL statements are not synchronized.

  • If a DDL statement is executed from a session that is created by executing the SET session_replication_role = replica statement, the DDL statement is not synchronized.

  • DDL statements that are executed by calling methods such as FUNCTION are not synchronized.

  • If no schema is specified in a DDL statement and the schema returned by the SHOW search_path statement is not public, the DDL statement is not synchronized.

  • If a DDL statement contains IF NOT EXISTS, the DDL statement is not synchronized.
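
The following statements illustrate two of these rules. They are examples only; the table names are placeholders:

  -- Synchronized: the schema is specified explicitly
  CREATE TABLE public.example_a (id INT PRIMARY KEY);

  -- Not synchronized: the statement contains IF NOT EXISTS
  CREATE TABLE IF NOT EXISTS public.example_b (id INT PRIMARY KEY);

  -- If a DDL statement does not specify a schema, check the session search_path;
  -- the statement is synchronized only if it resolves to public
  SHOW search_path;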

Permissions required for database accounts

For the source PolarDB for PostgreSQL (Compatible with Oracle) cluster, a privileged account is required. For more information about how to create and authorize a database account, see Create and manage a database account.

Procedure

  1. Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.

    DTS console

    1. Log on to the DTS console.

    2. In the left-side navigation pane, click Data Synchronization.

    3. In the upper-left corner of the page, select the region in which the data synchronization task resides.

    DMS console

    Note

    The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.

    1. Log on to the DMS console.

    2. In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.

    3. From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.

  2. Click Create Task to go to the task configuration page.

  3. Configure the source and destination databases. The following table describes the parameters.

    Note

    For more information about how to obtain the parameters of the destination Message Queue for Apache Kafka instance, see Configure parameters of a Message Queue for Apache Kafka instance.

    Task Name

    The name of the DTS task. DTS automatically generates a task name. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to specify a unique task name.

    Source Database

    Select Existing Connection

    • If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.

      Note

      In the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.

    • If you have not registered the instance with DTS, or you do not want to use a registered instance, you must configure the following database information.

    Database Type

    Select PolarDB (Compatible with Oracle).

    Access Method

    Select Alibaba Cloud Instance.

    Instance Region

    Select the region where the source PolarDB for PostgreSQL (Compatible with Oracle) cluster resides.

    Replicate Data Across Alibaba Cloud Accounts

    In this example, a database of the current Alibaba Cloud account is used. Select No.

    Instance ID

    Select the ID of the source PolarDB for PostgreSQL (Compatible with Oracle) cluster.

    Database Name

    Enter the name of the database that contains the objects to be synchronized in the source PolarDB for PostgreSQL (Compatible with Oracle) cluster.

    Database Account

    Enter the database account of the source PolarDB for PostgreSQL (Compatible with Oracle) cluster. For information about the required permissions, see Permissions required for database accounts.

    Database Password

    The password that is used to access the database.

    Destination Database

    Select Existing Connection

    • If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.

      Note

      In the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.

    • If you have not registered the instance with DTS, or you do not want to use a registered instance, you must configure the following database information.

    Database Type

    Select Kafka.

    Access Method

    Select Express Connect, VPN Gateway, or Smart Access Gateway.

    Note

    Here, the Message Queue for Apache Kafka instance is configured as a self-managed Kafka database for data synchronization.

    Instance Region

    Select the region where the destination Message Queue for Apache Kafka instance resides.

    Connected VPC

    Select the ID of the VPC to which the destination Message Queue for Apache Kafka instance belongs.

    Domain Name or IP

    Enter any IP address from the Default Endpoint of the destination Message Queue for Apache Kafka instance.

    Port Number

    Enter the service port of the destination Message Queue for Apache Kafka instance. The default value is 9092.

    Database Account and Database Password

    You do not need to configure these parameters in this example.

    Kafka Version

    Select the version that corresponds to your Kafka instance version.

    Encryption

    Select Non-encrypted or SCRAM-SHA-256 based on your business and security requirements.

    Topic

    Select the topic for receiving data from the drop-down list.

    Use Kafka Schema Registry

    Kafka Schema Registry is a service layer for metadata. It provides a RESTful interface to store and retrieve Avro schemas.

    • No: Do not use Kafka Schema Registry.

    • Yes: Use Kafka Schema Registry. You need to enter the URL or IP address registered for the Avro schema in Kafka Schema Registry in the URL or IP Address of Schema Registry text box.

  4. Click Test Connectivity and Proceed in the lower part of the page.

    Note
    • Make sure that the CIDR blocks of DTS servers can be automatically or manually added to the security settings of the source and destination databases to allow access from DTS servers. For more information, see Add DTS server IP addresses to a whitelist.

    • If the source or destination database is a self-managed database and its Access Method is not set to Alibaba Cloud Instance, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.

  5. Configure the objects to be synchronized.

    1. In the Configure Objects step, configure the objects that you want to synchronize.

      Synchronization Types

      The synchronization types. By default, Incremental Data Synchronization is selected. You must also select Schema Synchronization and Full Data Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination Kafka instance. The historical data is the basis for subsequent incremental synchronization.

      Note

      If the Access Method for the destination Kafka instance is Alibaba Cloud Instance, Schema Synchronization is not supported.

      Processing Mode of Conflicting Tables

      • Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain tables that have identical table names, the precheck is passed. Otherwise, an error is returned during the precheck, and the data synchronization task cannot be started.

        Note

        If the source and destination databases contain tables with identical names and the tables in the destination database cannot be deleted or renamed, you can use the object name mapping feature to rename the tables that are synchronized to the destination database. For more information, see Map object names.

      • Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.

        Warning

        If you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to potential risks.

        • If the source and destination databases have the same schema and a data record in the destination database has the same primary key value or unique key value as a data record in the source database:

          • During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.

          • During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.

        • If the source and destination databases have different schemas, data may fail to be initialized. In this case, only some columns are synchronized, or the data synchronization instance fails. Proceed with caution.

      Data Format in Kafka

      Select the desired format for storing data in the destination Kafka instance.

      • If you select Canal JSON, see Canal JSON description for the parameter descriptions and examples.

        Note

        Currently, only the China (Qingdao) and China (Beijing) regions support selecting Canal JSON.

      • If you select DTS Avro, data is parsed based on the schema definition of DTS Avro. For more information, see GitHub.

      • If you select SharePlex JSON, see SharePlex JSON for parameter descriptions and examples.

      Kafka Data Compression Format

      The compression format for Kafka compressed data. Select a compression format based on your business requirements. Valid values:

      • LZ4 (default): low compression ratio and high compression speed.

      • GZIP: high compression ratio and low compression speed.

        Note

        GZIP compression consumes a large quantity of CPU resources.

      • Snappy: medium compression ratio and medium compression speed.

      Policy for Shipping Data to Kafka Partitions

      Select a policy.

      Message acknowledgement mechanism

      Select the desired message acknowledgment mechanism.

      Topic That Stores DDL Information

      Select a topic from the drop-down list to store DDL information.

      Note

      If you do not select a topic, DDL information is stored in the data topic by default.

      Capitalization of Object Names in Destination Instance

      The capitalization of database names, table names, and column names in the destination instance. By default, DTS default policy is selected. You can select other options to ensure that the capitalization of object names is consistent with that in the source or destination database. For more information, see Specify the capitalization of object names in the destination instance.

      Source Objects

      Select one or more objects from the Source Objects section and click the right arrow icon to add the objects to the Selected Objects section.

      Note

      The granularity for selecting synchronization objects is at the table level.

      Selected Objects

      No extra configuration is required for this example. You can use the object name mapping feature to set the topic name, number of partitions, and partition key for the source table in the destination Kafka instance. For more information, see Mapping information.

      Note
      • If you use the object name mapping feature, other objects that depend on this object may fail to synchronize.

      • To select SQL operations for incremental synchronization, right-click the object to be synchronized in the Selected Objects section and select the required SQL operations from the dialog box that appears.

    2. Click Next: Advanced Settings to configure advanced settings.

      Dedicated Cluster for Task Scheduling

      By default, DTS schedules the task to the shared cluster if you do not specify a dedicated cluster. If you want to improve the stability of data synchronization instances, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster.

      Retry Time for Failed Connections

      The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

      Note
      • If you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.

      • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.

      Retry Time for Other Issues

      The retry time range for other issues. For example, if the DDL or DML operations fail to be performed after the data synchronization task is started, DTS immediately retries the operations within the time range. Valid values: 1 to 1440. Unit: minutes. Default value: 10. We recommend that you set this parameter to a value greater than 10. If the failed operations are successfully performed within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

      Important

      The value of the Retry Time for Other Issues parameter must be smaller than the value of the Retry Time for Failed Connections parameter.

      Enable Throttling for Full Data Synchronization

      During full data synchronization, DTS uses the read and write resources of the source and destination databases. This may increase the load on the database servers. You can configure the Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s) parameters for full data synchronization tasks to reduce the load on the destination database server.

      Note

      You can configure this parameter only if Full Data Synchronization is selected for the Synchronization Types parameter.

      Enable Throttling for Incremental Data Synchronization

      Specifies whether to enable throttling for incremental data synchronization. You can enable throttling for incremental data synchronization based on your business requirements. To configure throttling, you must configure the RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) parameters. This reduces the load on the destination database server.

      Environment Tag

      You can select an environment tag to identify the instance if needed. You do not need to select one for this example.

      Configure ETL

      Specifies whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL?

      Monitoring and Alerting

      Specifies whether to configure alerting for the data synchronization instance. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications.

  6. Save the task settings and run a precheck.

    • To view the parameters to be specified when you call the relevant API operation to configure the DTS task, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.

    • If you do not need to view or have viewed the parameters, click Next: Save Task Settings and Precheck in the lower part of the page.

    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.

    • If the data synchronization task fails the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, rerun the precheck.

    • If an alert is triggered for an item during the precheck:

      • If an alert item cannot be ignored, click View Details next to the failed item and troubleshoot the issue. Then, run a precheck again.

      • If an alert item can be ignored, click Confirm Alert Details. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.

  7. Purchase the instance.

    1. Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.

    2. On the buy page, configure the Billing Method and Instance Class parameters for the data synchronization task. The following table describes the parameters.

      New Instance Class

      Billing Method

      • Subscription: You pay for a subscription when you create a data synchronization instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.

      • Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. The pay-as-you-go billing method is suitable for short-term use. If you no longer require a pay-as-you-go data synchronization instance, you can release the instance to reduce costs.

      Resource Group Settings

      The resource group to which the data synchronization instance belongs. Default value: default resource group. For more information, see What is Resource Management?

      Instance Class

      DTS provides instance classes that vary in synchronization speed. You can select an instance class based on your business requirements. For more information, see Instance classes of data synchronization instances.

      Subscription Duration

      If you select the subscription billing method, specify the subscription duration and the number of data synchronization instances that you want to create. The subscription duration can be one to nine months, one year, two years, three years, or five years.

      Note

      This parameter is available only if you select the Subscription billing method.

    3. Read and select Data Transmission Service (Pay-as-you-go) Service Terms.

    4. Click Buy and Start. In the dialog box that appears, click OK.

      You can view the progress of the task in the task list.

Mapping information

  1. In the Selected Objects area, move the mouse pointer over the destination topic name (at the table level).

  2. Click Edit that appears after the destination topic name.

  3. In the Edit Table dialog box that appears, configure the mapping information.

    Note
    • At the schema level, the dialog box is named Edit Schema and supports fewer parameters. At the table level, the dialog box is named Edit Table.

    • If the granularity of the synchronization objects is not the entire schema, you cannot modify the Name of target Topic and Number of Partitions parameters in the Edit Schema dialog box.

    Configuration

    Description

    Name of target Topic

    The name of the destination topic to which the source table is synchronized. By default, this is the Topic selected in the Destination Database section during the Configurations for Source and Destination Databases phase.

    Important
    • If the destination database is a Message Queue for Apache Kafka instance, the topic name you enter must exist in the destination Kafka instance. Otherwise, the data synchronization will fail. If the destination database is a self-managed Kafka database and the synchronization instance includes a schema synchronization task, DTS will try to create the topic you enter in the destination database.

    • If you modify the Name of target Topic, the data will be written to the topic you enter.

    Filter Conditions

    For more information, see Set filter conditions.

    Number of Partitions

    The number of partitions for writing data to the destination topic.

    Partition Key

    This parameter is required when Policy for Shipping Data to Kafka Partitions is set to Ship Data to Separate Partitions Based on Hash Values of Primary Keys. Specify one or more columns as the partition key; DTS calculates hash values from these columns and delivers each row to a partition of the destination topic based on its hash value. If you do not specify a partition key, this shipping policy does not take effect during the incremental write phase.

    Note

    You can select Partition Key only in the Edit Table dialog box.

  4. Click OK.

FAQ

  • Can I modify the Kafka Data Compression Format?

    Yes. For more information, see Modify the objects to be synchronized.

  • Can I modify the Message acknowledgement mechanism?

    Yes. For more information, see Modify the objects to be synchronized.