A MaxCompute data source acts as a data hub for reading data from and writing data to MaxCompute. It connects to MaxCompute through a Tunnel Endpoint address and transfers data using the DownloadTable operation.
For MaxCompute data sources created after December 11, 2023, cross-region synchronization via the Tunnel endpoint is not supported. If your DataWorks workspace and the target MaxCompute project are in different regions, connect the two networks using Cloud Enterprise Network (CEN) before synchronizing data.
Supported sync modes
| Sync mode | Supported? |
|---|---|
| Batch read | Yes |
| Batch write | Yes |
| Real-time write | Yes |
Prerequisites
Before you begin, make sure that you have:
- Access to the MaxCompute client. For setup instructions, see Connect by using a local client (odpscmd).
- A MaxCompute account with the Project Owner role or equivalent permissions. For permission details, see Role planning.
Enable ACID semantics
Run the following command as a Project Owner to enable ACID properties. For background, see ACID semantics.
setproject odps.sql.acid.table.enable=true;
Enable data types 2.0 (optional)
To use the TIMESTAMP data type, enable MaxCompute data types 2.0 by running the following command as a Project Owner:
setproject odps.sql.type.system.odps2=true;
Grant account permissions (optional)
When you associate a MaxCompute computing resource with a workspace, a MaxCompute data source is created in DataWorks by default. This data source is available for data synchronization within the current workspace.
To use this data source from a different workspace, grant the access account in that workspace the necessary permissions on the original MaxCompute project. For details, see Cross-account authorization (MaxCompute and Hologres).
Add a MaxCompute data source
To develop a data synchronization task, first add the MaxCompute project as a data source in DataWorks. See Associate a MaxCompute computing resource.
Workspaces in standard mode support data source isolation, so you can add separate data sources for development and production environments. See Isolate a data source in the development and production environments.
If your workspace contains a data source named odps_first that was not manually created on the data source page, it was generated automatically for the first MaxCompute engine associated with the workspace before the new version of data sources was released. To confirm which MaxCompute project this data source uses, check the data source configuration page. See Data source management.
Configure a data synchronization task
Single-table batch synchronization
For the full parameter reference and a code example, see Appendix: Code and parameters.
Single-table real-time synchronization
Full-database synchronization
Limitations
Batch read
| Limitation | Details |
|---|---|
| Supported table types | Partitioned tables and non-partitioned tables. Virtual views and external tables are not supported. |
| Partition column mapping | Partition columns cannot be mapped directly. To write partition column values to a destination table, add them as custom columns and manually enter the partition names. |
| Scheduling parameters | Use scheduling parameters to substitute partition values automatically based on scheduling time. For example, for a table t0 with columns id and name, a level-1 partition pt, and a level-2 partition ds, set the partition values to pt=${Scheduling Parameter} and ds=hangzhou. Then map the id and name columns. |
| Data filtering | Supported via WHERE clauses. |
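The scheduling-parameter substitution described above can be sketched as follows. This is an illustration of the placeholder mechanism only: the `${bizdate}` name and the `resolve_partition` helper are assumptions for this sketch, not part of the DataWorks API.

```python
import re
from datetime import date, timedelta

def resolve_partition(spec: str, params: dict) -> str:
    """Replace ${name} placeholders in a partition spec with scheduling values."""
    return re.sub(r"\$\{(\w+)\}", lambda m: params[m.group(1)], spec)

# Assume the task instance runs on 2024-06-02 and bizdate resolves to the previous day.
bizdate = (date(2024, 6, 2) - timedelta(days=1)).strftime("%Y%m%d")
print(resolve_partition("pt=${bizdate}/ds=hangzhou", {"bizdate": bizdate}))
# pt=20240601/ds=hangzhou
```

In a real task, DataWorks performs this substitution at scheduling time before the reader runs; the sketch only shows the shape of the result.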
Batch write
| Limitation | Details |
|---|---|
| VARCHAR with null values | When data contains null values, the VARCHAR data type is not supported. |
| DeltaTable destinations | Expand Advanced Configurations and set Visible After Sync to Yes. Without this setting, tasks fail when concurrency is greater than 1. |
| External tables | Writing to external tables is not supported. |
| Unmapped destination columns | If a destination column has no source column mapped to it, the column is set to null after synchronization, overriding any default value set during table creation. |
Real-time write
| Limitation | Details |
|---|---|
| Resource groups | Supports serverless resource groups. |
| Primary key requirement | Destination tables must have a primary key. |
| External tables | Writing to external tables is not supported. |
| Default data source (odps_first) and temporary Access Keys | When synchronizing to the default MaxCompute data source (odps_first), the platform uses a temporary Access Key (AK) by default. This AK expires after 7 days, which causes the task to fail. The platform automatically restarts the task when it detects an AK expiry failure. If monitoring alerts are configured, you receive an alert when this happens. |
| One-click real-time sync data availability | Historical data is available for query on the day of configuration. Incremental data becomes available after it is merged on the next day. |
| One-click real-time sync partition lifecycle | A full partition is created each day. The automatically created MaxCompute table has a default lifecycle of 30 days. To change this, click the MaxCompute table name during task configuration and modify the lifecycle. |
| One-click real-time sync in instance mode | The exclusive resource group for Data Integration must have a minimum specification of 8-core 16 GB. |
| Cross-region restriction for custom data sources | Custom MaxCompute data sources must be in the same region as the current workspace. For a cross-region custom data source, the connectivity test may succeed, but the synchronization task fails during table creation with an engine not found error. |
| Full-database synchronization table support | Standard tables support only the incremental log mode of real-time full-database synchronization and one-click real-time full-incremental synchronization. Delta Tables support both real-time full-database synchronization and one-click real-time full-incremental synchronization. |
| SLA advisory | Data Integration uses the MaxCompute engine's data synchronization channel for data uploads and downloads. For more information about the service level agreement (SLA) of this channel, see Data upload scenarios and tools. Evaluate your data synchronization solution based on the channel's SLA. |
When using a custom MaxCompute data source, the DataWorks project must still be associated with a MaxCompute engine. Without this association, you cannot create MaxCompute SQL nodes, which causes the full synchronization completion marker node to fail.
Supported data types
MaxCompute 1.0, 2.0, and Hive-compatible data types are all supported.
Data types 1.0
| Type | Batch read | Batch write | Real-time write |
|---|---|---|---|
| BIGINT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL | Supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |
Data types 2.0 and Hive-compatible data types
| Type | Batch read | Batch write | Real-time write |
|---|---|---|---|
| TINYINT | Supported | Supported | Supported |
| SMALLINT | Supported | Supported | Supported |
| INT | Supported | Supported | Supported |
| BIGINT | Supported | Supported | Supported |
| BINARY | Supported | Supported | Supported |
| FLOAT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL(precision,scale) | Supported | Supported | Supported |
| VARCHAR(n) | Supported | Supported | Supported |
| CHAR(n) | Not supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATE | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| TIMESTAMP | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |
Data type mapping
The following table describes how MaxCompute Reader maps MaxCompute types to Data Integration types.
| Category | Data Integration type | MaxCompute types |
|---|---|---|
| Integer | LONG | BIGINT, INT, TINYINT, SMALLINT |
| Boolean | BOOLEAN | BOOLEAN |
| Date and time | DATE | DATETIME, TIMESTAMP, DATE |
| Floating-point | DOUBLE | FLOAT, DOUBLE, DECIMAL |
| Binary | BYTES | BINARY |
| Complex | STRING | ARRAY, MAP, STRUCT |
If a data type conversion fails or data cannot be written to the destination, the record is classified as dirty data. Use this with a dirty data threshold to control task behavior.
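The mapping table and the dirty-data rule above can be sketched together. This is an illustrative model, not the actual Data Integration implementation: the converter functions and the `sync` loop are assumptions; only the type mapping itself comes from the table above.

```python
# Reader-side type mapping, copied from the table above.
TYPE_MAP = {
    "BIGINT": "LONG", "INT": "LONG", "TINYINT": "LONG", "SMALLINT": "LONG",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "DATE", "TIMESTAMP": "DATE", "DATE": "DATE",
    "FLOAT": "DOUBLE", "DOUBLE": "DOUBLE", "DECIMAL": "DOUBLE",
    "BINARY": "BYTES",
    "ARRAY": "STRING", "MAP": "STRING", "STRUCT": "STRING",
}

def convert(value, odps_type):
    """Convert one value; a failure here is what gets counted as dirty data."""
    target = TYPE_MAP[odps_type]
    if target == "LONG":
        return int(value)
    if target == "DOUBLE":
        return float(value)
    return value  # other categories pass through unchanged in this sketch

def sync(rows, schema, error_limit=0):
    """Collect convertible rows; fail once dirty records exceed the threshold."""
    clean, dirty = [], 0
    for row in rows:
        try:
            clean.append([convert(v, t) for v, t in zip(row, schema)])
        except (ValueError, TypeError):
            dirty += 1
            if dirty > error_limit:
                raise RuntimeError("dirty data threshold exceeded")
    return clean, dirty
```

This mirrors the `errorLimit.record` setting in the job examples later in this topic: with a threshold of 0, the first unconvertible record fails the task.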
FAQ
- How do I synchronize partition columns when reading data from a MaxCompute (ODPS) table?
- How do I synchronize data from multiple partitions when reading data from a MaxCompute (ODPS) table?
- How does MaxCompute implement column filtering, reordering, and null padding?
- What do I need to know about MaxCompute partition configuration?
For more frequently asked questions, see FAQ about Data Integration.
Appendix: Code and parameters
Configure a batch synchronization task by using the code editor
The following sections describe the reader and writer parameters for configuring a batch synchronization task in the code editor. For the code editor setup, see Use the code editor.
Reader script example
Remove all comments from the code before running the task.
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "odps",
"parameter": {
"partition": [],
"isCompress": false,
"datasource": "",
"column": [
"id"
],
"where": "",
"enableWhere": false,
"table": ""
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
To specify a Tunnel Endpoint manually, replace "datasource": "", in the example above with the following parameters:
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"endpoint": "http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer": "http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer": "http://dt.eu-central-1.maxcompute.aliyun.com",
"project": "<yourProjectName>",
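The reader step from the example above can also be assembled programmatically, which helps avoid stray commas when editing the JSON by hand. This is a minimal sketch assuming the field names shown in the example; `my_odps_source` and `t0` are placeholder values.

```python
import json

def reader_step(datasource, table, columns, partition=None, where=""):
    """Assemble the ODPS reader step; field names follow the example above."""
    parameter = {
        "datasource": datasource,
        "table": table,
        "column": columns,
        "partition": partition or [],
        "isCompress": False,
        "where": where,
        "enableWhere": bool(where),  # enable filtering only when a clause is given
    }
    return {"stepType": "odps", "parameter": parameter,
            "name": "Reader", "category": "reader"}

step = reader_step("my_odps_source", "t0", ["id", "name"], ["pt=20240601"])
print(json.dumps(step, indent=2))
```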
Reader parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | The data source name. Must match the name of the data source added in the code editor. | Yes | None |
| table | The source table name. Not case-sensitive. | Yes | None |
| partition | The partition from which to read data. Supports Linux shell wildcards: * matches zero or more characters, and ? matches any single character. By default, the task reports an error if the specified partition does not exist. To allow the task to succeed in this case, add "successOnNoPartition": true to the ODPS parameters. You can also use the /*query*/ prefix to filter partitions by condition, for example /*query*/ pt>=20170101 and pt<20170110. | Partitioned tables: Yes. Non-partitioned tables: No. | None |
| column | The columns to read from the source table. Specify column names explicitly, for example ["id", "name", "age"]. Using ["*"] reads all columns in order but is not recommended: if the table schema changes, column mismatches can cause incorrect results or task failure. To include a constant value, enclose it in single quotes: ["age", "name", "'1988-08-08 08:08:08'", "id"]. MaxCompute functions in column are supported only when enableWhere is true and where is not empty. | Yes | None |
| enableWhere | Whether to filter data by using a WHERE clause. | No | false |
| where | The WHERE clause condition for data filtering. | No | None |
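The Linux shell wildcards accepted by the partition parameter behave like Python's `fnmatch` patterns, which gives a quick way to preview which partitions a pattern would select. The partition list below is made up for illustration.

```python
from fnmatch import fnmatch

# Hypothetical partitions of a source table.
partitions = ["pt=20170101", "pt=20170102", "pt=20171231", "pt=20180101"]

# "*" matches zero or more characters, "?" matches exactly one.
selected = [p for p in partitions if fnmatch(p, "pt=2017*")]
print(selected)  # ['pt=20170101', 'pt=20170102', 'pt=20171231']
```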
Writer script example
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": true,
"compress": false,
"datasource": "odps_first",
"column": [
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table": ""
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
To specify a Tunnel Endpoint manually, replace "datasource": "odps_first", in the example above with the following parameters:
"accessId": "<yourAccessKeyId>",
"accessKey": "<yourAccessKeySecret>",
"endpoint": "http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer": "http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer": "http://dt.eu-central-1.maxcompute.aliyun.com",
"project": "<yourProjectName>",
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | The data source name. Must match the name of the data source added in the code editor. | Yes | None |
| table | The destination table name. Not case-sensitive. Only one table can be specified. | Yes | None |
| partition | The partition to write data to. For partitioned tables, specify the partition down to the last level, for example pt=20150101, type=1, biz=2 for a three-level partition. Omit this parameter for non-partitioned tables. MaxCompute Writer does not support data routing; data must be written to the lowest-level partition. | Partitioned tables: Yes. Non-partitioned tables: No. | None |
| column | The columns to write. To write all columns, set this parameter to ["*"]. To write a subset, list the column names: ["id", "name"]. Column filtering and reordering are supported; unspecified columns are set to null. | Yes | None |
| truncate | Whether to clear existing data before writing. Setting truncate to true ensures write idempotence: if a job fails and reruns, MaxCompute Writer clears the previous run's data before importing new data. Note that truncate is not an atomic operation; it uses MaxCompute SQL, which does not guarantee atomicity. Avoid running DDL operations on the same partition from multiple concurrent jobs, or create the partitions before starting concurrent jobs. | Yes | None |
| emptyAsNull | Whether to convert empty strings to null before writing. | No | false |
| consistencyCommit | Controls when synchronized data becomes visible. true: data is visible only after the task completes successfully. If the data volume exceeds 1 TB, the task fails because MaxCompute supports a maximum of 300,000 blocks. false: partially synchronized data can be queried before the task finishes, but the visible portion is unpredictable; downstream applications that use this table must account for data incompleteness. | No | false |
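The idempotence guarantee that truncate provides can be illustrated with a toy model. Here an in-memory dict stands in for a partitioned MaxCompute table; this is not writer code, only a sketch of why clearing before writing makes a rerun safe.

```python
def write_partition(table, partition, rows, truncate):
    """Write rows into a partition, optionally clearing it first."""
    if truncate:
        table[partition] = []  # discard data from a previous (possibly partial) run
    table.setdefault(partition, []).extend(rows)

table = {}
# First run fails midway after writing two of three rows.
write_partition(table, "pt=20240601", [1, 2], truncate=True)
# The rerun clears the partial data first, so the final state is the same
# as if the job had succeeded on the first attempt.
write_partition(table, "pt=20240601", [1, 2, 3], truncate=True)
print(table["pt=20240601"])  # [1, 2, 3]
```

With truncate set to false, the rerun would append to the partial data and leave duplicates, which is why true is recommended for rerunnable jobs.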