DataWorks: MaxCompute data source

Last Updated: Feb 20, 2024

DataWorks provides MaxCompute Reader and MaxCompute Writer for you to read data from and write data to MaxCompute data sources.

Limits

Note

DataWorks allows you to use a Tunnel endpoint to access the Tunnel service of a MaxCompute project that is added to DataWorks as a data source. This way, you can use the Tunnel service to upload data to or download data from the MaxCompute project to synchronize the data.

For a MaxCompute data source that is added after December 11, 2023, if the workspace to which the data source is added resides in a different region from the MaxCompute project that is added as the data source, you cannot use a Tunnel endpoint to synchronize data from the MaxCompute project. If you want to synchronize data from the MaxCompute project, you must purchase a Cloud Enterprise Network (CEN) instance to establish a network connection between DataWorks and the MaxCompute project. After the network connection is established, you can synchronize data from the MaxCompute project across regions. For information about CEN and operations related to CEN, see What is CEN?

Batch data read

  • MaxCompute Reader can read data from only partitioned and non-partitioned tables. It cannot read virtual views or read data from external tables.

  • DataWorks does not allow you to directly configure mappings between the partition fields in a MaxCompute partitioned table and fields in a destination table. If you want to read data from a partitioned table, you must specify information about a partition when you configure MaxCompute Reader.

    For example, a partitioned table named t0 contains the id and name fields. The level-1 partition field is pt and the level-2 partition field is ds. If you want to read data from the partition pt=1,ds=hangzhou in the t0 table, you must specify pt=1,ds=hangzhou when you configure MaxCompute Reader. This way, you can configure mappings for the id and name fields later.

  • MaxCompute Reader cannot filter data.

    If you want MaxCompute Reader to read only specific data during data synchronization, you must create a table, write the data to the table, and then enable MaxCompute Reader to read the data from the table.

Batch data write

If the data in the source contains a NULL value, MaxCompute Writer cannot convert the data to the VARCHAR type.

Real-time data write

  • You can use only exclusive resource groups for Data Integration to run real-time synchronization tasks.

  • You can run a real-time synchronization task to synchronize data only from a PolarDB, Oracle, or MySQL data source to MaxCompute.

  • A real-time synchronization task cannot be used to synchronize data from a table that has no primary key.

  • If you use the default MaxCompute data source odps_first as the destination of a real-time synchronization task, a temporary AccessKey pair is used for data synchronization by default. The temporary AccessKey pair is valid for only seven days. After it expires, the real-time synchronization task fails. When the system detects that the temporary AccessKey pair has expired, it restarts the real-time synchronization task. If a related alert rule is configured for the synchronization task, the system reports an error.

  • On the day on which you configure a one-click real-time synchronization task used to synchronize data to MaxCompute, you can query only the historical full data. You can query the incremental data only after full data and incremental data are merged on the next day.

  • A one-click real-time synchronization task used to synchronize data to MaxCompute generates a partition for storing full data in a MaxCompute table every day. To prevent data from occupying excessive storage resources, the default lifecycle of a MaxCompute table that is automatically created is 30 days. If the lifecycle does not meet your business requirements, you can click the name of a MaxCompute table to modify the lifecycle of the table when you configure the related synchronization task.

  • Data Integration uses the channels that are provided by MaxCompute to upload and download data. You can select a channel based on your business requirements. For more information about the types of channels that are provided by MaxCompute, see Data upload scenarios and tools.

  • If you want to run a real-time synchronization task to synchronize data to MaxCompute in whole-instance mode, the specifications of the exclusive resource group for Data Integration that you use to run the synchronization task must be at least 8 vCPUs and 16 GiB of memory.

  • You can use only a self-managed MaxCompute data source that resides in the same region as your workspace. If you use a self-managed MaxCompute data source that resides in a different region from your workspace, the data source can be connected to the resource group that you use. However, an error indicating that the compute engine instance does not exist is reported when the system creates a MaxCompute table during the running of the synchronization task.

    Note

    If you use a self-managed MaxCompute data source, you must associate a MaxCompute compute engine with your DataWorks workspace as a compute engine instance. Otherwise, an ODPS SQL node cannot be created. As a result, a node that is used to mark the end of full synchronization cannot be created.

Precautions

If you use a MaxCompute data source as the destination of a synchronization task and a column in a destination MaxCompute table has no mapped source column, the values in the column in the destination MaxCompute table are /N after the synchronization task finishes running. In addition, the values in the column in the destination MaxCompute table are still /N even if default values are specified for the column when the system creates the destination MaxCompute table.

Supported data types

The following data type editions are supported: MaxCompute V1.0 data type edition, MaxCompute V2.0 data type edition, and Hive-compatible data type edition. This section provides the support status of data types of each edition.

MaxCompute V1.0 data type edition

| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| BIGINT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL | Supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

MaxCompute V2.0 data type edition and Hive-compatible data type edition

| Data type | MaxCompute Reader for batch data read | MaxCompute Writer for batch data write | MaxCompute Writer for real-time data write |
| --- | --- | --- | --- |
| TINYINT | Supported | Supported | Supported |
| SMALLINT | Supported | Supported | Supported |
| INT | Supported | Supported | Supported |
| BIGINT | Supported | Supported | Supported |
| BINARY | Supported | Supported | Supported |
| FLOAT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL(precision,scale) | Supported | Supported | Supported |
| VARCHAR(n) | Supported | Supported | Supported |
| CHAR(n) | Not supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATE | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| TIMESTAMP | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

Data type mappings

The following table lists the data type mappings based on which MaxCompute Reader converts data types.

| Category | Data Integration data type | MaxCompute data type |
| --- | --- | --- |
| Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
| Boolean | BOOLEAN | BOOLEAN |
| Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
| Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
| Binary | BYTES | BINARY |
| Complex | STRING | ARRAY, MAP, and STRUCT |

Important

If the data conversion fails or the data fails to be written to the destination, the data is regarded as dirty data. You can specify the maximum number of dirty data records allowed.
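The data type mappings in this section can be expressed as a small lookup, which may help when you review a source schema before configuring a task. This is only a sketch: the dictionary is transcribed from the mappings listed in this section, the function name is hypothetical, and types that the mappings do not list (for example STRING) are deliberately left out rather than guessed.

```python
import re

# MaxCompute data type -> Data Integration data type, as listed in this section.
MAXCOMPUTE_TO_DI = {
    "BIGINT": "LONG", "INT": "LONG", "TINYINT": "LONG", "SMALLINT": "LONG",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "DATE", "TIMESTAMP": "DATE", "DATE": "DATE",
    "FLOAT": "DOUBLE", "DOUBLE": "DOUBLE", "DECIMAL": "DOUBLE",
    "BINARY": "BYTES",
    "ARRAY": "STRING", "MAP": "STRING", "STRUCT": "STRING",
}


def data_integration_type(maxcompute_type):
    """Look up the Data Integration type for a MaxCompute column type.

    Type parameters such as DECIMAL(10,2) or ARRAY<INT> are reduced to the
    base type name. Raises KeyError for types that are not listed.
    """
    base = re.split(r"[(<]", maxcompute_type.strip(), maxsplit=1)[0]
    return MAXCOMPUTE_TO_DI[base.strip().upper()]


print(data_integration_type("decimal(10,2)"))  # DOUBLE
print(data_integration_type("ARRAY<INT>"))     # STRING
```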

Prepare a MaxCompute environment before data synchronization

Before you read data from or write data to a MaxCompute table, you can determine whether to enable related properties based on your business requirements.

Connect to MaxCompute and obtain the permissions to perform project-level configurations

  • Start the MaxCompute client. For more information, see MaxCompute client (odpscmd).

  • Obtain the permissions to perform project-level configurations. Make sure that your account has the required permissions. You can use the account of a project owner to perform related operations. For more information, see Role planning.

Enable the ACID semantics

You can use the account of a project owner to run the following command on the MaxCompute client to enable the atomicity, consistency, isolation, and durability (ACID) semantics. For more information about the ACID semantics, see ACID semantics.

setproject odps.sql.acid.table.enable=true;

(Optional) Enable the MaxCompute V2.0 data type edition

If you want to use the TIMESTAMP data type in MaxCompute V2.0, use the account of a project owner to run the following command to enable the MaxCompute V2.0 data type edition:

setproject odps.sql.type.system.odps2=true;

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a MaxCompute data source

Before you develop a synchronization task for a MaxCompute data source, you must add a MaxCompute project to the desired workspace as a data source. For more information about how to add a MaxCompute data source, see Add a MaxCompute data source.

Note
  • Workspaces in standard mode allow you to isolate data sources. You can separately add data sources for the development and production environments to isolate the data sources. This keeps your data secure. For more information, see Isolate a data source in the development and production environments.

  • Before Alibaba Cloud releases a new version of data sources, the system automatically generates a MaxCompute data source named odps_first based on the first MaxCompute project that you associate with your workspace as a compute engine instance. If you select the MaxCompute data source when you configure a synchronization task, the synchronization task reads data from or writes data to the MaxCompute project based on which the MaxCompute data source is generated.

    You can go to the Data Source page in SettingCenter in the DataWorks console to view the name of the MaxCompute project. For more information, see Manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Create a real-time synchronization node to synchronize incremental data from a single table and Configure a real-time synchronization task in DataStudio.

Configure a synchronization task to synchronize all data in a database

For information about how to configure a synchronization task to implement batch synchronization of all data in a database, one-time full synchronization and real-time incremental synchronization of data in a database, or real-time synchronization of data from tables in sharded databases, see Configure a synchronization task in Data Integration.

FAQ

For information about other common issues in Data Integration, see FAQ about Data Integration.

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for MaxCompute Reader

Important

You must delete the comments from the following code before you run the code.

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// The plug-in name. 
            "parameter":{
                "partition":[],// The names of the partitions from which you want to read data. 
                "isCompress":false,// Specifies whether to enable compression. 
                "datasource":"",// The name of the data source. 
                "column":[// The names of the columns. 
                    "id"
                ],
                "emptyAsNull":true,
                "table":""// The name of the table from which you want to read data. 
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
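Because the comments must be deleted before the code is run, a small helper can remove them mechanically. The following is a minimal sketch, not a DataWorks tool: the function name and the sample text are hypothetical. It removes `//` comments only when they occur outside string literals, so URLs such as `http://...` inside values are preserved.

```python
import json


def strip_line_comments(text):
    """Remove // comments that appear outside of JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line but keep the newline itself.
            end = text.find("\n", i)
            i = len(text) if end == -1 else end
        else:
            out.append(ch)
            i += 1
    return "".join(out)


# Hypothetical fragment in the same commented style as the sample code.
sample = '''{
    "stepType":"odps",// The plug-in name.
    "parameter":{
        "endpoint":"http://example.invalid/api",// A URL must survive intact.
        "isCompress":false// Specifies whether to enable compression.
    }
}'''
config = json.loads(strip_line_comments(sample))
print(config["parameter"]["endpoint"])  # http://example.invalid/api
```

The helper handles comments only. Anything else that a JSON parser rejects, such as a trailing comma before a closing brace, still has to be corrected by hand.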

If you want to specify a Tunnel endpoint of MaxCompute, you can use the code editor to configure the data source. To configure the data source, replace "datasource":"", in the preceding code with the parameters of the data source. The following code provides an example:

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

Parameters in code for MaxCompute Reader

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table from which you want to read data. The name is not case-sensitive.

Yes

No default value

partition

The names of the partitions from which you want to read data.

  • You can use Linux shell wildcards to specify the partitions. An asterisk (*) matches any number of characters, and a question mark (?) matches a single character.

  • The partitions that you specify must exist in the source table. Otherwise, the system reports an error for the synchronization task. If you want the synchronization task to be successfully run even if the partitions that you specify do not exist in the source table, use the code editor to modify the code of the synchronization task. In addition, you must add "successOnNoPartition": true to the configurations of MaxCompute Reader.

For example, the partitioned table test contains four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. In this case, you can configure the partition parameter based on the following instructions:

  • To read data from the partition pt=1,ds=hangzhou, specify "partition":"pt=1,ds=hangzhou".

  • To read data from all the ds partitions in the pt=1 partition, specify "partition":"pt=1,ds=*".

  • To read data from all the partitions in the test table, specify "partition":"pt=*,ds=*".
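To preview which partitions a wildcard pattern would select, you can approximate the matching locally. The sketch below uses Python's standard `fnmatch` module, which implements shell-style wildcards. It is an illustration only: the function name is hypothetical, and the matching that MaxCompute Reader performs internally may differ in edge cases.

```python
from fnmatch import fnmatchcase

# The four partitions of the example table "test".
PARTITIONS = [
    "pt=1,ds=hangzhou",
    "pt=1,ds=shanghai",
    "pt=2,ds=hangzhou",
    "pt=2,ds=beijing",
]


def matching_partitions(pattern, partitions=PARTITIONS):
    """Return the partitions selected by a shell-style wildcard pattern."""
    return [p for p in partitions if fnmatchcase(p, pattern)]


print(matching_partitions("pt=1,ds=hangzhou"))  # ['pt=1,ds=hangzhou']
print(matching_partitions("pt=1,ds=*"))         # ['pt=1,ds=hangzhou', 'pt=1,ds=shanghai']
print(len(matching_partitions("pt=*,ds=*")))    # 4
```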

You can also specify other conditions to read data from partitions based on your business requirements.

  • To read data from the partition that has the largest ds value, add /*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR) to the configurations of MaxCompute Reader.

  • To filter data based on filter conditions, add /*query*/ pt+Expression to the configurations of MaxCompute Reader. For example, /*query*/ pt>=20170101 and pt<20170110 indicates that you want to read the data generated from January 1, 2017 to January 9, 2017 from all the pt partitions in the test table.

Note

MaxCompute Reader processes the content after /*query*/ as a WHERE clause.
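If the date range changes from run to run, the /*query*/ expression can be generated instead of typed by hand. The following sketch assumes that the partition field stores dates in yyyymmdd form, as in the example above. The function name is hypothetical, and the resulting string is meant to be used as the value of the partition parameter.

```python
from datetime import date, timedelta


def partition_range_filter(first_day, last_day, field="pt"):
    """Build a /*query*/ filter that covers first_day through last_day inclusive."""
    if last_day < first_day:
        raise ValueError("last_day must not be earlier than first_day")
    upper = last_day + timedelta(days=1)  # exclusive upper bound
    return f"/*query*/ {field}>={first_day:%Y%m%d} and {field}<{upper:%Y%m%d}"


# Reproduces the example above: January 1, 2017 to January 9, 2017.
print(partition_range_filter(date(2017, 1, 1), date(2017, 1, 9)))
# /*query*/ pt>=20170101 and pt<20170110
```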

Required only for partitioned tables

No default value

column

The names of the columns from which you want to read data. For example, the test table contains the id, name, and age columns.

  • To read the data in the columns in sequence, specify "column":["id","name","age"] or "column":["*"].

    Note

    We recommend that you do not use "column":["*"]. If you specify "column":["*"], MaxCompute Reader reads data from all the columns in a source table in sequence. If the column order, data type, or number of columns is changed in the source table, the columns in the source and destination tables may be inconsistent. As a result, the data synchronization may fail, or the data synchronization results do not meet your expectations.

  • To read the name and id fields in sequence, enter "column":["name","id"].

  • You can add constant fields to the source table to establish mappings between the source table columns and destination table columns. In this case, when you specify the column parameter, you must enclose each constant field in single quotation marks ('). For example, if you add the constant field 1988-08-08 08:08:08 to the source table and want to read data from the age, name, 1988-08-08 08:08:08, and id columns in sequence, specify "column":["age","name","'1988-08-08 08:08:08'","id"].

    The single quotation marks (') are used to identify constant columns. When MaxCompute Reader reads data from the source table, the constant column values that are read by MaxCompute Reader exclude the single quotation marks (').

    Note
    • MaxCompute Reader does not use SELECT statements to read data. Therefore, you cannot specify function fields.

    • The column parameter must explicitly specify all the columns from which you want to read data. The parameter cannot be left empty.

Yes

No default value
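The rules for the column parameter of MaxCompute Reader can be illustrated with a small local simulation. This is a sketch of the behavior described above, not the reader's implementation: the function name and the sample row are hypothetical. Entries enclosed in single quotation marks are treated as constants and emitted without the quotation marks, and all other entries are read from the row by column name.

```python
def project_row(row, column_spec):
    """Apply a reader "column" list to one source row (a dict of column values)."""
    if not column_spec:
        raise ValueError('the "column" parameter cannot be left empty')
    if column_spec == ["*"]:
        return list(row.values())  # all columns, in the row's own order
    values = []
    for entry in column_spec:
        if len(entry) >= 2 and entry.startswith("'") and entry.endswith("'"):
            values.append(entry[1:-1])  # constant column: drop the quotes
        else:
            values.append(row[entry])   # regular column: look up by name
    return values


row = {"id": 1, "name": "alice", "age": 30}  # hypothetical row of the test table
print(project_row(row, ["age", "name", "'1988-08-08 08:08:08'", "id"]))
# [30, 'alice', '1988-08-08 08:08:08', 1]
```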

Code for MaxCompute Writer

Sample code:

{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// The plug-in name. 
            "parameter":{
                "partition":"",// The partition to which you want to write data. 
                "truncate":true,// Specifies whether to delete existing data before new data is written. 
                "compress":false,// Specifies whether to enable compression. 
                "datasource":"odps_first",// The name of the data source. 
                "column":[// The names of the columns. 
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "emptyAsNull":false,// Specifies whether to convert empty strings to null. 
                "table":""// The name of the table to which you want to write data. 
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

If you want to specify a Tunnel endpoint of MaxCompute, you can configure the data source in the code editor. To do so, replace the "datasource" entry in the preceding code with the detailed parameters of the data source. Example:

"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",

Parameters in code for MaxCompute Writer

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table to which you want to write data. The name is not case-sensitive. You can specify only one table.

Yes

No default value

partition

The partition to which data is written. The last-level partition must be specified. For example, if you want to write data to a table with three-level partitions, set the partition parameter to a value that contains the third-level partition information, such as pt=20150101, type=1, biz=2.

  • To write data to a non-partitioned table, do not configure this parameter. The data is directly written to the destination table.

  • MaxCompute Writer does not support data write operations based on the partition route. To write data to a partitioned table, make sure that the data is written to the lowest-level partition.

Required only for partitioned tables

No default value
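Because the last-level partition must always be specified, it can be useful to check a partition value against the partition fields of the table before the task runs. The sketch below is a hypothetical local check, not part of MaxCompute Writer. It assumes that you supply the table's partition field names yourself, in level order.

```python
def parse_partition_spec(spec):
    """Parse a value such as "pt=20150101, type=1, biz=2" into a dict."""
    result = {}
    for part in spec.split(","):
        field, sep, value = part.strip().partition("=")
        if not sep or not field.strip() or not value.strip():
            raise ValueError(f"malformed partition entry: {part!r}")
        result[field.strip()] = value.strip()
    return result


def missing_partition_levels(spec, partition_fields):
    """Return the partition fields of the table that the spec does not set."""
    given = parse_partition_spec(spec)
    return [field for field in partition_fields if field not in given]


fields = ["pt", "type", "biz"]  # three-level partitions, as in the example above
print(missing_partition_levels("pt=20150101, type=1, biz=2", fields))  # []
print(missing_partition_levels("pt=20150101, type=1", fields))         # ['biz']
```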

column

The names of the columns to which you want to write data. To write data to all the columns in the destination table, set the value to ["*"]. To write data to specific columns in the destination table, set this parameter to the names of the columns and separate the names with commas (,), such as "column": ["id","name"].

  • MaxCompute Writer can filter columns and change the order of columns. For example, a MaxCompute table has three columns: a, b, and c. If you want to write data only to Column c and Column b, you can enter "column": ["c","b"]. During data synchronization, the values in Column a are automatically set to NULL.

  • The column parameter must explicitly specify all the columns to which you want to write data. The parameter cannot be left empty.

Yes

No default value
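The column filtering and reordering described above can be pictured with a short simulation. This is a sketch of the described outcome, not the writer's implementation: the function name is hypothetical, and Python's None stands in for NULL.

```python
def place_record(record, writer_columns, table_columns):
    """Lay out one source record across the destination table's columns.

    record:         source values, in the order of writer_columns
    writer_columns: the "column" list configured for MaxCompute Writer
    table_columns:  every column of the destination table, in table order
    Columns that are not listed in writer_columns come out as None (NULL).
    """
    if writer_columns == ["*"]:
        writer_columns = table_columns
    if len(record) != len(writer_columns):
        raise ValueError("record length does not match the column list")
    by_name = dict(zip(writer_columns, record))
    return {name: by_name.get(name) for name in table_columns}


# Table with columns a, b, and c; data is written only to c and b.
print(place_record(["x", "y"], ["c", "b"], ["a", "b", "c"]))
# {'a': None, 'b': 'y', 'c': 'x'}
```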

truncate

To ensure the idempotence of write operations, set the truncate parameter to true. If a failed synchronization task is rerun due to a write failure, MaxCompute Writer deletes the data that has been written to the destination table and writes the source data again. This ensures that the same data is written for each rerun.

MaxCompute Writer uses MaxCompute SQL to delete data. MaxCompute SQL cannot ensure data atomicity. Therefore, the TRUNCATE operation is not an atomic operation. Conflicts may occur when multiple synchronization tasks delete data from the same table or partition in parallel.

To prevent this issue, we recommend that you do not execute multiple DDL statements that write data to the same partition at the same time. Instead, create separate partitions in advance for the synchronization tasks that need to run in parallel.

Yes

No default value
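The effect of truncate on reruns can be shown with an in-memory simulation. This sketch models only the visible result and says nothing about how MaxCompute Writer actually deletes data. It also assumes, for illustration, that a run with truncate set to false keeps the existing rows and appends to them. All names are hypothetical.

```python
def run_write(partition_rows, source_rows, truncate):
    """Simulate one writer run against an in-memory partition (a list of rows)."""
    if truncate:
        partition_rows.clear()  # existing data is removed before writing
    partition_rows.extend(source_rows)
    return partition_rows


source = [("1", "alice"), ("2", "bob")]

with_truncate = []
run_write(with_truncate, source, truncate=True)
run_write(with_truncate, source, truncate=True)      # rerun after a failure

without_truncate = []
run_write(without_truncate, source, truncate=False)
run_write(without_truncate, source, truncate=False)  # rerun appends again

print(len(with_truncate), len(without_truncate))  # 2 4
```

With truncate enabled, the rerun leaves the same two rows in place. Under the stated assumption, the rerun without truncate duplicates them.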