The MySQL data source provides bidirectional channels for reading and writing MySQL data. This article introduces the MySQL data synchronization capabilities of DataWorks.
Supported MySQL versions
Offline read and write:
Supports MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x, and is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.
Offline synchronization supports reading from views.
Real-time reading:
Real-time reading of MySQL data in Data Integration is based on a real-time subscription to MySQL. It currently supports only MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x (new features in 8.0, such as functional indexes, are excluded; only existing functionality is compatible), and is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.
Important: If you need to synchronize DRDS MySQL, do not configure it as a MySQL data source. Instead, refer to the Configure DRDS data source documentation and configure it as a DRDS data source.
Limits
Real-time reading
Does not support synchronizing data from MySQL read-only instances.
Does not support synchronizing tables with functional indexes.
Does not support XA ROLLBACK.
For transaction data that has already been XA PREPARED, real-time synchronization writes it to the destination. If XA ROLLBACK occurs later, real-time synchronization does not roll back the previously written XA PREPARED data. To handle XA ROLLBACK scenarios, manually remove the affected table from the real-time synchronization task, then add the table back and resynchronize.
Only supports synchronizing MySQL servers with Binlog configuration format set to ROW.
Real-time synchronization will not synchronize associated table records that are cascade deleted.
For Amazon Aurora MySQL databases, you need to connect to your primary/write database because AWS does not allow activating the Binlog feature on Aurora MySQL read replicas. Real-time synchronization tasks require Binlog to perform incremental updates.
Real-time synchronization of online DDL changes only supports adding columns (Add Column) to MySQL tables through Data Management DMS.
Does not support reading stored procedures from MySQL.
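To illustrate the cascade-delete limitation above: when a child table declares a foreign key with ON DELETE CASCADE, deleting a parent row removes the matching child rows inside the storage engine, and these implicit deletes are not written as separate binlog events, so real-time synchronization does not capture them. A minimal sketch with hypothetical table names:

```sql
-- Hypothetical parent/child tables; ON DELETE CASCADE removes child rows implicitly.
CREATE TABLE parent (
  id INT PRIMARY KEY
);
CREATE TABLE child (
  id INT PRIMARY KEY,
  parent_id INT,
  FOREIGN KEY (parent_id) REFERENCES parent (id) ON DELETE CASCADE
);
-- This DELETE also removes matching rows in child, but only the delete on
-- parent appears in the binlog events that real-time synchronization consumes.
DELETE FROM parent WHERE id = 1;
```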
Offline reading
When the MySQL Reader plugin synchronizes multiple tables (for example, sharding), splits within a single table are produced only if the task concurrency is greater than the number of tables. Otherwise, the number of split tasks equals the number of tables.
Does not support reading stored procedures from MySQL.
Supported field types
For a complete list of field types for all MySQL versions, please refer to the MySQL official documentation. The following lists the current support status for major fields using MySQL 8.0.x as an example.
Field type | Offline reading (MySQL Reader) | Offline writing (MySQL Writer) | Real-time reading | Real-time writing |
TINYINT | ||||
SMALLINT | ||||
INTEGER | ||||
BIGINT | ||||
FLOAT | ||||
DOUBLE | ||||
DECIMAL/NUMERIC | ||||
REAL | ||||
VARCHAR | ||||
JSON | ||||
TEXT | ||||
MEDIUMTEXT | ||||
LONGTEXT | ||||
VARBINARY | ||||
BINARY | ||||
TINYBLOB | ||||
MEDIUMBLOB | ||||
LONGBLOB | ||||
ENUM | ||||
SET | ||||
BOOLEAN | ||||
BIT | ||||
DATE | ||||
DATETIME | ||||
TIMESTAMP | ||||
TIME | ||||
YEAR | ||||
LINESTRING | ||||
POLYGON | ||||
MULTIPOINT | ||||
MULTILINESTRING | ||||
MULTIPOLYGON | ||||
GEOMETRYCOLLECTION |
Data synchronization preparation: MySQL environment setup
Before synchronizing data in DataWorks, prepare the MySQL environment as described in this section so that MySQL data synchronization tasks can be configured and run normally in DataWorks. The following describes the environment preparation required before MySQL synchronization.
Confirm MySQL version
Data Integration has requirements on the MySQL version. Refer to the Supported MySQL versions section above to check whether the MySQL database to be synchronized meets the requirements. You can query the current MySQL version with the following statement:
SELECT version();
Configure account permissions
It is recommended that you plan and create a MySQL account dedicated to DataWorks data source access in advance. The operation is as follows.
Optional: Create an account.
For operation details, see Create MySQL account.
Configure permissions.
Offline
In offline synchronization scenarios:
When reading MySQL data offline, the account needs the read (SELECT) permission on the tables to be synchronized.
When writing MySQL data offline, the account needs the write (INSERT, DELETE, UPDATE) permissions on the tables to be synchronized.
Real-time
In real-time synchronization scenarios, the account needs the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions on the database.
You can refer to the following commands to grant these permissions to the account, or directly grant the account the SUPER permission. When executing the statements, replace 'sync account' with the account created above.
CREATE USER 'sync account'@'%' IDENTIFIED BY 'password'; -- Create the sync account and set a password, allowing it to log in from any host (% means any host). Skip this statement if the account already exists.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync account'@'%'; -- Grant the sync account the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
*.* grants the sync account the above permissions on all tables in all databases. You can also grant the permissions on specific tables in the target database. For example, to grant them on the user table in the test database, use the statement GRANT SELECT, REPLICATION CLIENT ON test.user TO 'sync account'@'%';.
Note: REPLICATION SLAVE is a global permission and cannot be granted on specific tables in the target database.
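To verify that the grants took effect, you can list the account's current privileges (a minimal check; 'sync account' is the account name used above):

```sql
-- Show the privileges currently granted to the sync account.
-- The output should include SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.*
SHOW GRANTS FOR 'sync account'@'%';
```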
(Only required for real-time synchronization) Enable MySQL binlog
Data Integration implements real-time incremental data synchronization through real-time subscription to MySQL Binlog. You need to enable the MySQL Binlog service before configuring synchronization in DataWorks. The operation is as follows:
While a binlog file is being consumed, the database cannot delete it. If a real-time synchronization task runs with a delay, the source binlog may be held for a long time. Configure the task's delay alert appropriately and monitor the database's disk space.
Binlog should be retained for at least 72 hours. Otherwise, if a task fails and the binlog has already been purged, the read position cannot be reset to before the failure when the task restarts, and the missing data can only be recovered through a full offline synchronization.
Check if Binlog is enabled.
Use the following statement to check if Binlog is enabled.
SHOW variables LIKE "log_bin";
When the return result is ON, it indicates that Binlog is enabled.
If you use a standby database to synchronize data, you can also check if Binlog is enabled using the following statement.
SHOW variables LIKE "log_slave_updates";
When the return result is ON, it indicates that Binlog is enabled on the standby database.
If the returned result does not match the above results:
For open-source MySQL, please refer to the MySQL official documentation to enable Binlog.
For Alibaba Cloud RDS MySQL, please refer to RDS MySQL log backup to enable Binlog.
For Alibaba Cloud PolarDB MySQL, please refer to Enable Binlog to enable Binlog.
Query the Binlog format being used.
Use the following statement to query the Binlog format being used.
SHOW variables LIKE "binlog_format";
Explanation of return results:
Returns ROW, indicating that the enabled Binlog format is ROW.
Returns STATEMENT, indicating that the enabled Binlog format is STATEMENT.
Returns MIXED, indicating that the enabled Binlog format is MIXED.
Important: DataWorks real-time synchronization only supports MySQL servers whose binlog format is set to ROW. If the returned value is not ROW, modify the binlog format.
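On a self-managed MySQL server, the format can be switched with the following statement. This is a sketch only: it requires sufficient privileges, the change is not persisted across restarts unless it is also written to the server configuration, and on cloud-hosted instances such as RDS the parameter is usually changed in the console instead.

```sql
-- Switch the global binlog format to ROW (self-managed MySQL only).
SET GLOBAL binlog_format = 'ROW';
-- Confirm the change.
SHOW VARIABLES LIKE "binlog_format";
```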
Check if the complete Binlog log is enabled.
Use the following statement to check if the complete Binlog log is enabled.
SHOW variables LIKE "binlog_row_image";
Explanation of return results:
Returns FULL, indicating that the complete log is enabled for Binlog.
Returns MINIMAL, indicating that the minimal log is enabled for Binlog, and the complete log is not enabled.
Important: DataWorks real-time synchronization only supports MySQL servers with complete binlog logging enabled. If the query result is not FULL, modify the binlog_row_image configuration.
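On a self-managed MySQL server, the setting can be changed with the following statement. As with binlog_format, this is a sketch: it requires sufficient privileges, must also be persisted in the server configuration to survive restarts, and on cloud-hosted instances the parameter is usually changed in the console.

```sql
-- Enable complete row images in the binlog (self-managed MySQL only).
SET GLOBAL binlog_row_image = 'FULL';
-- Confirm the change.
SHOW VARIABLES LIKE "binlog_row_image";
```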
OSS binlog read authorization configuration
When adding a MySQL data source, if the Configuration Mode is Alibaba Cloud Instance Mode and the RDS MySQL instance is in the same region as the DataWorks workspace, you can enable Support OSS Binlog Reading. After it is enabled, when the RDS binlog cannot be accessed, the system attempts to obtain the binlog from OSS to prevent real-time synchronization tasks from being interrupted.
If the selected OSS Binlog Access Identity is Alibaba Cloud RAM User or Alibaba Cloud RAM Role, you need to configure account authorization as follows.
Alibaba Cloud RAM user.
Log in to the RAM Access Control-Users console and find the user that needs authorization. Specific operation:
In the Actions column, click Add Permissions.
Configure the following key parameters, then click Confirm.
Resource Scope: Account level.
Permission Policy: System policy.
Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.
Alibaba Cloud RAM role.
Log in to the RAM Access Control-Roles console and create a RAM role. For specific operations, see Create a RAM role with a trusted entity of an Alibaba Cloud account.
Key parameters:
Trusted Entity Type: Alibaba Cloud account.
Trusted Entity Name: Select Other Alibaba Cloud Account and enter the Alibaba Cloud account that owns the DataWorks workspace.
Role Name: Custom.
Grant precise permissions to the created RAM role. For specific operations, see Grant permissions to a RAM role.
Key parameters:
Permission Policy: System policy.
Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.
Modify the trust policy for the created RAM role. For specific operations, see Modify the trust policy of a RAM role.
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "@cdp.aliyuncs.com",
                    "@dataworks.aliyuncs.com"
                ]
            }
        }
    ],
    "Version": "1"
}
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Add and manage data sources. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
Please select the MySQL connection mode according to your network environment:
Scenario 1: Internal network connection (recommended)
Internal network connections have lower latency and more secure data transmission, and do not require additional public network access.
Applicable scenario: Your MySQL instance and Serverless resource group are in the same VPC.
Supports using Alibaba Cloud instance mode and connection string mode:
Select Alibaba Cloud Instance Mode: Directly select the MySQL instance in the same VPC, and the system automatically obtains the connection information without manual configuration.
Select Connection String Mode: Manually enter the instance's internal address and port.
Scenario 2: Public network connection
Public network transmission has security risks. It is recommended to use it in conjunction with security policies such as whitelists and IP authentication.
Applicable scenario: Need to access MySQL instances through the public network (such as cross-region, local environment access).
Supports using connection string mode (please ensure that the MySQL instance has enabled public network access permissions):
Select Connection String Mode: Manually fill in the instance's public address and port.
Serverless resource groups do not have public network access capabilities by default. When connecting to MySQL instances using public addresses, you need to configure public NAT Gateway and EIP for the bound VPC to support public network access to data sources.
Data synchronization task development: MySQL synchronization process guide
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
Single table offline synchronization task configuration guide
For the operation process, see Configure offline synchronization tasks using codeless UI, Configure offline synchronization tasks using code editor.
For full parameters and script demos in code editor mode, see Appendix: MySQL script demo and parameter description below.
Single table real-time synchronization task configuration guide
For the operation process, see DataStudio real-time synchronization task configuration.
Configuration guide for whole database offline, whole database (real-time) full and incremental, whole database (real-time) sharding, and other database-level synchronization
For the operation process, see Data Integration synchronization task configuration.
FAQ
For more common Data Integration issues, see Data Integration FAQ.
Appendix: MySQL script demo and parameter description
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo
This article provides configuration examples for single database single table and sharding:
The comments in the JSON examples in this article are only used to show the meaning of some important parameters. When actually configuring, please remove the comment content.
Configure single database single table
{
    "type": "job",
    "version": "2.0",//Version number.
    "steps": [
        {
            "stepType": "mysql",//Plugin name.
            "parameter": {
                "column": [//Column names.
                    "id"
                ],
                "connection": [
                    {
                        "querySql": [
                            "select a,b from join1 c join join2 d on c.id = d.id;"
                        ],
                        "datasource": ""//Data source name.
                    }
                ],
                "where": "",//Filter condition.
                "splitPk": "",//Split key.
                "encoding": "UTF-8"//Encoding format.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"//Maximum number of error records allowed.
        },
        "speed": {
            "throttle": true,//When throttle is false, the mbps parameter does not take effect (no flow control); when throttle is true, flow control is applied.
            "concurrent": 1,//Job concurrency.
            "mbps": "12"//Flow control; 1 mbps = 1 MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Configure sharding
Note: Sharding here refers to selecting multiple MySQL data tables with identical schemas on the MySQL Reader side and writing them to the same destination table. To configure database-level sharding, create a task on the Data Integration page and select the whole database sharding capability.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "indexes": [
                    {
                        "type": "unique",
                        "column": [
                            "id"
                        ]
                    }
                ],
                "envType": 0,
                "useSpecialSecret": false,
                "column": [
                    "id",
                    "buyer_name",
                    "seller_name",
                    "item_id",
                    "city",
                    "zone"
                ],
                "tableComment": "Test order table",
                "connection": [
                    {
                        "datasource": "rds_dataservice",
                        "table": [
                            "rds_table"
                        ]
                    },
                    {
                        "datasource": "rds_workshop_log",
                        "table": [
                            "rds_table"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        },
        {
            "name": "Processor",
            "stepType": null,
            "category": "processor",
            "copies": 1,
            "parameter": {
                "nodes": [],
                "edges": [],
                "groups": [],
                "version": "2.0"
            }
        }
    ],
    "setting": {
        "executeMode": null,
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Reader script parameters
Script parameter name | Description | Required | Default value |
datasource | Data source name. Code editor mode supports adding data sources, and the content filled in this configuration item must be consistent with the added data source name. | Yes | None |
table | The name of the table to be synchronized. A data integration task can only read data from one table. Advanced usage examples of table for configuring ranges:
Note The task will read all matched tables and specifically read the columns specified in the column configuration item from these tables. If the table does not exist, or if the columns being read do not exist, the task will fail. | Yes | None |
column | The collection of column names in the configured table that need to be synchronized, described using a JSON array for field information. By default, all columns are configured, for example [*].
| Yes | None |
splitPk | When MySQL Reader extracts data, if splitPk is specified, it means you want to use the field represented by splitPk for data sharding. Data synchronization will therefore start concurrent tasks for data synchronization, improving data synchronization efficiency.
| No | None |
where | Filter condition. In actual business scenarios, you often want to synchronize data for the current day, specifying the where condition as
| No | None |
querySql (advanced mode, this parameter configuration is not supported in codeless UI mode) | In some business scenarios, the where configuration item is not sufficient to describe the filtering conditions. You can use this configuration item to customize filtering SQL. After configuring this item, the data synchronization system will ignore the tables, columns, and splitPk configuration items and directly use the content configured in this item to filter data. For example, if you need to synchronize data after joining multiple tables, use Note querySql is case-sensitive. For example, writing it as querysql will not work. | No | None |
useSpecialSecret | Whether to use each source data source's own password when multiple source data sources are configured. Valid values:
If you have configured multiple source data sources and their usernames and passwords differ, set this parameter to true so that each data source's own password is used. | No | false |
Writer script demo
{
"type": "job",
"version": "2.0",//Version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mysql",//Plugin name.
"parameter": {
        "postSql": [],//SQL statements executed after the import.
"datasource": "",//Data source.
"column": [//Column names.
"id",
"value"
],
"writeMode": "insert",//Write mode, you can set it to insert, replace, or update.
"batchSize": 1024,//Batch size for one-time batch submission of records.
"table": "",//Table name.
"nullMode": "skipNull",//NULL value handling strategy.
"skipNullColumn": [//Columns that need to skip NULL values.
"id",
"value"
],
"preSql": [
"delete from XXX;"//Preparation statement before import.
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//Error record count.
"record": "0"
},
"speed": {
"throttle": true,//When throttle value is false, mbps parameter is not effective, meaning no flow control; when throttle value is true, it means flow control.
"concurrent": 1,//Job concurrency.
"mbps": "12"//Flow control, controls the highest rate of synchronization to prevent excessive read/write pressure on upstream/downstream databases, here 1mbps = 1MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
Script parameter name | Description | Required | Default value |
datasource | Data source name. Code editor mode supports adding data sources, and the content filled in this configuration item must be consistent with the added data source name. | Yes | None |
table | The name of the table to be synchronized. | Yes | None |
writeMode | Select the import mode, which can support insert into, on duplicate key update, and replace into methods:
| No | insert |
nullMode | NULL value handling strategy, value range:
Important: When set to skipNull, the task dynamically constructs the SQL statements used to write data so that destination default values can take effect. This increases the number of FLUSH operations and reduces synchronization speed; in the worst case, the task flushes once per record. | No | writeNull |
skipNullColumn | When nullMode is configured as skipNull, the columns configured in this parameter will not be forcibly written as Configuration format: | No | Default is all columns configured for this task. |
column | Fields in the target table that need to be written with data, separated by English commas, for example | Yes | None |
preSql | SQL statement to execute before the data synchronization task. Currently, codeless UI mode only allows executing one SQL statement, while code editor mode can support multiple SQL statements. For example, clearing old data from the table before execution (truncate table tablename). Note When there are multiple SQL statements, transactions are not supported. | No | None |
postSql | SQL statement to execute after the data synchronization task. Currently, codeless UI mode only allows executing one SQL statement, while code editor mode can support multiple SQL statements. For example, adding a timestamp Note When there are multiple SQL statements, transactions are not supported. | No | None |
batchSize | The number of records submitted in each batch. Batching can greatly reduce the number of network interactions between the data synchronization system and MySQL and improve overall throughput. Setting this value too high may cause OOM exceptions during synchronization. | No | 256 |
updateColumn | When writeMode is set to update, the fields to update when a primary-key or unique-index conflict occurs. Separate fields with commas, for example | No | None |
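As an illustration of the update write mode described above, the writer's behavior corresponds roughly to MySQL's INSERT ... ON DUPLICATE KEY UPDATE statement. This is a sketch with a hypothetical table t; the exact SQL that the plugin generates may differ.

```sql
-- Hypothetical target table with a primary key.
CREATE TABLE t (
  id INT PRIMARY KEY,
  value VARCHAR(32)
);
-- On a primary-key/unique-index conflict, only the columns listed in
-- updateColumn (here: value) are updated; otherwise the row is inserted.
INSERT INTO t (id, value) VALUES (1, 'a')
ON DUPLICATE KEY UPDATE value = VALUES(value);
```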