DataWorks provides MaxCompute Reader and MaxCompute Writer for you to read data from and write data to MaxCompute data sources.
Limits
Batch data read
MaxCompute Reader can read data only from partitioned and non-partitioned tables. It cannot read data from virtual views or external tables.
DataWorks does not allow you to directly configure mappings between the partition fields in a MaxCompute partitioned table and fields in a destination table. If you want to read data from a partitioned table, you must specify information about a partition when you configure MaxCompute Reader.
For example, a partitioned table named t0 contains the id and name fields. The level-1 partition field is pt and the level-2 partition field is ds. If you want to read data from the partition pt=1,ds=hangzhou in the t0 table, you must specify pt=1,ds=hangzhou when you configure MaxCompute Reader. This way, you can configure mappings for the id and name fields later.
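For reference, a minimal MaxCompute Reader snippet for this example might look like the following. This is a sketch, not a full task configuration: it assumes the default data source name odps_first and uses the t0 table and partition from the example above.
"stepType":"odps",
"parameter":{
    "datasource":"odps_first",// Assumed data source name.
    "table":"t0",
    "partition":["pt=1,ds=hangzhou"],// Read data only from this partition.
    "column":["id","name"]// The pt and ds partition fields are not mapped as columns.
}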
MaxCompute Reader cannot filter data.
If you want MaxCompute Reader to read only specific data during data synchronization, you must create a table, write the data to the table, and then enable MaxCompute Reader to read the data from the table.
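For example, you can pre-filter the data in MaxCompute SQL and then point MaxCompute Reader at the result table. The following statement is a minimal sketch under assumed names: the source table t0 and its fields come from the earlier example, and the destination table t0_filtered and the filter condition are hypothetical.
-- Materialize only the rows that the synchronization task needs (hypothetical filter).
CREATE TABLE t0_filtered AS
SELECT id, name
FROM t0
WHERE pt = '1' AND ds = 'hangzhou' AND id > 100;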
Batch data write
If the data in the source contains a NULL value, MaxCompute Writer cannot convert the data to the VARCHAR type.
Real-time data write
You can use only exclusive resource groups for Data Integration to run real-time synchronization tasks.
You can run a real-time synchronization task to synchronize data only from a PolarDB, Oracle, or MySQL data source to MaxCompute.
A real-time synchronization task cannot be used to synchronize data from a table that has no primary key.
If you use the default MaxCompute data source odps_first as the destination of a real-time synchronization task, a temporary AccessKey pair is used for data synchronization by default. The temporary AccessKey pair is valid for only seven days. After it expires, the real-time synchronization task fails. If the system detects that the temporary AccessKey pair has expired, the system restarts the synchronization task. If a related alert rule is configured for the synchronization task, the system also reports an error.
On the day on which you configure a one-click real-time synchronization task used to synchronize data to MaxCompute, you can query only the historical full data. You can query the incremental data only after the full data and incremental data are merged on the next day.
A one-click real-time synchronization task used to synchronize data to MaxCompute generates a partition for storing full data in a MaxCompute table every day. To prevent data from occupying excessive storage resources, the default lifecycle of a MaxCompute table that is automatically created is 30 days. If the lifecycle does not meet your business requirements, you can click the name of a MaxCompute table to modify the lifecycle of the table when you configure the related synchronization task.
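You can also change the lifecycle of an existing table in MaxCompute SQL. The following statement is a sketch; the table name my_sync_table is hypothetical and 90 is an example number of days.
ALTER TABLE my_sync_table SET LIFECYCLE 90; -- Retain data for 90 days instead of the default 30.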
Data Integration uses the channels that are provided by MaxCompute to upload and download data. You can select a channel based on your business requirements. For more information about the types of channels that are provided by MaxCompute, see Data upload scenarios and tools.
If you want to run a real-time synchronization task to synchronize data to MaxCompute in whole-instance mode, the specifications of the exclusive resource group for Data Integration that you use to run the synchronization task must be at least 8 vCPUs and 16 GiB of memory.
You can use only a self-managed MaxCompute data source that resides in the same region as your workspace. If you use a self-managed MaxCompute data source that resides in a different region from your workspace, the data source can be connected to the resource group that you use. However, an error indicating that the compute engine instance does not exist is reported when the system creates a MaxCompute table during the running of the synchronization task.
Note: If you use a self-managed MaxCompute data source, you must associate a MaxCompute compute engine with your DataWorks workspace as a compute engine instance. Otherwise, an ODPS SQL node cannot be created. As a result, a node that is used to mark the end of full synchronization cannot be created.
Precautions
If you use a MaxCompute data source as the destination of a synchronization task and a column in the destination table has no mapped source column, the values in that column are /N after the synchronization task finishes running, even if default values are specified for the column when the system creates the destination table.
Supported data types
The following data type editions are supported: the MaxCompute V1.0 data type edition, the MaxCompute V2.0 data type edition, and the Hive-compatible data type edition. The following tables describe the support status of the data types in each edition.
MaxCompute V1.0 data type edition
| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| BIGINT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL | Supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |
MaxCompute V2.0 data type edition and Hive-compatible data type edition
| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| TINYINT | Supported | Supported | Supported |
| SMALLINT | Supported | Supported | Supported |
| INT | Supported | Supported | Supported |
| BIGINT | Supported | Supported | Supported |
| BINARY | Supported | Supported | Supported |
| FLOAT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL(precision,scale) | Supported | Supported | Supported |
| VARCHAR(n) | Supported | Supported | Supported |
| CHAR(n) | Not supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATE | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| TIMESTAMP | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |
Data type mappings
The following table lists the data type mappings based on which MaxCompute Reader converts data types.
| Category | Data Integration data type | MaxCompute data type |
| --- | --- | --- |
| Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
| Boolean | BOOLEAN | BOOLEAN |
| Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
| Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
| Binary | BYTES | BINARY |
| Complex | STRING | ARRAY, MAP, and STRUCT |
If the data conversion fails or the data fails to be written to the destination, the data is regarded as dirty data. You can specify the maximum number of dirty data records allowed.
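In the code editor, this limit is controlled by the errorLimit.record parameter in the setting section, as shown in the full samples later in this topic. A focused snippet:
"setting":{
    "errorLimit":{
        "record":"0"// The task fails if the number of dirty data records exceeds this value.
    }
}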
Prepare a MaxCompute environment before data synchronization
Before you read data from or write data to a MaxCompute table, you can determine whether to enable related properties based on your business requirements.
Connect to MaxCompute and obtain the permissions to perform project-level configurations
Start the MaxCompute client. For more information, see MaxCompute client (odpscmd).
Make sure that your account has the permissions that are required to perform project-level configurations. You can use the account of the project owner to perform the related operations. For more information, see Role planning.
Enable the ACID semantics
You can use the account of a project owner to run the following command on the MaxCompute client to enable the atomicity, consistency, isolation, and durability (ACID) semantics. For more information about the ACID semantics, see ACID semantics.
setproject odps.sql.acid.table.enable=true;
(Optional) Enable the MaxCompute V2.0 data type edition
If you want to use the TIMESTAMP data type in MaxCompute V2.0, use the account of a project owner to run the following command to enable the MaxCompute V2.0 data type edition:
setproject odps.sql.type.system.odps2=true;
(Optional) Create an account
After you associate a MaxCompute compute engine with a workspace as a compute engine instance, DataWorks generates a default MaxCompute data source in the workspace. You can use the default MaxCompute data source for data synchronization in the current workspace. If you want to access data in the default MaxCompute data source of the current workspace from another workspace, you must create an AccessKey pair. This way, you can access data in the compute engine instance by using the AccessKey pair when you create a MaxCompute data source or use the default MaxCompute data source in another workspace.
Create an AccessKey pair. For more information, see Create an Alibaba Cloud account.
Create a MaxCompute data source. For more information, see Add a MaxCompute data source.
Develop a data synchronization task
For information about the entry point and the procedure for configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Associate a MaxCompute compute engine with a DataWorks workspace and add a data source
Before you configure a synchronization task to synchronize data from or to MaxCompute, you must associate a MaxCompute compute engine with a DataWorks workspace and add a MaxCompute data source to DataWorks. When you associate a MaxCompute compute engine with a workspace, DataWorks automatically generates a default MaxCompute data source on the Data Source page in the DataWorks console based on the configurations of the compute engine. You can also add another MaxCompute project to DataWorks as a data source. For more information about the configuration procedure, see Add and manage data sources.
When you associate a MaxCompute compute engine with a workspace for the first time, DataWorks generates a default MaxCompute data source named odps_first on the Data Source page in the DataWorks console. If you associate other MaxCompute compute engines with the workspace subsequently, DataWorks generates MaxCompute data sources named in the format of 0_Region ID_Compute engine instance name.
The names of the MaxCompute projects based on which the default MaxCompute data sources are generated are the same as the names of the MaxCompute projects that are associated with the workspace as compute engine instances. You can go to the Workspace page in SettingCenter in the DataWorks console to view the information of the MaxCompute compute engine instances that are associated with a workspace. If you want to modify the information of a MaxCompute compute engine instance that is associated with a workspace, you must make sure that no tasks are running on the compute engine instance before you perform the operation. The tasks include synchronization tasks of Data Integration, tasks of DataStudio, and other tasks that are related to DataWorks. For information about how to view the information of a MaxCompute compute engine instance, see Associate a MaxCompute compute engine with a workspace.
Workspaces in standard mode allow you to isolate data sources: you can separately add data sources for the development and production environments. This keeps your data secure. For more information, see Isolate a data source in the development and production environments.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Configure a real-time synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Create a real-time synchronization task to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.
Configure synchronization settings to implement batch synchronization of all data in a database, real-time synchronization of full data or incremental data in a database, and real-time synchronization of data from tables in sharded databases
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
FAQ
How do I read data from partition key columns in a MaxCompute table?
How do I read data from multiple partitions in a MaxCompute table?
For information about other common issues in Data Integration, see FAQ about Data Integration.
Appendix: Code and parameters
Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for MaxCompute Reader
You must delete the comments from the following code before you run the code.
{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// The plug-in name.
            "parameter":{
                "partition":[],// The names of the partitions from which you want to read data.
                "isCompress":false,// Specifies whether to enable compression.
                "datasource":"",// The name of the data source.
                "column":[// The names of the columns.
                    "id"
                ],
                "emptyAsNull":true,// Specifies whether to convert empty strings to null.
                "table":""// The name of the table.
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent":1,// The maximum number of parallel threads.
            "mbps":"12"// The maximum transmission rate. Unit: MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
If you want to specify the Tunnel endpoint of MaxCompute, you can use the code editor to configure the data source. To do so, replace "datasource":"", in the preceding code with the detailed parameters of the data source. The following code provides an example:
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
Parameters in code for MaxCompute Reader
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
| table | The name of the table from which you want to read data. The name is not case-sensitive. | Yes | No default value |
| partition | The names of the partitions from which you want to read data. For example, the partitioned table test contains four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. In this case, set the partition parameter accordingly; hedged examples follow this table. You can also specify other conditions to read data from partitions based on your business requirements. Note: MaxCompute Reader processes the content after | Required only for partitioned tables | No default value |
| column | The names of the columns from which you want to read data. For example, the test table contains the id, name, and age columns. | Yes | No default value |
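The following values are hedged examples of the partition parameter for the test table above. The wildcard form is an assumption based on common Data Integration configurations and may not be supported in all versions:
"partition":["pt=1,ds=hangzhou"]// Read a single partition.
"partition":["pt=1,ds=hangzhou","pt=2,ds=beijing"]// Read multiple specific partitions.
"partition":["pt=1,ds=*"]// Assumption: wildcard that matches all ds values under pt=1.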
Code for MaxCompute Writer
Sample code:
{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// The plug-in name.
            "parameter":{
                "partition":"",// The name of the partition to which you want to write data.
                "truncate":true,// The write rule.
                "compress":false,// Specifies whether to enable compression.
                "datasource":"odps_first",// The name of the data source.
                "column":[// The names of the columns.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "emptyAsNull":false,// Specifies whether to convert empty strings to null.
                "table":""// The name of the table.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent":1,// The maximum number of parallel threads.
            "mbps":"12"// The maximum transmission rate. Unit: MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
If you want to specify the Tunnel endpoint, you can use the code editor to configure the data source. To do so, replace "datasource":"", in the preceding code with the detailed parameters of the data source. Example:
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
Parameters in code for MaxCompute Writer
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
| table | The name of the table to which you want to write data. The name is not case-sensitive. You can specify only one table. | Yes | No default value |
| partition | The partition to which data is written. The last-level partition must be specified. For example, if you want to write data to a table with three-level partitions, set the partition parameter to a value that contains the third-level partition information, such as | Required only for partitioned tables | No default value |
| column | The names of the columns to which you want to write data. To write data to all the columns in the destination table, set the value to | Yes | No default value |
| truncate | To ensure the idempotence of write operations, set the truncate parameter to true. MaxCompute Writer uses MaxCompute SQL to delete data. MaxCompute SQL cannot ensure data atomicity, so the TRUNCATE operation is not an atomic operation, and conflicts may occur when multiple synchronization tasks delete data from the same table or partition in parallel. To prevent this issue, we recommend that you do not execute multiple DDL statements to write data to the same partition at the same time. You can create different partitions for synchronization tasks that need to run in parallel. | Yes | No default value |
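A common pattern is to write each day's data to a date-based partition and clear the partition before writing so that reruns remain idempotent. The following writer snippet is a sketch: the table name my_dest_table and its columns are hypothetical, and ${bizdate} assumes that a matching scheduling parameter is configured for the node.
"stepType":"odps",
"parameter":{
    "datasource":"odps_first",// Assumed data source name.
    "table":"my_dest_table",// Hypothetical destination table.
    "partition":"pt=${bizdate}",// Replaced with the data timestamp at run time.
    "truncate":true,// Delete existing data in the partition before writing.
    "column":["id","name"]// Hypothetical destination columns.
}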