DataWorks: MySQL data source

Last Updated: Apr 04, 2025

The MySQL data source provides bidirectional channels for reading and writing MySQL data. This article describes the MySQL data synchronization capabilities of DataWorks.

Supported MySQL versions

  • Offline read and write:

    Supports MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x, and is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.

    Offline synchronization supports reading from views.

  • Real-time reading:

    Real-time reading of MySQL data in Data Integration is implemented based on real-time subscription to the MySQL Binlog. It currently supports real-time synchronization of MySQL 5.5.x, 5.6.x, 5.7.x, and 8.0.x only (excluding new features in 8.0 such as functional indexes; only existing functionality is compatible), and is compatible with Amazon RDS for MySQL, Azure MySQL, and Amazon Aurora MySQL.

    Important

    If you need to synchronize DRDS MySQL, do not configure DRDS MySQL as a MySQL data source. Instead, refer to the Configure DRDS data source documentation and configure it directly as a DRDS data source.

Limits

Real-time reading

  • Does not support synchronizing data from MySQL read-only instances.

  • Does not support synchronizing tables with functional indexes.

  • Does not support XA ROLLBACK.

    Transaction data that has already been XA PREPARED is synchronized to the destination. If an XA ROLLBACK occurs, real-time synchronization does not roll back the writes for the XA PREPARED data. To handle XA ROLLBACK scenarios, manually remove the table involved in the XA ROLLBACK from the real-time synchronization task, then add the table back and resynchronize it.

  • Only supports synchronizing MySQL servers whose Binlog format is set to ROW.

  • Real-time synchronization does not synchronize records in associated tables that are deleted by cascade operations.

  • For Amazon Aurora MySQL databases, you must connect to the primary (writer) instance, because AWS does not allow enabling the Binlog feature on Aurora MySQL read replicas, and real-time synchronization tasks require the Binlog to perform incremental updates.

  • Real-time synchronization of online DDL changes supports only adding columns (ADD COLUMN) to MySQL tables through Data Management (DMS).

  • Does not support reading stored procedures from MySQL.

Offline reading

  • When the MySQL Reader plugin synchronizes multiple tables, as in sharding scenarios, a single table is split only if the task concurrency is greater than the number of tables. Otherwise, the number of split tasks equals the number of tables.

  • Does not support reading stored procedures from MySQL.

Supported field types

For a complete list of field types for all MySQL versions, see the official MySQL documentation. Using MySQL 8.0.x as an example, the following major field types are supported for offline reading (MySQL Reader), offline writing (MySQL Writer), real-time reading, and real-time writing:

  • Numeric: TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL/NUMERIC, REAL

  • String: VARCHAR, JSON, TEXT, MEDIUMTEXT, LONGTEXT

  • Binary: VARBINARY, BINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB

  • Enumeration and bit: ENUM, SET, BOOLEAN, BIT

  • Date and time: DATE, DATETIME, TIMESTAMP, TIME, YEAR

  • Spatial: LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION

Data synchronization preparation: MySQL environment setup

Before you synchronize data in DataWorks, prepare the MySQL environment as described in this section so that MySQL data synchronization tasks can be configured and run normally in DataWorks. The following describes the environment preparation required before MySQL synchronization.

Confirm MySQL version

Data Integration has requirements on MySQL versions. You can refer to the Supported MySQL versions section above to check whether the MySQL database to be synchronized meets the version requirements. You can check the current MySQL version by executing the following statement in the database.

SELECT version();

Configure account permissions

It is recommended that you plan and create a MySQL account dedicated to DataWorks data source access in advance. The operation is as follows.

  1. Optional: Create an account.

    For operation details, see Create MySQL account.

  2. Configure permissions.

    • Offline

      In offline synchronization scenarios:

      • When reading MySQL data offline, the account must have the read (SELECT) permission on the tables to be synchronized.

      • When writing MySQL data offline, the account must have the write (INSERT, DELETE, and UPDATE) permissions on the tables to be synchronized.

    • Real-time

      In real-time synchronization scenarios, the account must have the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions on the database.

    You can refer to the following statements to add permissions to the account, or directly grant the account the SUPER permission. When you execute the following statements, replace 'sync account' with the account created above.

    -- Create a sync account and set a password so that it can log on to the database from any host (% indicates any host):
    -- CREATE USER 'sync account'@'%' IDENTIFIED BY 'password';
    -- Grant the sync account the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions on the database:
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync account'@'%';

    *.* grants the sync account the preceding permissions on all tables in all databases. You can also grant permissions on specific tables in the target database. For example, to grant the sync account the SELECT permission on the test database's user table, use the statement GRANT SELECT ON test.user TO 'sync account'@'%';.

    Note

    REPLICATION SLAVE and REPLICATION CLIENT are global permissions and cannot be granted on specific tables in the target database.
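
    To confirm that the permissions have taken effect, you can query the account's grants. The following statement assumes the example account created above:

    SHOW GRANTS FOR 'sync account'@'%';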

(Only required for real-time synchronization) Enable MySQL binlog

Data Integration implements real-time incremental data synchronization through real-time subscription to MySQL Binlog. You need to enable the MySQL Binlog service before configuring synchronization in DataWorks. The operation is as follows:

Important
  • Binlog files that are being consumed cannot be deleted by the database. If a real-time synchronization task is delayed, the source Binlog files may be held for a long time. Configure the task's delay alerts appropriately and monitor the database's disk space in a timely manner.

  • Retain the Binlog for at least 72 hours to avoid data loss: if a task fails and the Binlog has already been purged, the position cannot be reset to the point before the failure when the task restarts, and the missing data can only be backfilled by a full offline synchronization.
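
  You can check and, on a self-managed MySQL 8.0 server, adjust the retention period as sketched below. This sketch makes assumptions: MySQL 5.7 and earlier use the expire_logs_days variable instead, and managed services such as RDS usually expose retention as a console parameter.

    -- Check the current Binlog retention period (in seconds, MySQL 8.0).
    SHOW VARIABLES LIKE "binlog_expire_logs_seconds";
    -- Retain the Binlog for at least 72 hours (259200 seconds).
    SET GLOBAL binlog_expire_logs_seconds = 259200;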

  1. Check if Binlog is enabled.

    • Use the following statement to check if Binlog is enabled.

      SHOW variables LIKE "log_bin";

      When the return result is ON, it indicates that Binlog is enabled.

    • If you use a standby database to synchronize data, you can also check if Binlog is enabled using the following statement.

      SHOW variables LIKE "log_slave_updates";

      When the return result is ON, it indicates that Binlog is enabled on the standby database.

    If the returned result is not ON, Binlog is not enabled. Enable it before you continue: on a self-managed server this is done in the MySQL configuration file, as sketched below, while managed services such as RDS expose Binlog settings as instance parameters.
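
    A minimal my.cnf sketch for a self-managed server that you can restart; the values match the requirements in the following steps (the server-id value is an arbitrary example):

      [mysqld]
      server-id        = 1           # Any unique, nonzero server ID.
      log-bin          = mysql-bin   # Enable Binlog with the given base name.
      binlog_format    = ROW         # Required by DataWorks real-time sync.
      binlog_row_image = FULL        # Write complete row images.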

  2. Query the Binlog format being used.

    Use the following statement to query the Binlog format being used.

    SHOW variables LIKE "binlog_format";

    Explanation of return results:

    • Returns ROW, indicating that the enabled Binlog format is ROW.

    • Returns STATEMENT, indicating that the enabled Binlog format is STATEMENT.

    • Returns MIXED, indicating that the enabled Binlog format is MIXED.

    Important

    DataWorks real-time synchronization only supports synchronizing MySQL servers with Binlog configuration format set to ROW. If the return is not ROW, please modify the Binlog Format.
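
    On a self-managed server, the format can be changed at runtime as sketched below; managed services such as RDS typically require changing the parameter in the console. The new value applies only to sessions created afterwards.

    SET GLOBAL binlog_format = 'ROW';
    -- Verify the global value (existing sessions keep their old session value):
    SHOW GLOBAL VARIABLES LIKE "binlog_format";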

  3. Check if the complete Binlog log is enabled.

    Use the following statement to check if the complete Binlog log is enabled.

    SHOW variables LIKE "binlog_row_image";

    Explanation of return results:

    • Returns FULL, indicating that the complete log is enabled for Binlog.

    • Returns MINIMAL, indicating that the minimal log is enabled for Binlog, and the complete log is not enabled.

    Important

    DataWorks real-time synchronization only supports synchronizing data from MySQL servers with complete Binlog logs enabled. If the query result returns non-FULL, please modify the binlog_row_image configuration.
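
    Similarly, on a self-managed server this can be changed at runtime (managed services expose it as an instance parameter):

    SET GLOBAL binlog_row_image = 'FULL';
    SHOW GLOBAL VARIABLES LIKE "binlog_row_image";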

OSS binlog read authorization configuration

When adding a MySQL data source, if the Configuration Mode is Alibaba Cloud Instance Mode and the RDS MySQL instance is in the same region as the DataWorks workspace, you can enable Support OSS Binlog Reading. After this feature is enabled, if the RDS Binlog cannot be accessed, the system attempts to obtain the Binlog from OSS to prevent real-time synchronization tasks from being interrupted.

If the selected OSS Binlog Access Identity is Alibaba Cloud RAM User or Alibaba Cloud RAM Role, you need to configure account authorization as follows.

  • Alibaba Cloud RAM user.

    1. Log in to the RAM Access Control-Users console and find the user that requires authorization.

    2. In the Actions column, click Add Permissions.

    3. Configure the following key parameters, then click Confirm.

      • Resource Scope: Account level.

      • Permission Policy: System policy.

      • Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.

  • Alibaba Cloud RAM role.

    1. Log in to the RAM Access Control-Roles console and create a RAM role. For specific operations, see Create a RAM role with a trusted entity of an Alibaba Cloud account.

      Key parameters:

      • Trusted Entity Type: Alibaba Cloud account.

      • Trusted Entity Name: Select Other Alibaba Cloud account and enter the Alibaba Cloud account that owns the DataWorks workspace.

      • Role Name: Custom.

    2. Grant precise permissions to the created RAM role. For specific operations, see Grant permissions to a RAM role.

      Key parameters:

      • Permission Policy: System policy.

      • Policy Name: AliyunDataWorksAccessingRdsOSSBinlogPolicy.

    3. Modify the trust policy of the created RAM role to the following content. For specific operations, see Modify the trust policy of a RAM role.

      {
          "Statement": [
              {
                  "Action": "sts:AssumeRole",
                  "Effect": "Allow",
                  "Principal": {
                      "Service": [
                          "@cdp.aliyuncs.com",
                          "@dataworks.aliyuncs.com"
                      ]
                  }
              }
          ],
          "Version": "1"
      }

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Add and manage data sources. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.

Please select the MySQL connection mode according to your network environment:

Scenario 1: Internal network connection (recommended)

Internal network connections have low latency and more secure data transmission, and require no additional public network configuration.

  • Applicable scenario: Your MySQL instance and Serverless resource group are in the same VPC.

  • Supports using Alibaba Cloud instance mode and connection string mode:

    • Select Alibaba Cloud Instance Mode: Directly select the MySQL instance in the same VPC, and the system automatically obtains the connection information without manual configuration.

    • Select Connection String Mode: Manually enter the instance's internal address and port.

Scenario 2: Public network connection

Public network transmission carries security risks. It is recommended to use it together with security measures such as whitelists and IP authentication.

  • Applicable scenario: Need to access MySQL instances through the public network (such as cross-region, local environment access).

  • Supports using connection string mode (please ensure that the MySQL instance has enabled public network access permissions):

    • Select Connection String Mode: Manually fill in the instance's public address and port.

Note

Serverless resource groups do not have public network access capabilities by default. When you connect to MySQL instances by using public addresses, you must configure a public NAT gateway and an EIP for the bound VPC to enable public network access to data sources.

Data synchronization task development: MySQL synchronization process guide

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Single table offline synchronization task configuration guide

Single table real-time synchronization task configuration guide

For the operation process, see DataStudio real-time synchronization task configuration.

Configuration guide for whole database offline, whole database (real-time) full and incremental, whole database (real-time) sharding, and other database-level synchronization

For the operation process, see Data Integration synchronization task configuration.

FAQ

For more common Data Integration issues, see Data Integration FAQ.

Appendix: MySQL script demo and parameter description

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

This article provides configuration examples for a single table in a single database and for sharding:

Note

The comments in the JSON examples in this article are only used to show the meaning of some important parameters. When actually configuring, please remove the comment content.

  • Configure single database single table

    {
      "type": "job",
      "version": "2.0",//Version number.
      "steps": [
        {
          "stepType": "mysql",//Plugin name.
          "parameter": {
            "column": [//Column names.
              "id"
            ],
            "connection": [
              {
                "querySql": [
                  "select a,b from join1 c join join2 d on c.id = d.id;"
                ],
                "datasource": ""//Data source name.
              }
            ],
            "where": "",//Filter condition.
            "splitPk": "",//Split key.
            "encoding": "UTF-8"//Encoding format.
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "stream",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "errorLimit": {
          "record": "0"//Error record count.
        },
        "speed": {
          "throttle": true,//When throttle value is false, mbps parameter is not effective, meaning no flow control; when throttle value is true, it means flow control.
          "concurrent": 1,//Job concurrency.
          "mbps": "12"//Flow control, here 1mbps = 1MB/s.
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • Configure sharding

    Note

    Sharding means selecting multiple MySQL data tables with consistent table structures on the MySQL Reader side; the multiple MySQL tables are written to the same destination table. If you want database-level sharding configuration, create a task on the Data Integration site and select the whole database sharding capability.

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "indexes": [
              {
                "type": "unique",
                "column": [
                  "id"
                ]
              }
            ],
            "envType": 0,
            "useSpecialSecret": false,
            "column": [
              "id",
              "buyer_name",
              "seller_name",
              "item_id",
              "city",
              "zone"
            ],
            "tableComment": "Test order table",
            "connection": [
              {
                "datasource": "rds_dataservice",
                "table": [
                  "rds_table"
                ]
              },
              {
                "datasource": "rds_workshop_log",
                "table": [
                  "rds_table"
                ]
              }
            ],
            "where": "",
            "splitPk": "id",
            "encoding": "UTF-8"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {},
          "name": "Writer",
          "category": "writer"
        },
        {
          "name": "Processor",
          "stepType": null,
          "category": "processor",
          "copies": 1,
          "parameter": {
            "nodes": [],
            "edges": [],
            "groups": [],
            "version": "2.0"
          }
        }
      ],
      "setting": {
        "executeMode": null,
        "errorLimit": {
          "record": ""
        },
        "speed": {
          "concurrent": 2,
          "throttle": false
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }

Reader script parameters

For each parameter below, the description is followed by whether the parameter is required and its default value.

datasource

Data source name. Code editor mode supports adding data sources, and the content filled in this configuration item must be consistent with the added data source name.

Required: Yes. Default value: None.

table

The name of the table to be synchronized. A data integration task can only read data from one table.

Advanced usage examples of table for configuring ranges:

  • You can configure reading from sharded tables by configuring intervals, for example 'table_[0-99]' means reading from 'table_0', 'table_1', 'table_2' up to 'table_99'.

  • If your table numeric suffixes have consistent lengths, such as 'table_000', 'table_001', 'table_002' up to 'table_999', you can configure it as '"table":["table_00[0-9]","table_0[10-99]","table_[100-999]"]' (see the sketch after this entry).

Note

The task will read all matched tables and specifically read the columns specified in the column configuration item from these tables. If the table does not exist, or if the columns being read do not exist, the task will fail.

Required: Yes. Default value: None.
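
As an illustration of the range syntax above, the following is a hedged sketch of a connection block; the data source name is hypothetical:

"connection": [
  {
    "datasource": "my_mysql_source",
    "table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]
  }
]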

column

The collection of column names in the configured table that need to be synchronized, described as a JSON array of field information. By default, all columns are used, for example ["*"].

  • Supports column cropping: You can select specific columns for export.

  • Supports column reordering: Columns can be exported in an order different from the table schema information.

  • Supports constant configuration: You need to follow the MySQL SQL syntax format, for example ["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"].

    • id is a regular column name.

    • table is a column name containing a reserved word.

    • 1 is an integer constant.

    • 'mingya.wmy' is a string constant (note that it needs to be enclosed in single quotes).

    • Regarding null:

      • " " represents empty.

      • null represents null.

      • 'null' represents the string "null".

    • to_char(a+1) is a string length calculation function.

    • 2.3 is a floating-point number.

    • true is a Boolean value.

  • column must explicitly specify the collection of columns to be synchronized and cannot be empty.

Required: Yes. Default value: None.

splitPk

When MySQL Reader extracts data, if splitPk is specified, it means you want to use the field represented by splitPk for data sharding. Data synchronization will therefore start concurrent tasks for data synchronization, improving data synchronization efficiency.

  • It is recommended to use the table's primary key for splitPk because the table's primary key is usually more evenly distributed, so the shards created are less likely to have data hotspots.

  • Currently, splitPk only supports integer data splitting and does not support string, floating-point, date, or other types. If you specify other unsupported types, the splitPk function will be ignored, and a single channel will be used for synchronization.

  • If you do not fill in splitPk, including not providing splitPk or setting splitPk to empty, data synchronization will be considered as using a single channel to synchronize the table data.

Required: No. Default value: None.

where

Filter condition. In actual business scenarios, you often want to synchronize data for the current day, specifying the where condition as gmt_create>$bizdate.

  • The where condition can effectively perform incremental business synchronization. If you do not fill in the where statement, including not providing the key or value for where, data synchronization will be considered as synchronizing all data.

  • You cannot specify the where condition as limit 10, as this does not comply with the MySQL SQL WHERE clause constraints.

Required: No. Default value: None.

querySql (advanced mode, this parameter configuration is not supported in codeless UI mode)

In some business scenarios, the where parameter is not sufficient to describe the filter conditions, and you can use this parameter to define a custom filter SQL statement. When this parameter is configured, the data synchronization system ignores the table, column, where, and splitPk parameters and directly uses its content to filter data. For example, to synchronize data after a multi-table join, use select a,b from table_a join table_b on table_a.id = table_b.id. querySql has a higher priority than the table, column, where, and splitPk parameters, and the system parses information such as the username and password from the data source specified by datasource.

Note

querySql is case-sensitive. For example, writing it as querysql will not work.

Required: No. Default value: None.

useSpecialSecret

Specifies whether to use each data source's own password when multiple source data sources are used. Valid values:

  • true

  • false

If you have configured multiple source data sources and their usernames and passwords are not consistent, set this parameter to true to use each data source's own password.

Required: No. Default value: false.

Writer script demo

{
  "type": "job",
  "version": "2.0",//Version number.
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mysql",//Plugin name.
      "parameter": {
        "postSql": [],//Preparation statement after import.
        "datasource": "",//Data source.
        "column": [//Column names.
          "id",
          "value"
        ],
        "writeMode": "insert",//Write mode, you can set it to insert, replace, or update.
        "batchSize": 1024,//Batch size for one-time batch submission of records.
        "table": "",//Table name.
        "nullMode": "skipNull",//NULL value handling strategy.
        "skipNullColumn": [//Columns that need to skip NULL values.
          "id",
          "value"
        ],
        "preSql": [
          "delete from XXX;"//Preparation statement before import.
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {//Error record count.
      "record": "0"
    },
    "speed": {
      "throttle": true,//When throttle value is false, mbps parameter is not effective, meaning no flow control; when throttle value is true, it means flow control.
      "concurrent": 1,//Job concurrency.
      "mbps": "12"//Flow control, controls the highest rate of synchronization to prevent excessive read/write pressure on upstream/downstream databases, here 1mbps = 1MB/s.
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer script parameters

For each parameter below, the description is followed by whether the parameter is required and its default value.

datasource

Data source name. Code editor mode supports adding data sources, and the content filled in this configuration item must be consistent with the added data source name.

Required: Yes. Default value: None.

table

The name of the table to be synchronized.

Required: Yes. Default value: None.

writeMode

Select the import mode. The insert into, on duplicate key update, and replace into methods are supported (the corresponding SQL statement shapes are sketched after this list):

  • insert into: When a primary key or unique index conflict occurs, the conflicting row is not written and is treated as dirty data.

    If you configure tasks in code editor mode, set writeMode to insert.

  • on duplicate key update: When there is no primary key or unique index conflict, the behavior is the same as insert into. When a conflict occurs, the values of the specified fields in the existing row are replaced with the values from the new row.

    If you configure tasks in code editor mode, set writeMode to update.

  • replace into: When there is no primary key or unique index conflict, the behavior is the same as insert into. When a conflict occurs, the original row is deleted first and the new row is inserted; that is, the new row replaces all fields of the original row.

    If you configure tasks in code editor mode, set writeMode to replace.
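
The following SQL sketch is illustrative only, for a hypothetical table t(id INT PRIMARY KEY, value VARCHAR(10)); it shows the statement shape that corresponds to each write mode:

  -- writeMode = insert: a conflicting row is not written and becomes dirty data.
  INSERT INTO t (id, value) VALUES (1, 'a');
  -- writeMode = update: on conflict, the specified fields are updated in place.
  INSERT INTO t (id, value) VALUES (1, 'a')
    ON DUPLICATE KEY UPDATE value = VALUES(value);
  -- writeMode = replace: on conflict, the old row is deleted and the new row inserted.
  REPLACE INTO t (id, value) VALUES (1, 'a');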

Required: No. Default value: insert.

nullMode

NULL value handling strategy, value range:

  • writeNull: When the source field data is NULL, write NULL value to the target field.

  • skipNull: When the source field data is NULL, the target does not write this field. If the target has a default value definition, that column value will use the target's default value. If the target has no default value definition, that column value will be NULL. When configuring this parameter, you also need to configure the skipNullColumn parameter.

Important

When configured as skipNull, the task dynamically constructs the SQL statements used to write data so that target default values take effect. This increases the number of FLUSH operations and reduces synchronization speed; in the worst case, data is flushed once per record.
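
The following sketch is illustrative only, assuming a hypothetical table t(id INT PRIMARY KEY, value VARCHAR(10) DEFAULT 'x') and a source row (1, NULL):

  -- nullMode = writeNull: NULL is written to the target field.
  INSERT INTO t (id, value) VALUES (1, NULL);
  -- nullMode = skipNull with "skipNullColumn": ["value"]: the NULL column is
  -- omitted, so the target column takes its default value ('x' here).
  INSERT INTO t (id) VALUES (1);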

Required: No. Default value: writeNull.

skipNullColumn

When nullMode is configured as skipNull, the columns configured in this parameter will not be forcibly written as NULL, and will preferentially use the default value of the corresponding column itself.

Configuration format: ["c1","c2",...], where c1, c2 need to be configured as a subset of the column parameter.

Required: No. Default value: all columns configured for this task.

column

Fields in the destination table to which data needs to be written, separated by commas, for example "column":["id","name","age"]. To write all columns in sequence, use an asterisk: "column":["*"].

Required: Yes. Default value: None.

preSql

The SQL statement to execute before the data synchronization task runs. Currently, codeless UI mode allows only one SQL statement, while code editor mode supports multiple SQL statements. For example, truncate table tablename to clear old data from the table before execution.

Note

When there are multiple SQL statements, transactions are not supported.

Required: No. Default value: None.

postSql

The SQL statement to execute after the data synchronization task runs. Currently, codeless UI mode allows only one SQL statement, while code editor mode supports multiple SQL statements. For example, ALTER TABLE tablename ADD colname TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP to add a timestamp column.

Note

When there are multiple SQL statements, transactions are not supported.

Required: No. Default value: None.

batchSize

The number of records to submit in one batch. Batching can greatly reduce the number of network interactions between the data synchronization system and MySQL and improve overall throughput. An excessively large value may cause OOM exceptions during data synchronization.

Required: No. Default value: 256.

updateColumn

When writeMode is set to update, the fields to update when a primary key or unique index conflict occurs. Separate fields with commas, for example "updateColumn":["name","age"].

Required: No. Default value: None.