Use Data Transmission Service (DTS) to stream data changes from a PolarDB for MySQL cluster into a Message Queue for Apache Kafka topic in real time. DTS captures the initial full dataset and all subsequent incremental changes, keeping your Kafka consumers current with the source data.
Prerequisites
Before you begin, ensure that you have:
- A PolarDB for MySQL cluster. See Purchase a pay-as-you-go cluster or Purchase a subscription cluster.
- A Message Queue for Apache Kafka instance with a topic created to receive synchronized data. See Step 1: Create a topic.
- Read permissions on the objects to be synchronized, granted to the database account used by DTS.
Limitations
Source database requirements
- Tables must have a PRIMARY KEY or UNIQUE constraint with all fields unique. Without this, the destination may contain duplicate records.
- If you rename tables or columns in the destination database, a single task can synchronize up to 1,000 tables. For more tables, configure multiple tasks or synchronize the entire database instead.
- If you need to synchronize incremental data, binary logging must be enabled and the loose_polar_log_bin parameter must be set to on. See Enable binary logging and Modify parameters. Note that enabling binary logging on a PolarDB for MySQL cluster incurs storage charges for the binary log files.
- Read-only nodes of the source cluster cannot be included in a synchronization task.
- DTS periodically executes CREATE DATABASE IF NOT EXISTS `test` on the source database to advance the binary log position. This is expected behavior.
Binary log retention
Insufficient binary log retention causes DTS to lose its position, which can interrupt the task and cause data inconsistency.
| Task type | Minimum retention period |
|---|---|
| Incremental data synchronization only | 24 hours |
| Full + incremental data synchronization | 7 days |
After full data synchronization is complete, you can shorten the retention period, but keep it longer than 24 hours. If the retention period does not meet the preceding requirements, the DTS SLA does not guarantee service reliability or performance.
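The retention requirements in the table above can be expressed as a simple check. The following sketch is illustrative only; the task-type keys are names chosen for this example, not DTS API values.

```python
# Minimum binlog retention DTS requires, per task type (from the table above).
MIN_RETENTION_HOURS = {
    "incremental_only": 24,        # incremental data synchronization only
    "full_plus_incremental": 7 * 24,  # full + incremental synchronization
}

def retention_is_sufficient(task_type: str, retention_hours: int) -> bool:
    """Return True if the configured binlog retention meets the DTS minimum."""
    return retention_hours >= MIN_RETENTION_HOURS[task_type]

print(retention_is_sufficient("incremental_only", 24))        # True
print(retention_is_sufficient("full_plus_incremental", 72))   # False: needs 168 hours
```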
Single-record size limit
Kafka rejects records larger than 10 MB. If a source row exceeds this limit, DTS cannot write it and the task is interrupted.
To handle tables with large fields, use either of the following approaches:
- Exclude those tables from the task objects entirely.
- Include the tables but add filter conditions to exclude the large-field columns.
If you have already added a table with large fields, remove it from the selected objects, add it again, and then set the filter conditions.
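To gauge which tables are at risk, you can estimate a row's serialized size before relying on DTS. The sketch below is illustrative: it uses plain JSON serialization as a stand-in, while the actual DTS Avro or Canal JSON envelope adds its own overhead, so treat the result as a lower bound.

```python
import json

# DTS cannot write a Kafka record larger than 10 MB; rows near this limit
# should be excluded from the task or have their large columns filtered out.
MAX_RECORD_BYTES = 10 * 1024 * 1024

def estimated_record_size(row: dict) -> int:
    """Rough serialized size of a row in bytes (JSON stand-in for the
    real DTS envelope, which adds further overhead)."""
    return len(json.dumps(row, ensure_ascii=False).encode("utf-8"))

def exceeds_kafka_limit(row: dict) -> bool:
    return estimated_record_size(row) > MAX_RECORD_BYTES

# A row with an 11 MB text column would interrupt the task; the same row
# with the large column filtered out is fine.
big_row = {"id": 1, "payload": "x" * (11 * 1024 * 1024)}
small_row = {"id": 1}
print(exceeds_kafka_limit(big_row))    # True
print(exceeds_kafka_limit(small_row))  # False
```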
DDL operations during synchronization
Do not run pt-online-schema-change on source tables while a task is running — this can cause synchronization to fail.
We recommend that you use only DTS to write data to the destination database. This prevents data inconsistency between the source and destination databases. If DTS is the only writer to the destination database, you can use Data Management (DMS) to perform lock-free DDL operations on the source tables. If you use tools other than DTS to write data to the destination database, data loss may occur in the destination database when you use DMS to perform online DDL operations.
Supported synchronization topologies
- One-to-one
- One-to-many
- Many-to-one
- Cascade
For details, see Synchronization topologies.
SQL operations that can be synchronized
| Type | Statements |
|---|---|
| DML | INSERT, UPDATE, DELETE |
| DDL | CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, TRUNCATE TABLE |
| DDL | CREATE VIEW, ALTER VIEW, DROP VIEW |
| DDL | CREATE PROCEDURE, ALTER PROCEDURE, DROP PROCEDURE |
| DDL | CREATE FUNCTION, DROP FUNCTION, CREATE TRIGGER, DROP TRIGGER |
| DDL | CREATE INDEX, DROP INDEX |
Create a synchronization task
Step 1: Open the Data Synchronization Tasks page
- Log on to the DMS console.
- In the top navigation bar, click DTS.
- In the left-side navigation pane, choose DTS (DTS) > Data Synchronization.
You can also go directly to the Data Synchronization Tasks page in the new DTS console. Console navigation may vary based on your layout mode. See Simple mode for details.
Step 2: Select a region
From the drop-down list next to Data Synchronization Tasks, select the region where the synchronization instance will reside.
In the new DTS console, select the region in the top navigation bar instead.
Step 3: Configure source and destination databases
Click Create Task. Review the limits displayed at the top of the page before proceeding, then configure the source and destination connections.
Source Database
| Parameter | Description |
|---|---|
| Task Name | Auto-generated. Specify a descriptive name to identify the task — uniqueness is not required. |
| Select Instance | Select an existing instance to reuse its settings, or configure a new connection. |
| Database Type | Select PolarDB for MySQL. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | The region of the source PolarDB for MySQL cluster. |
| Replicate Data Across Alibaba Cloud Accounts | Select No for same-account synchronization. |
| PolarDB Cluster ID | The ID of the source cluster. |
| Database Account | The account with read permissions on the objects to be synchronized. |
| Database Password | The password for the database account. |
| Encryption | Select Non-encrypted or SSL-encrypted based on your security requirements. To use SSL, enable SSL encryption on the cluster first. |
Destination Database
| Parameter | Description |
|---|---|
| Select Instance | Select an existing instance to reuse its settings, or configure a new connection. |
| Database Type | Select Kafka. |
| Access Method | Select Express Connect, VPN Gateway, or Smart Access Gateway. |
| Instance Region | The region of the destination Kafka instance. |
| Connected VPC | The virtual private cloud (VPC) ID of the Kafka instance. To find it: go to the Message Queue for Apache Kafka console, open the instance details page, and look in the Configuration Information section. |
| IP Address | An IP address from the Default Endpoint of the Kafka instance. To find it: on the instance details page, look in the Endpoint Information section. |
| Port Number | The service port of the Kafka instance. Default: 9092. |
| Database Account | The Kafka account. Leave blank if the instance type is VPC Instance. |
| Database Password | The Kafka account password. Leave blank if the instance type is VPC Instance. |
| Kafka Version | The version of the destination Kafka instance. |
| Encryption | Select Non-encrypted or SCRAM-SHA-256. |
| Topic | The topic that receives the synchronized data. Select from the drop-down list. |
| Topic That Stores DDL Information | The topic for storing DDL event messages. If left blank, DDL information is stored in the same topic as the data. |
| Use Kafka Schema Registry | Select Yes to use Kafka Schema Registry for Avro schema storage. If selected, enter the Schema Registry URL or IP address. Select No to skip. |
Message Queue for Apache Kafka cannot be selected as an instance type directly. Connect to it as a self-managed Kafka endpoint via Express Connect, VPN Gateway, or Smart Access Gateway.
Step 4: Test connectivity
If the destination uses a whitelist, add the CIDR blocks of DTS servers to it first. See Add DTS server CIDR blocks to on-premises database security settings.
Click Test Connectivity and Proceed.
Step 5: Select objects and configure settings
| Parameter | Description |
|---|---|
| Task Stages | Incremental Data Synchronization is selected by default. Also select Schema Synchronization and Full Data Synchronization to capture the initial dataset before streaming changes. |
| Processing Mode of Conflicting Tables | Precheck and Report Errors: fails the precheck if the destination has tables with the same name as the source — use this to avoid unexpected overwrites. Ignore Errors and Proceed: skips the conflict check. If the source and destination databases have the same schemas and a data record has the same primary key value as an existing record in the destination: during full data synchronization, DTS does not synchronize the record and the existing destination record is retained; during incremental data synchronization, DTS synchronizes the record and the existing destination record is overwritten. If schemas differ, data may fail to be initialized — only some columns are synchronized or the task fails. |
| Data Format in Kafka | DTS Avro: data is serialized using the DTS Avro schema. See the schema definition on GitHub. Canal JSON: data is stored in Canal JSON format. See the Canal JSON format reference. |
| Policy for shipping data to Kafka partitions | Select a partition routing policy based on your ordering and throughput requirements. See Specify the policy for migrating data to Kafka partitions. |
| Capitalization of object names in destination instance | Controls the case of database, table, and column names in Kafka messages. Defaults to the DTS default policy. See Specify the capitalization of object names. |
| Source Objects | Select tables from the Source Objects panel and click the arrow icon to move them to Selected Objects. Only tables can be selected. |
| Selected Objects | Right-click an object to rename it or filter specific SQL operations. Click Batch Edit to rename multiple objects at once. To filter rows, right-click an object and specify WHERE conditions. |
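If you select the Canal JSON format, each Kafka message carries the change type, source database and table, and the row data. The following sketch parses one such message. The sample payload is hypothetical, but the field names (`type`, `database`, `table`, `data`, `old`, `isDdl`, `pkNames`) follow the Canal JSON convention; a real consumer would read the message bytes from the Kafka topic instead of a local variable.

```python
import json

# Hypothetical Canal JSON message, as DTS might deliver it for an UPDATE.
# "data" holds the rows after the change; "old" holds the previous values
# of only the changed columns; "isDdl" distinguishes DDL events.
raw_message = json.dumps({
    "database": "orders_db",
    "table": "orders",
    "type": "UPDATE",
    "isDdl": False,
    "pkNames": ["id"],
    "data": [{"id": "42", "status": "shipped"}],
    "old": [{"status": "pending"}],
})

def summarize_change(message: str) -> str:
    """Produce a one-line summary of a Canal JSON change event."""
    event = json.loads(message)
    if event.get("isDdl"):
        return f"DDL on {event['database']}: {event.get('sql', '')}"
    rows = event.get("data") or []
    return f"{event['type']} {event['database']}.{event['table']}: {len(rows)} row(s)"

print(summarize_change(raw_message))  # UPDATE orders_db.orders: 1 row(s)
```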
Step 6: Configure advanced settings
Click Next: Advanced Settings.
| Parameter | Description |
|---|---|
| Set Alerts | Select Yes to receive notifications when the task fails or synchronization latency exceeds the threshold. Specify the threshold and alert contacts. |
| Retry Time for Failed Connection | How long DTS retries failed connections before marking the task as failed. Range: 10–1,440 minutes. Default: 720 minutes. Set this to at least 30 minutes. If multiple tasks share the same source or destination, the shortest retry time across those tasks applies. |
| Configure ETL | Select Yes to transform data during synchronization. Enter processing statements in the code editor. See Configure ETL. |
| Whether to delete SQL operations on heartbeat tables of forward and reverse tasks | Yes: DTS does not write heartbeat table operations to the source. Synchronization latency may be displayed. No: DTS writes heartbeat table operations to the source. This may affect physical backup and cloning of the source database. |
Step 7: Run the precheck
Click Next: Save Task Settings and Precheck.
DTS runs a precheck before the task can start. For each failed item:
- Click View Details to see the cause.
- Fix the issue and click Precheck Again.
For alert items (non-blocking):
- Fix the issue if possible.
- To skip: click Confirm Alert Details next to the item, click Ignore, click OK, then click Precheck Again.
Ignoring alert items may cause data inconsistency and expose your business to risk.
Step 8: Purchase an instance
Wait until the precheck success rate reaches 100%, then click Next: Purchase Instance.
Select an Instance Class based on your required synchronization throughput. See Specifications of data synchronization instances.
Step 9: Start the task
- Read and select the checkbox for Data Transmission Service (Pay-as-you-go) Service Terms.
- Click Buy and Start.
The task appears in the task list. Monitor its progress and synchronization latency from there.
Usage notes
- Before you synchronize data, evaluate the impact on the performance of the source and destination databases. Run synchronization during off-peak hours to reduce this impact. During initial full data synchronization, DTS uses the read and write resources of the source and destination databases, which may increase the load on the database servers.
- During full data synchronization, DTS performs concurrent INSERT operations that can cause table fragmentation in the destination. The destination tablespace may be larger than the source after full synchronization completes.
- We recommend that you use only DTS to write data to the destination database. If tools other than DTS write to the destination Kafka topic at the same time, lock-free DDL operations via DMS may cause data loss in the destination.
What's next
- Map object names: rename tables or columns as they are written to the destination.
- Use SQL conditions to filter data: synchronize a subset of rows using WHERE conditions.
- Configure monitoring and alerting: set up notifications for task failures and latency spikes.
- Data formats of a Kafka cluster: understand the full message schema written to Kafka topics.