Data Transmission Service (DTS) supports synchronizing data from an AnalyticDB for PostgreSQL instance to Alibaba Cloud Message Queue for Kafka. Use this feature to stream row-level changes in real time to downstream consumers such as analytics pipelines and event-driven architectures.
This feature is in invitational preview. To use this feature, submit a ticket.
Prerequisites
Before you begin, make sure that:
- A topic has been created in the destination Kafka instance to receive the synchronized data. For more information, see Step 1: Create a topic.
- The storage space of the destination Kafka instance is larger than the storage space used by the source AnalyticDB for PostgreSQL instance.
Limitations
Source database
Before synchronization:
| Limitation | Details |
|---|---|
| Bandwidth | The outbound bandwidth of the source database server must be at least 100 Mb/s. |
| Kernel version | The source AnalyticDB for PostgreSQL instance must be version 7.2.1.4 or later. |
| Logical replication | Set the wal_level parameter to logical to enable logical replication. |
| High-availability configuration | If the source instance uses High-availability Edition, also set the hot_standby, hot_standby_feedback, and sync_replication_slots parameters to on. This prevents primary/standby switchovers from interrupting the synchronization task. |
| Account permissions | The synchronization account must have read permissions on the objects to be synchronized and the REPLICATION permission. Run the following command to grant the required permissions: ALTER USER your_user WITH REPLICATION; |
| Long-running transactions | During incremental synchronization, uncommitted long-running transactions in the source database can cause write-ahead log (WAL) files to accumulate and eventually exhaust the disk space. |
| Database naming | The database name cannot contain a hyphen (-). For example, dts-testdata is not supported. |
| Primary key or UNIQUE constraint | Tables must have a primary key or a UNIQUE constraint with unique values. Otherwise, duplicate data may appear in the destination. |
| Partitioned tables | Partition schema cannot be synchronized. After synchronization, all partitioned tables are created as non-partitioned tables in the destination. |
| Databases per task | A synchronization task can handle only one database. To synchronize multiple databases, create a separate task for each database. |
| Task scale | If you need to edit more than 5,000 tables in a single task (for example, for column name mapping), create multiple tasks or configure synchronization for the entire database instead. |
| Unsupported object types | Tables that inherit across schemas, temporary tables, internal system triggers, C language functions, internal functions for PROCEDURE and FUNCTION, and plugins (EXTENSION). |
| Supported object types | Primary keys, UNIQUE constraints, CHECK constraints, and custom data types (COMPOSITE, ENUM, and RANGE). |
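The naming and key requirements above can be validated before you configure a task. The following is a minimal sketch; the metadata shape and helper name are hypothetical illustrations, not a DTS or PostgreSQL API:

```python
def check_source_prerequisites(database_name, tables):
    """Return a list of problems that would block this DTS sync task.

    `tables` maps table name -> metadata dict with a "has_pk_or_unique"
    flag; this shape is illustrative only.
    """
    problems = []
    # Database names containing a hyphen (e.g. dts-testdata) are not supported.
    if "-" in database_name:
        problems.append(f"database name '{database_name}' contains a hyphen")
    # Without a primary key or UNIQUE constraint, duplicates may appear downstream.
    for name, meta in tables.items():
        if not meta.get("has_pk_or_unique"):
            problems.append(f"table '{name}' has no primary key or UNIQUE constraint")
    return problems
```

Running such a check before creating the task avoids a failed precheck or silent duplicates in the destination topic.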
During synchronization:
| Limitation | Details |
|---|---|
| DDL operations | Do not perform DDL operations during schema synchronization or full data synchronization. Otherwise, the task fails. During full data synchronization, DTS queries the source database and creates metadata locks, which may block DDL operations on the source database. |
| DDL synchronization | DDL synchronization is not supported. New tables created in the source after the DTS task starts, along with their subsequent data changes, are not synchronized to the destination. If needed, create a new synchronization task. |
| Instance configuration changes | Do not modify the endpoint or zone of the AnalyticDB for PostgreSQL instance while the task is running. Otherwise, the task fails. |
Destination database
Before synchronization:
| Limitation | Details |
|---|---|
| Resource load | Run synchronization tasks during off-peak hours, for example, when the CPU load is below 30%, as full data synchronization consumes read and write resources on both the source and destination databases. |
| Message size | Kafka limits each message to 10 MB. If a single row of source data exceeds this limit after conversion, the task is interrupted. When configuring the task, filter out columns that contain large objects. If the task is already running, use Modify Synchronization Objects to remove the table, re-add it, and filter out the large object columns. |
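The 10 MB cap above can be estimated up front by serializing candidate rows. The sketch below uses JSON as a stand-in for the actual wire format (which depends on the Data Format setting), so treat the results as an approximation:

```python
import json

KAFKA_MAX_MESSAGE_BYTES = 10 * 1024 * 1024  # 10 MB per-message limit

def row_fits(row, limit=KAFKA_MAX_MESSAGE_BYTES):
    """True if the whole row, serialized as JSON, stays under the limit."""
    return len(json.dumps(row, default=str).encode("utf-8")) <= limit

def oversized_columns(row, limit=KAFKA_MAX_MESSAGE_BYTES):
    """Column names whose serialized value alone exceeds the limit.

    These are the candidates to filter out when configuring the task.
    """
    return [
        col for col, value in row.items()
        if len(json.dumps(value, default=str).encode("utf-8")) > limit
    ]
```

Columns flagged by `oversized_columns` (typically large objects) are the ones to exclude when you configure or modify the synchronization objects.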
During synchronization:
| Limitation | Details |
|---|---|
| Kafka cluster scaling | If the destination Kafka cluster is scaled out or in (for example, broker nodes are added or removed) while the task is running, restart the DTS synchronization task for the changes to take effect. |
| External writes | Do not write data from sources other than the DTS task to the destination Kafka cluster during synchronization. External writes can cause data inconsistency or task failure. |
| Storage growth | During full data synchronization, DTS performs concurrent writes, which can cause data fragmentation. As a result, storage usage of the destination may exceed that of the source after synchronization completes. |
| Task restart | Restarting a task that includes both full and incremental synchronization may re-run the full synchronization phase. |
| Instance failure recovery | If the DTS instance fails, the DTS helpdesk will attempt recovery within 8 hours. During recovery, the instance may be restarted or task parameters may be adjusted. Your database parameters will not be modified. |
| After switching to destination | Stop or release synchronization tasks that are no longer needed after switching your business to the destination database, to prevent automatic recovery from overwriting data. |
Supported SQL operations: INSERT, UPDATE, and DELETE.
Supported objects for synchronization:
- Basic objects: SCHEMA and TABLE (includes PRIMARY KEY, UNIQUE KEY, DATA TYPE (built-in data types), and DEFAULT CONSTRAINT)
- Other objects: VIEW, INDEX, PROCEDURE, FUNCTION, RULE, SEQUENCE, AGGREGATE, OPERATOR, and DOMAIN
Billing
| Synchronization type | Fee |
|---|---|
| Schema synchronization and full data synchronization | Free |
| Incremental data synchronization | Charged. For more information, see Billing overview. |
Create a synchronization task
Step 1: Go to the Data Synchronization page
Use one of the following methods:
DTS console
- Log on to the DTS console.
- In the left-side navigation pane, click Data Synchronization.
- In the upper-left corner, select the region where the synchronization task resides.
DMS console
The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
- Log on to the DMS console.
- In the top navigation bar, move the pointer over Data + AI and choose Data Transmission Service (DTS) > Data Synchronization.
- From the drop-down list to the right of Data Synchronization Tasks, select the region where the synchronization instance resides.
Step 2: Configure source and destination databases
- Click Create Task.
- Configure the source database parameters.

| Parameter | Description |
|---|---|
| Task Name | The name of the DTS task. DTS generates a name automatically. Specify a descriptive name to make the task easy to identify. The name does not need to be unique. |
| Select Existing Connection | If the source instance is already registered with DTS, select it from the drop-down list. DTS automatically fills in the connection parameters. Otherwise, configure the parameters manually. Note: In the DMS console, select the instance from the Select a DMS database instance drop-down list. |
| Database Type | Select AnalyticDB for PostgreSQL. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Select the region where the source instance resides. |
| Replicate Data Across Alibaba Cloud Accounts | Select No for synchronization within the same Alibaba Cloud account. |
| Instance ID | Select the instance ID of the source AnalyticDB for PostgreSQL instance. |
| Database Name | Enter the name of the database that contains the data to be synchronized. |
| Database Account | Enter the database account. The account must have read permissions on the objects to be synchronized. |
| Database Password | Enter the password for the database account. |

- Configure the destination database parameters.

| Parameter | Description |
|---|---|
| Select Existing Connection | If the destination instance is already registered with DTS, select it from the drop-down list. DTS automatically fills in the connection parameters. Otherwise, configure the parameters manually. Note: In the DMS console, select the instance from the Select a DMS database instance drop-down list. |
| Database Type | Select Kafka. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Select the region where the destination Kafka instance resides. |
| Kafka Instance ID | Select the ID of the destination Kafka instance. |
| Encryption | Select Non-encrypted or SCRAM-SHA-256 based on your security requirements. |
| Topic | Select the topic to receive the synchronized data. |
| Use Kafka Schema Registry | Kafka Schema Registry is a metadata service that provides a RESTful interface for storing and retrieving Avro schemas. Select No to skip, or select Yes and enter the URL or IP address of your Schema Registry. |

- Click Test Connectivity and Proceed.

Note:
- Add the CIDR blocks of DTS servers to the security settings of both the source and destination databases. For more information, see Add DTS server IP addresses to a whitelist.
- If the source or destination database is a self-managed database and its Access Method is not set to Alibaba Cloud Instance, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Step 3: Configure synchronization objects
- In the Configure Objects step, set the following parameters.

| Parameter | Description |
|---|---|
| Synchronization Types | By default, Incremental Data Synchronization is selected. Also select Schema Synchronization and Full Data Synchronization. After the precheck, DTS synchronizes historical data from the source to the destination, which serves as the basis for incremental data synchronization. Note: Schema Synchronization is not supported when the destination Kafka instance's Access Method is Alibaba Cloud Instance. |
| Processing Mode of Conflicting Tables | Precheck and Report Errors (default): checks whether the destination contains tables with the same names as those in the source. If identical names exist, the precheck fails and the task cannot start. Note: Use the object name mapping feature to rename tables in the destination if conflicts exist. For more information, see Map object names. Ignore Errors and Proceed: skips the name conflict check. Warning: If you select this option, data inconsistency may occur. During full data synchronization, existing records in the destination are retained when a primary key or unique key conflict is detected. During incremental data synchronization, existing records are overwritten. If the source and destination have different schemas, some columns may not be synchronized, or the task may fail. |
| Data Format in Kafka | Select the data format for synchronized messages. DTS Avro: parse data according to the DTS Avro schema definition. See also the DTS Avro deserialization example. Canal JSON: see Canal JSON for parameter descriptions and examples. |
| Kafka Data Compression Format | Select the compression format for Kafka messages. LZ4 (default): lower compression ratio, higher speed. GZIP: higher compression ratio, lower speed. Note: GZIP consumes more CPU resources. Snappy: medium compression ratio and speed. |
| Policy for Shipping Data to Kafka Partitions | Select a partition policy based on your requirements. |
| Message acknowledgement mechanism | Select a message acknowledgment mechanism based on your requirements. |
| Topic That Stores DDL Information | Select the topic to store DDL information. If no topic is selected, DDL information is stored in the data topic by default. |
| Capitalization of Object Names in Destination Instance | The capitalization of database names, table names, and column names in the destination. The default is DTS default policy. For more information, see Specify the capitalization of object names in the destination instance. |
| Source Objects | Select objects from Source Objects and click the right arrow icon to move them to Selected Objects. Objects can be selected at the table level. |
| Selected Objects | No additional configuration is required for a basic setup. To map source tables to specific Kafka topics, partitions, or columns, follow the steps below. |

Note:
- To select the SQL operations to synchronize at the database or table level, right-click the object in Selected Objects and select the desired operations.
- If you use object name mapping, other objects that depend on the mapped object may fail to synchronize.

Configure object mapping in the destination Kafka instance (optional)

- In the Selected Objects section, hover over the destination topic name at the table level.
- Click Edit next to the topic name.
- In the Edit Table dialog box, configure the mapping.

Note: At the database level, the Edit Schema dialog box appears with fewer parameters. At the table level, the Edit Table dialog box appears. If you synchronize at a granularity other than the entire database, Destination Database Name (Name of target Topic) and Number of Partitions cannot be modified in the Edit Schema dialog box.

| Parameter | Description |
|---|---|
| Name of target Topic | The destination topic to which the source table is synchronized. Defaults to the Topic selected in the destination database configuration. Important: If the destination is an ApsaraMQ for Kafka instance, the topic must already exist. If it does not exist, the task fails. If the destination is a self-managed Kafka cluster and schema synchronization is included, DTS attempts to create the topic. Changing the Name of target Topic directs writes to the new topic. |
| Filter Conditions | For more information, see Set filter conditions. |
| Number of Partitions | The number of partitions for the destination topic. |
| Column | Edit the column names written to the destination topic. |
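If you select Canal JSON as the data format, downstream consumers parse each message and branch on the operation type. The following is a minimal sketch assuming the public Canal JSON field names (type, data, old); verify the exact payload DTS emits against the Canal JSON reference before relying on this shape:

```python
import json

def handle_canal_event(message_value):
    """Parse one Canal JSON change event and return (op, rows).

    Field names follow the public Canal JSON format; the exact
    payload DTS emits may differ, so treat this as a sketch.
    """
    event = json.loads(message_value)
    op = event["type"]            # "INSERT", "UPDATE", or "DELETE"
    rows = event.get("data") or []
    if op == "UPDATE":
        # "old" holds pre-image values for changed columns, aligned with "data".
        before = event.get("old") or []
        return op, list(zip(before, rows))
    return op, rows
```

A consumer loop would call this on each record's value and route the result to the appropriate insert, update, or delete handler in the downstream system.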
- Click Next: Advanced Settings and configure the advanced parameters.
| Parameter | Description |
|---|---|
| Dedicated Cluster for Task Scheduling | By default, DTS schedules the task to the shared cluster. To improve stability, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster. |
| Enable Encryption For Data Transmission | Enabling encrypted transmission may affect synchronization performance. Select based on your security requirements. Default: No. |
| Retry Time for Failed Connections | The retry window for failed connections. Valid values: 10 to 1440 minutes. Default: 720 minutes. Set this to more than 30 minutes. If DTS reconnects within this window, the task resumes. Otherwise, the task fails. Note: If multiple tasks share the same source or destination database, the shortest retry window takes precedence. DTS instance fees continue to accrue during retries. Set this value based on your business requirements, and release the instance promptly when the source or destination is released. |
| Retry Time for Other Issues | The retry window for DDL or DML operation failures. Valid values: 1 to 1440 minutes. Default: 10 minutes. Set this to more than 10 minutes. Important: This value must be smaller than Retry Time for Failed Connections. |
| Enable Throttling for Full Data Synchronization | Limits the read rate during full data synchronization to reduce load on the source and destination. Configure Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s) as needed. This parameter is available only when Full Data Synchronization is selected. |
| Enable Throttling for Incremental Data Synchronization | Limits the write rate during incremental data synchronization. Configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) as needed. |
| Environment Tag | (Optional) Tag to identify the environment of the instance. |
| Scene Label | (Optional) Tag to identify the usage scenario of the instance. Scenario tags do not affect task operation. |
| Configure ETL | Specifies whether to enable the extract, transform, and load (ETL) feature. Select Yes to enter data processing statements in the code editor. For more information, see Configure ETL in a data migration or data synchronization task. Select No to skip ETL configuration. For more information about ETL, see What is ETL? |
| Monitoring and Alerting | Configures alerts for task failures or synchronization latency exceeding the threshold. Select Yes and configure the alert threshold and notification settings. For more information, see Configure monitoring and alerting when you create a DTS task. |
Step 4: Run the precheck
- To preview the API parameters for this task configuration, hover over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.
- Click Next: Save Task Settings and Precheck.

Note:
- DTS performs a precheck before the task can start. The task starts only after it passes the precheck.
- If the precheck fails, click View Details next to each failed item, address the issues, and rerun the precheck.
- If an alert is triggered during the precheck:
  - For alerts that cannot be ignored: click View Details, fix the issue, and rerun the precheck.
  - For alerts that can be ignored: click Confirm Alert Details, click Ignore in the details panel, confirm, and then click Precheck Again. Ignoring alerts may result in data inconsistency.
Step 5: Purchase the instance
- Wait until the Success Rate reaches 100%, then click Next: Purchase Instance.
- On the buy page, configure the billing and instance parameters.

| Parameter | Description |
|---|---|
| Billing Method | Subscription: pay upfront for a set period. More cost-effective for long-term use. Pay-as-you-go: billed hourly. Suitable for short-term use. Release the instance when it is no longer needed to stop charges. |
| Resource Group Settings | The resource group for the instance. Default: default resource group. For more information, see What is Resource Management? |
| Instance Class | DTS provides instance classes with varying synchronization speeds. Select a class based on your requirements. For more information, see Instance classes of data synchronization instances. |
| Subscription Duration | Available only for the Subscription billing method. Options: 1 to 9 months, 1 year, 2 years, 3 years, or 5 years. |

- Read and select Data Transmission Service (Pay-as-you-go) Service Terms.
- Click Buy and Start, then click OK in the dialog box.
The task appears in the task list. Monitor its progress there.
FAQ
Can I modify the Kafka Data Compression Format after the task starts?
Yes. Use the Modify Synchronization Objects feature to change the format.
Can I modify the message acknowledgement mechanism after the task starts?
Yes. Use the Modify Synchronization Objects feature to change the mechanism.