DataWorks provides PolarDB Reader and PolarDB Writer for bidirectional data synchronization with PolarDB data sources. Both the codeless UI and the code editor are supported for configuring synchronization tasks.
Before you begin
To connect a PolarDB data source and run synchronization tasks, complete the following steps in order:
Add the CIDR block of your exclusive resource group's virtual private cloud (VPC) to the PolarDB for MySQL cluster's IP address whitelist.
Create a database account with the required permissions.
(Real-time synchronization only) Enable binary logging on the cluster.
Add the PolarDB data source in DataWorks.
Limitations
Batch synchronization
Batch synchronization supports reading data from database views.
Real-time synchronization
Only PolarDB for MySQL clusters are supported as the source.
Binary logging must be enabled on the cluster. By default, PolarDB for MySQL uses more efficient physical logs instead of binary logs. Enabling binary logging allows the cluster to integrate with the MySQL ecosystem and supports real-time change data capture (CDC).
Choose a synchronization mode
DataWorks supports three synchronization modes for PolarDB. Use the following table to choose the right mode for your scenario.
Mode | Use when | Configuration guide |
Batch synchronization (single table) | You need periodic bulk data loads from a single table | |
Real-time synchronization (single table or full database) | You need low-latency CDC from a single table or an entire database | |
Full-database synchronization (batch + real-time) | You need a full initial batch load combined with real-time incremental synchronization | |
Data type mappings
Batch read (PolarDB Reader)
Category | PolarDB data type | Notes |
Integer | INT, TINYINT, SMALLINT, MEDIUMINT, BIGINT | TINYINT(1) is treated as integer, not boolean |
Floating point | FLOAT, DOUBLE, DECIMAL | |
String | VARCHAR, CHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | |
Date and time | DATE, DATETIME, TIMESTAMP, TIME, YEAR | |
Boolean | BIT, BOOL | |
Binary | TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB, VARBINARY | |
Data types not listed above are not supported.
Batch write (PolarDB Writer)
Category | PolarDB data type | Notes |
Integer | INT, TINYINT, SMALLINT, MEDIUMINT, BIGINT, YEAR | YEAR is classified as Integer for writes (not Date and time) |
Floating point | FLOAT, DOUBLE, DECIMAL | |
String | VARCHAR, CHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | |
Date and time | DATE, DATETIME, TIMESTAMP, TIME | YEAR is not in this category for writes |
Boolean | BOOL | BIT is not supported for writes |
Binary | TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB, VARBINARY | |
Set up the PolarDB environment
Configure an IP address whitelist
Add the CIDR block of the VPC where your exclusive resource group for Data Integration resides to the IP address whitelist of the PolarDB for MySQL cluster.
Create an account with the required permissions
Create a database account for the PolarDB for MySQL cluster. See Create and manage a database account.
Grant the required permissions to the account. Run the following SQL statements, or grant the SUPER privilege directly:

CREATE USER 'Account for data synchronization'@'%' IDENTIFIED BY 'Password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Account for data synchronization'@'%';

Replace 'Account for data synchronization' and 'Password' with the account name and password that you use.
Enable binary logging (real-time synchronization only)
Enable binary logging on the cluster before configuring a real-time synchronization task. See Enable binary logging.
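After the setting takes effect, you can verify the state with a standard MySQL check (this is a generic MySQL verification step, not specific to DataWorks):

```sql
-- Expect Value = ON once binary logging is enabled on the cluster
SHOW VARIABLES LIKE 'log_bin';
```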
Add a data source
Add the PolarDB data source to DataWorks before developing a synchronization task. For the full procedure and parameter descriptions, see Data source management.
Code editor reference
The following sections describe the JSON parameters used when configuring batch synchronization tasks in the code editor. For the general script format, see Configure a task in the code editor.
PolarDB Reader
The following example reads from a single table with data sharding and bandwidth throttling enabled.
{
"type": "job",
"steps": [
{
"parameter": {
"datasource": "test_005",
"column": [
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"where": "id=1001",
"splitPk": "id",
"table": "PolarDB_person",
"useReadonly": "false"
},
"name": "Reader",
"category": "reader"
},
{
"parameter": {}
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 6,
"throttle": true,
"mbps": "12"
}
}
}
PolarDB Reader parameters
Parameter | Required | Default | Description |
datasource | Yes | — | The name of the data source. Must match the name added in DataWorks. |
table | Yes | — | The name of the table to read from. |
column | Yes | — | A JSON array of column names to read. Use ["*"] to read all columns. |
useReadonly | No | false | Set to true to read data from the read-only nodes of the cluster. |
splitPk | No | — | The field used for data sharding. Integer types only; use the primary key for even distribution. If left blank, data is read with a single thread. |
splitFactor | No | 5 | The sharding factor. Number of shards = parallel threads × sharding factor. Keep this value between 1 and 100; values above 100 may cause out of memory (OOM) errors. |
where | No | — | A WHERE clause for filtering rows, such as id > 1000. |
querySql | No | — | An advanced SQL statement for custom filtering, such as a multi-table join. When specified, this setting overrides the table, column, where, and splitPk settings. |
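As a sketch of how a custom query replaces the table-based settings, the reader step below uses querySql with an illustrative join (the second table and its columns are hypothetical, not from the earlier example):

```json
{
  "parameter": {
    "datasource": "test_005",
    "querySql": "SELECT p.id, p.name, d.dept_name FROM PolarDB_person p JOIN PolarDB_dept d ON p.dept_id = d.id WHERE p.id > 1000"
  },
  "name": "Reader",
  "category": "reader"
}
```

Because querySql is specified here, any table, column, where, or splitPk settings in the same step would be ignored.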
PolarDB Writer
The following example writes to a single table using insert mode.
{
"type": "job",
"steps": [
{
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"parameter": {
"postSql": [],
"datasource": "test_005",
"column": [
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"writeMode": "insert",
"batchSize": 256,
"table": "PolarDB_person_copy",
"preSql": []
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"throttle": true,
"concurrent": 6,
"mbps": "12"
}
}
}
PolarDB Writer parameters
Parameter | Required | Default | Description |
datasource | Yes | — | The name of the data source. Must match the name added in DataWorks. |
table | Yes | — | The name of the destination table. |
column | Yes | — | A JSON array of column names to write. Use ["*"] to write all columns. |
writeMode | No | insert | The write mode. See Write mode behavior. PolarDB for PostgreSQL supports insert only. |
preSql | No | — | SQL to run before the task, such as a statement to delete outdated data. The codeless UI supports one statement; the code editor supports multiple. |
postSql | No | — | SQL to run after the task, such as a statement to add a timestamp. The codeless UI supports one statement; the code editor supports multiple. |
batchSize | No | 1024 | The number of records to write per batch. Larger values reduce round trips but increase memory usage. Excessively large values may cause OOM errors. |
updateColumn | No | — | The columns to update when a primary key or unique index conflict occurs. Takes effect only when writeMode is set to update. |
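For reference, a hedged sketch of a writer parameter block configured for update mode, reusing the illustrative table and columns from the earlier example:

```json
{
  "parameter": {
    "datasource": "test_005",
    "table": "PolarDB_person_copy",
    "column": ["id", "name", "age"],
    "writeMode": "update",
    "batchSize": 256,
    "preSql": [],
    "postSql": []
  },
  "name": "Writer",
  "category": "writer"
}
```

With this configuration, rows that hit a primary key or unique index conflict are updated in place instead of being counted as dirty data.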
Write mode behavior
All three write modes handle rows with no conflicts the same way (equivalent to INSERT INTO). The modes differ in how they handle primary key or unique index conflicts.
Mode | UI equivalent | Conflict behavior | Use when |
insert | INSERT INTO | Conflicting rows are skipped and counted as dirty data | You want to ignore duplicates |
update | ON DUPLICATE KEY UPDATE | Conflicting rows are updated with new values from the specified columns | You want to update existing records in place |
replace | REPLACE INTO | Conflicting rows are deleted, then new rows are inserted (all fields replaced) | You want to fully overwrite existing records |
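The three conflict behaviors correspond to familiar MySQL statement forms. The statements below are illustrative only (the table t and its columns are hypothetical):

```sql
-- insert mode: plain INSERT INTO; a conflicting row fails
-- and is counted as dirty data rather than written
INSERT INTO t (id, name, age) VALUES (2, 'lisi', NULL);

-- update mode: a conflicting row is updated in place
INSERT INTO t (id, name, age) VALUES (2, 'lisi', NULL)
  ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age);

-- replace mode: a conflicting row is deleted, then re-inserted
REPLACE INTO t (id, name, age) VALUES (2, 'lisi', NULL);
```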
Data examples
The following examples show how each mode handles a conflict on the id column.
Source table:
+----+----------+-----+
| id | name | age |
+----+----------+-----+
| 1 | zhangsan | 1 |
| 2 | lisi | |
+----+----------+-----+
Original destination table:
+----+--------+-----+
| id | name | age |
+----+--------+-----+
| 2 | wangwu | 3 |
+----+--------+-----+
`insert` result (row 2 conflict is skipped; row 1 is inserted):
+----+----------+-----+
| id | name | age |
+----+----------+-----+
| 1 | zhangsan | 1 |
| 2 | wangwu | 3 |
+----+----------+-----+
`update` result (scenario 1: only some columns specified, `"column": ["id","name"]`). Row 1 is inserted; row 2 is updated with the source name value, but age retains the original destination value:
+----+----------+-----+
| id | name | age |
+----+----------+-----+
| 1 | zhangsan | 1 |
| 2 | lisi | 3 |
+----+----------+-----+
`update` result (scenario 2: all columns specified, `"column": ["id","name","age"]`). Row 2 is updated with all source values:
+----+----------+-----+
| id | name | age |
+----+----------+-----+
| 1 | zhangsan | 1 |
| 2 | lisi | |
+----+----------+-----+
`replace` result. Row 2 is deleted and re-inserted with the source values:
+----+----------+-----+
| id | name | age |
+----+----------+-----+
| 1 | zhangsan | 1 |
| 2 | lisi | |
+----+----------+-----+
For PolarDB for PostgreSQL, only insert mode is supported. To avoid primary key conflicts, either add a TRUNCATE statement in preSql to clear the destination table before the task runs, or handle deduplication in an upstream node.
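For instance, a minimal sketch of the TRUNCATE approach, assuming the same illustrative destination table as the earlier writer example:

```json
"parameter": {
  "datasource": "test_005",
  "table": "PolarDB_person_copy",
  "column": ["id", "name", "age"],
  "writeMode": "insert",
  "preSql": ["TRUNCATE TABLE PolarDB_person_copy"],
  "postSql": []
}
```

The preSql statement clears the destination table before each run, so insert mode cannot hit primary key conflicts from previous loads.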