Use Data Transmission Service (DTS) to stream change data from a PolarDB for PostgreSQL (Compatible with Oracle) cluster into Alibaba Cloud Message Queue for Apache Kafka in real time.
Prerequisites
Before you begin, make sure you have:
Set the wal_level parameter of the source cluster to logical. This setting adds the information required for logical decoding to the write-ahead log (WAL). See Configure cluster parameters.
Created a destination Alibaba Cloud Message Queue for Apache Kafka instance with storage space larger than the source cluster's current usage.
Created at least one topic in the Kafka instance to receive synchronized data. See Step 1: Create a topic.
Created a privileged account on the source PolarDB for PostgreSQL (Compatible with Oracle) cluster. See Create and manage database accounts.
For supported source and destination database versions, see Overview of synchronization scenarios.
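The wal_level prerequisite can be checked from a client before you configure the task. The following is a minimal sketch, assuming a Python DB-API 2.0 cursor (for example, one obtained from psycopg2) that is connected to the source cluster. The function name and the stand-in cursor class are hypothetical and are not part of DTS.

```python
def wal_level_is_logical(cursor) -> bool:
    """Return True if the connected cluster reports wal_level = logical."""
    # SHOW wal_level is a standard PostgreSQL statement that returns one row.
    cursor.execute("SHOW wal_level")
    row = cursor.fetchone()
    return row is not None and str(row[0]).strip().lower() == "logical"


class FakeCursor:
    """Stand-in cursor for illustration only; a real cursor queries the cluster."""

    def __init__(self, value: str):
        self._value = value
        self.last_sql = None

    def execute(self, sql: str) -> None:
        # Record the statement instead of sending it anywhere.
        self.last_sql = sql

    def fetchone(self):
        return (self._value,)


if __name__ == "__main__":
    print(wal_level_is_logical(FakeCursor("logical")))
    print(wal_level_is_logical(FakeCursor("replica")))
```

With a real connection, pass the cursor returned by your driver instead of FakeCursor. If the function returns False, change the parameter as described in Configure cluster parameters before starting the task.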
Limitations
Source database
Bandwidth: The source server must have enough outbound bandwidth. Insufficient bandwidth slows down synchronization.
Tables without a primary key or UNIQUE constraint: Enable the Exactly-Once write feature when configuring the task. Without it, duplicate records may appear in the destination. See Synchronize tables without a primary key or UNIQUE constraint.
More than 1,000 tables in a single task: Split into multiple tasks, or configure the task to synchronize the entire database. Exceeding this limit may cause a request error after submission.
WAL retention:
Incremental synchronization only: retain WAL logs for more than 24 hours.
Full + incremental synchronization: retain WAL logs for at least 7 days. After full synchronization completes, you can reduce the retention period to more than 24 hours.
If WAL logs are purged before DTS can read them, the task fails. In extreme cases, data loss or inconsistency may occur. Issues from insufficient WAL retention are not covered by the Service-Level Agreement (SLA).
Long-running transactions: WAL generated before long-running transactions are committed may accumulate and exhaust disk space on the source.
DDL during schema or full synchronization: Do not change database or table structure while schema synchronization or full data synchronization is running. The task will fail.
Writes during full-only synchronization: Do not write to the source while running full data synchronization only. Use schema synchronization, full data synchronization, and incremental data synchronization together to maintain real-time consistency.
Logical Replication Slot Failover: The cluster must support Logical Replication Slot Failover and have it enabled. If the cluster does not support it (for example, when Database Engine is Oracle syntax compatible 2.0), a high-availability (HA) switchover may cause the synchronization instance to fail unrecoverably.
Large incremental records: If a single incremental data record exceeds 256 MB, the synchronization instance may fail unrecoverably. You must reconfigure the synchronization instance.
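Two of the source database limits above are simple arithmetic and can be planned ahead of time. The sketch below is illustrative only: the function names are hypothetical, the 1,000-table limit and the retention periods are taken from the list above, and the table names in the example are invented.

```python
from typing import List

MAX_TABLES_PER_TASK = 1000  # per-task table limit stated in the list above


def split_into_tasks(tables: List[str], limit: int = MAX_TABLES_PER_TASK) -> List[List[str]]:
    """Split table names into batches that each fit in a single task."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return [tables[i:i + limit] for i in range(0, len(tables), limit)]


def min_wal_retention_hours(full_sync: bool) -> int:
    """Return the retention floor implied by the guidance above.

    Full + incremental synchronization: at least 7 days.
    Incremental only: more than 24 hours, so treat 24 as a value to exceed.
    """
    return 7 * 24 if full_sync else 24


if __name__ == "__main__":
    tables = [f"sales.t{i}" for i in range(2500)]  # invented table names
    print([len(batch) for batch in split_into_tasks(tables)])  # [1000, 1000, 500]
    print(min_wal_retention_hours(full_sync=True))             # 168
```

Splitting by a fixed batch size is only one option. Synchronizing the entire database in one task, as noted above, avoids the batching altogether.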
Other limitations
A single task can synchronize only one database. Configure separate tasks for multiple databases.
DTS does not support TimescaleDB extension tables, tables with cross-schema inheritance, or tables with unique indexes based on expressions.
Schemas created by installing plugins cannot be synchronized and do not appear in the console during task configuration.
DTS does not support synchronizing INDEX, PARTITION, VIEW, PROCEDURE, FUNCTION, TRIGGER, or foreign key (FK) objects.
DTS creates a replication slot with the dts_sync_ prefix in the source database. This slot allows DTS to fetch incremental logs from the last 15 minutes. When synchronization fails or the synchronization instance is released, DTS attempts to clean up the slot automatically.
- If you change the source database account password or remove DTS IP addresses from the whitelist during synchronization, the replication slot cannot be cleaned up automatically. Clear it manually to prevent the slot from consuming all available disk space and making the source database unavailable.
- If a failover occurs on the source, log in to the secondary node to clear the slot manually.
If the destination Kafka instance is scaled out or in during synchronization, restart the synchronization instance.
In the following three scenarios, run ALTER TABLE schema.table REPLICA IDENTITY FULL; on the tables to synchronize before writing data to them. To avoid deadlocks, do not lock the tables while running this command. If you skip related precheck items, DTS runs this command automatically during initialization.
When the synchronization instance runs for the first time.
When the synchronization granularity is Schema, and a new table is created in the schema or an existing table is rebuilt using the RENAME command.
When you modify synchronization objects.
- Replace schema and table with the actual schema name and table name.
- Run this command during off-peak hours.
DTS creates the following temporary tables in the source database to track DDL statements, incremental table structure, and heartbeat information. Do not delete them during synchronization. They are removed automatically when the DTS instance is released: public.dts_pg_class, public.dts_pg_attribute, public.dts_pg_type, public.dts_pg_enum, public.dts_postgres_heartbeat, public.dts_ddl_command, public.dts_args_session, and public.aliyun_dts_instance.
DTS adds a heartbeat table named dts_postgres_heartbeat to the source database to track incremental synchronization latency accurately.
Run synchronization during off-peak hours (for example, when CPU usage on both databases is below 30%). Full data synchronization consumes read and write resources on both ends and can increase database load.
DTS automatically retries failed tasks for up to seven days. Before switching your business to the destination, end or release the task, or revoke the write permissions of the DTS account on the destination. Otherwise, the source data may overwrite destination data after an automatic recovery.
If a task fails, DTS technical support will attempt recovery within 8 hours. Recovery may involve restarting the task or adjusting DTS task parameters (database parameters are not changed). For adjustable parameters, see Modify instance parameters.
When synchronizing partitioned tables, include both the parent table and all child partitions as synchronization objects. The parent table does not store data directly—all data lives in child partitions. Omitting any partition may cause data inconsistency.
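When many tables need the REPLICA IDENTITY FULL setting described above, generating the statements is less error-prone than typing them. The following is a minimal sketch: the helper names are hypothetical, the schema and table names are invented, and the output is meant to be reviewed and then run with your own SQL client during off-peak hours.

```python
from typing import Iterable, List, Tuple


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def replica_identity_statements(tables: Iterable[Tuple[str, str]]) -> List[str]:
    """Build one ALTER TABLE ... REPLICA IDENTITY FULL statement per table.

    Quoted identifiers are case-sensitive, so pass schema and table names
    exactly as they are stored in the catalog.
    """
    return [
        f"ALTER TABLE {quote_ident(schema)}.{quote_ident(table)} REPLICA IDENTITY FULL;"
        for schema, table in tables
    ]


if __name__ == "__main__":
    # Invented names; replace with the tables selected for synchronization.
    for stmt in replica_identity_statements([("public", "orders"), ("public", "order_items")]):
        print(stmt)
```

Quoting every identifier keeps the generated SQL valid for names that contain special characters, at the cost of requiring exact-case names as input.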
Billing
| Synchronization type | Pricing |
|---|---|
| Schema synchronization and full data synchronization | Free |
| Incremental data synchronization | Charged. See Billing overview. |
SQL operations supported for incremental synchronization
| Operation type | SQL statements |
|---|---|
| DML | INSERT, UPDATE, DELETE |
| DDL | CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, TRUNCATE TABLE; CREATE VIEW, ALTER VIEW, DROP VIEW; CREATE PROCEDURE, ALTER PROCEDURE, DROP PROCEDURE; CREATE FUNCTION, DROP FUNCTION; CREATE INDEX, DROP INDEX |
DDL statements are not synchronized in the following cases:
The statement includes CASCADE, RESTRICT, or other additional clauses.
A transaction contains both DML and DDL statements.
Only some DDL statements from a transaction are included in the task.
The DDL is executed from a session created with SET session_replication_role = replica.
The DDL is executed by calling a FUNCTION.
No schema is defined in the DDL statement (the public schema is inferred from SHOW search_path).
The DDL statement contains IF NOT EXISTS.
Create a synchronization task
The task configuration follows these steps: open the task list, configure source and destination connections, select synchronization objects, configure advanced settings, run the precheck, and purchase the instance.
Step 1: Open the task list
Go to the data synchronization task list. Use either the DTS console or the DMS console.
DTS console
Log on to the DTS console.
In the left navigation pane, click Data Synchronization.
In the upper-left corner, select the region where the synchronization instance is located.
DMS console
Steps may vary depending on the DMS console mode and layout. See Simple mode console and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top menu bar, choose Data + AI > DTS (DTS) > Data Synchronization.
To the right of Data Synchronization Tasks, select the region of the synchronization instance.
Step 2: Configure source and destination databases
Click Create Task, then configure the source and destination connections.
For Kafka connection parameters, see Configure parameters for a Message Queue for Apache Kafka instance.
General
| Parameter | Description |
|---|---|
| Task Name | DTS generates a name automatically. Specify a descriptive name for easy identification. The name does not need to be unique. |
Source database
| Parameter | Description |
|---|---|
| Select Existing Connection | Select a registered database instance from the drop-down list to auto-fill the connection details. If you haven't registered the instance, configure the parameters below manually. (In the DMS console, this field is labeled Select a DMS database instance.) |
| Database Type | Select PolarDB (Compatible with Oracle). |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Select the region of the source cluster. |
| Replicate Data Across Alibaba Cloud Accounts | Select No if the source is in the same Alibaba Cloud account. |
| Instance ID | Select the ID of the source cluster. |
| Database Name | Enter the name of the database containing the objects to synchronize. |
| Database Account | Enter the privileged account for the source cluster. |
| Database Password | Enter the password for the account. |
Destination database
| Parameter | Description |
|---|---|
| Select Existing Connection | Select a registered database instance from the drop-down list to auto-fill the connection details. If you haven't registered the instance, configure the parameters below manually. (In the DMS console, this field is labeled Select a DMS database instance.) |
| Database Type | Select Kafka. |
| Access Method | Select Express Connect, VPN Gateway, or Smart Access Gateway. The Alibaba Cloud Message Queue for Apache Kafka instance is configured as a self-managed Kafka database for synchronization. |
| Instance Region | Select the region of the destination Kafka instance. |
| Connected VPC | Select the VPC ID of the destination Kafka instance. |
| Domain Name or IP | Enter any IP address from the Default Endpoint of the destination Kafka instance. |
| Port Number | Enter the service port. The default is 9092. |
| Database Account | Leave blank for this example. |
| Database Password | Leave blank for this example. |
| Kafka Version | Select the version that matches your Kafka instance. |
| Encryption | Select Non-encrypted or SCRAM-SHA-256 based on your security requirements. |
| Topic | Select the topic that receives synchronized data. |
| Use Kafka Schema Registry | Kafka Schema Registry is a metadata service that provides a RESTful interface for storing and retrieving Avro schemas. Select No to skip, or select Yes and enter the URL or IP address of your Schema Registry in the URL or IP Address of Schema Registry field. |
After completing the configuration, click Test Connectivity and Proceed.
Add the CIDR blocks of DTS servers to the security settings of both databases to allow access. See Add the IP address whitelist of DTS servers.
If the source or destination uses an access method other than Alibaba Cloud Instance, also click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Step 3: Select synchronization objects
On the Configure Objects page, configure the synchronization scope.
Synchronization types and conflict handling
| Parameter | Description |
|---|---|
| Synchronization Types | DTS always enables Incremental Data Synchronization. By default, Schema Synchronization and Full Data Synchronization are also selected. After the precheck, DTS initializes the destination with the full data of selected source objects as the baseline for incremental synchronization. Note: When the destination Kafka instance's Access Method is Alibaba Cloud Instance, Schema Synchronization is not supported. |
| Processing Mode of Conflicting Tables | Precheck and Report Errors: DTS checks for tables with identical names in the destination. If any are found, the precheck fails and the task does not start. Warning: If you cannot delete or rename the conflicting table, use object name mapping instead. See Database Table Column Name Mapping. Ignore Errors and Proceed: DTS skips the conflict check. During full synchronization, conflicting destination records are kept and source records are skipped. During incremental synchronization, source records overwrite destination records. If table schemas differ, initialization may fail. Use with caution. |
Kafka-specific settings
The following table compares the three Kafka data formats to help you choose the right one:
| Format | Region support | Reference |
|---|---|---|
| Canal JSON | China (Qingdao) and China (Beijing) only | Canal JSON description |
| DTS Avro | — | DTS Avro schema, Deserialization sample |
| Shareplex JSON | — | Shareplex JSON |
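A consumer usually needs to turn each message into per-row changes. The sketch below is an illustration for the Canal JSON option only. It assumes the message follows the field layout of the open-source Canal JSON format (type, database, table, isDdl, sql, data, old); the function name and the sample payload are invented, and the exact fields that DTS emits should be confirmed against the Canal JSON description.

```python
import json
from typing import Any, Dict, List


def summarize_canal_message(raw: bytes) -> List[Dict[str, Any]]:
    """Turn one Canal-JSON-style message into a list of per-row change summaries."""
    msg = json.loads(raw)
    if msg.get("isDdl"):
        # DDL events carry the statement text instead of row data (assumed field: sql).
        return [{"op": msg.get("type"), "ddl": msg.get("sql")}]
    rows = msg.get("data") or []
    # "old" is assumed to hold prior values for UPDATE and to be null otherwise.
    olds = msg.get("old") or [None] * len(rows)
    return [
        {
            "op": msg.get("type"),
            "table": f'{msg.get("database")}.{msg.get("table")}',
            "row": row,
            "old_values": old,
        }
        for row, old in zip(rows, olds)
    ]


if __name__ == "__main__":
    # Invented sample payload for illustration.
    sample = json.dumps({
        "database": "sales", "table": "orders", "type": "UPDATE", "isDdl": False,
        "data": [{"id": "1", "status": "paid"}],
        "old": [{"status": "new"}],
    }).encode("utf-8")
    for change in summarize_canal_message(sample):
        print(change)
```

The function takes raw bytes so that it can be called with the value of a Kafka record from whichever client library you use.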
The following table compares the three compression formats:
| Format | Compression ratio | Speed |
|---|---|---|
| LZ4 (default) | Low | Fast |
| GZIP | High | Slow. This format consumes more CPU resources. |
| Snappy | Medium | Medium |
| Parameter | Description |
|---|---|
| Data Format in Kafka | Select the data format for messages written to Kafka. See the comparison table above. |
| Kafka Data Compression Format | Select the compression format for Kafka messages. See the comparison table above. |
| Policy for shipping data to Kafka partitions | Select a partitioning strategy. |
| Message acknowledgement mechanism | Select a message acknowledgment mechanism. |
| Topic That Stores DDL Information | Select a topic to store DDL change events. If not selected, DDL events are stored in the data topic. |
| Capitalization of Object Names in Destination Instance | Configure the case policy for database, table, and column names in the destination. The default is DTS default policy. See Case policy for destination object names. |
Object selection
In the Source Objects box, click the objects to synchronize, then click the arrow icon to move them to the Selected Objects box. The synchronization granularity is table.
In Selected Objects, you can configure topic name, partition count, and partition key for each source table. See Map topics and partitions.
Using the object name mapping feature may cause synchronization of dependent objects to fail.
To select specific SQL operations for incremental synchronization, right-click the object in Selected Objects and choose the operations in the dialog box.
Step 4: Configure advanced settings
Click Next: Advanced Settings and configure the following options.
| Parameter | Description |
|---|---|
| Dedicated Cluster for Task Scheduling | By default, DTS uses a shared cluster. For greater stability, purchase a dedicated cluster. See What is a DTS dedicated cluster? |
| Retry Time for Failed Connections | If the connection to the source or destination fails, DTS retries immediately. The default retry window is 720 minutes. Set a value between 10 and 1,440 minutes; 30 minutes or more is recommended. If the connection is restored within the retry window, the task resumes automatically. Note: If multiple DTS instances share a source or destination, DTS uses the shortest configured retry duration across those instances. DTS charges for runtime during connection retries. |
| Retry Time for Other Issues | For non-connection errors (such as DDL or DML execution failures), DTS retries immediately. The default is 10 minutes. Set a value between 1 and 1,440 minutes; 10 minutes or more is recommended. The value must be less than Retry Time for Failed Connections. |
| Enable Throttling for Full Data Synchronization | Limit the full synchronization rate to reduce load on source and destination databases. Set Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s). Available only when Full Data Synchronization is enabled. You can also adjust the throttling rate while the task is running. |
| Enable Throttling for Incremental Data Synchronization | Limit the incremental synchronization rate by setting RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s). |
| Environment Tag | (Optional) Select an environment label to classify the instance. |
| Configure ETL | Choose whether to enable extract, transform, and load (ETL). Select Yes to enter data processing statements in the code editor. See Configure ETL in a data migration or data synchronization task. Select No to disable ETL. |
| Monitoring and Alerting | Select Yes to receive alerts when the task fails or synchronization latency exceeds the threshold. Set the alert threshold and notification contacts. See Configure monitoring and alerting during task configuration. Select No to skip alerts. |
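The two retry settings in the table above have ranges and an ordering constraint that are easy to check before you submit the task. The following is a minimal sketch with a hypothetical function name; the limits are the ones listed in the table, and the console remains the authority on what it accepts.

```python
from typing import List


def validate_retry_settings(connection_minutes: int, other_minutes: int) -> List[str]:
    """Return a list of problems; an empty list means the values satisfy the table above."""
    problems = []
    if not 10 <= connection_minutes <= 1440:
        problems.append("Retry Time for Failed Connections must be between 10 and 1440 minutes")
    if not 1 <= other_minutes <= 1440:
        problems.append("Retry Time for Other Issues must be between 1 and 1440 minutes")
    if other_minutes >= connection_minutes:
        problems.append(
            "Retry Time for Other Issues must be less than Retry Time for Failed Connections"
        )
    return problems


if __name__ == "__main__":
    print(validate_retry_settings(720, 10))  # defaults from the table: no problems
    print(validate_retry_settings(30, 30))   # equal values violate the ordering rule
```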
Step 5: Run the precheck
Click Next: Save Task Settings and Precheck.
To view the API parameters for this task, hover over the button and click Preview OpenAPI parameters before proceeding.
DTS runs a precheck before the task starts. The task only starts if the precheck passes.
If the precheck fails, click View Details next to the failed item, fix the issue, then rerun the precheck.
If the precheck generates warnings:
For a warning that cannot be ignored, click View Details, fix the issue, then run the precheck again.
For an ignorable warning, click Confirm Alert Details, then Ignore, then OK, then Precheck Again. Ignoring warnings may cause data inconsistency and other issues—proceed with caution.
Step 6: Purchase the instance
When the Success Rate reaches 100%, click Next: Purchase Instance.
On the Purchase page, select the billing method and instance class.
| Parameter | Description |
|---|---|
| Billing Method | Subscription: Pay upfront for a fixed duration. Cost-effective for long-term, continuous tasks. Pay-as-you-go: Billed hourly for actual usage. Suitable for short-term or test tasks—release the instance at any time to stop charges. |
| Resource Group Settings | The resource group for the instance. Defaults to default resource group. See What is Resource Management? |
| Instance Class | Select a class based on your synchronization throughput requirements. See Data synchronization link specifications. |
| Subscription Duration | For subscription billing, select the duration: 1–9 months or 1, 2, 3, or 5 years. |
Accept the Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start, then confirm by clicking OK.
Monitor the task on the data synchronization page.
Map topics and partitions
Use topic and partition mapping to control how source tables are written to destination Kafka topics.
In Selected Objects, hover over the destination topic name at the table level.
Click Edit next to the topic name.
In the Edit Table dialog box, configure the mapping.
At the schema level, the dialog box is Edit Schema, which supports fewer parameters. At the table level, it is Edit Table.
If the synchronization granularity is not the entire schema, Name of target Topic and Number of Partitions cannot be modified in Edit Schema.
| Parameter | Description |
|---|---|
| Name of target Topic | The destination topic for the source table. Defaults to the topic selected in Destination Database. For Alibaba Cloud Message Queue for Apache Kafka, the topic must already exist—otherwise synchronization fails. For self-managed Kafka with schema and table tasks, DTS attempts to create the topic. If you change this value, data is written to the new topic. |
| Filter Conditions | See Set filter conditions. |
| Number of Partitions | The number of partitions for the destination topic. |
| Partition Key | Applies when Policy for Shipping Data to Kafka Partitions is set to Ship Data to Separate Partitions Based on Hash Values of Primary Keys. Select one or more columns as the partition key for hash calculation. DTS routes rows to partitions based on the hash value. Without this setting, the partition strategy has no effect during incremental writes. Partition Key can only be configured in Edit Table. |
Click OK.
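The effect of a partition key can be pictured as hashing the key columns and taking the result modulo the partition count. The sketch below illustrates that idea only. The source does not state which hash function DTS uses, so CRC32 is a stand-in assumption, and the function name, column names, and rows are invented.

```python
import zlib
from typing import Mapping, Sequence


def partition_for(row: Mapping[str, object], key_columns: Sequence[str], num_partitions: int) -> int:
    """Map a row to a partition from a stable hash of its key columns.

    CRC32 is used only because it is deterministic across runs; it is not
    claimed to be the hash that DTS applies.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if not key_columns:
        raise ValueError("at least one key column is required")
    # Join the key values with a separator that is unlikely to appear in data.
    key = "\x1f".join(str(row[col]) for col in key_columns)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    before = {"id": 42, "status": "new"}   # invented rows
    after = {"id": 42, "status": "paid"}
    # Both versions of the row share the key, so they map to the same partition.
    print(partition_for(before, ["id"], 6) == partition_for(after, ["id"], 6))  # True
```

Because every change to the same key lands in the same partition, a consumer reading that partition sees those changes in the order they were written. Changes to different keys may be spread across partitions and carry no relative ordering guarantee.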
FAQ
Can I change the Kafka Data Compression Format after the task starts?
Yes. Use the Modify Sync Objects feature.
Can I change the Message acknowledgement mechanism after the task starts?
Yes. Use the Modify Sync Objects feature.