This topic describes how to use API operations to create, configure, and manage a data synchronization node in Data Integration.

Limits

  • In DataWorks, you can use API operations to create, configure, and manage only batch synchronization nodes.
  • You can use only the code editor to configure data synchronization nodes that are created by calling the CreateDISyncTask API operation. For more information, see Configure a batch synchronization node by using the code editor.
  • In DataWorks, you cannot use an API operation to create a workflow. You can use an API operation to create a synchronization node only in an existing workflow.

Preparations

  • Configure Maven dependencies.
    1. Open the pom.xml file of your Maven project and add aliyun-java-sdk-core to the file.
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
      </dependency>
    2. Open the pom.xml file of the Maven project and add aliyun-java-sdk-dataworks-public to the file.
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.3.18</version>
      </dependency>
    3. Open the pom.xml file of the Maven project and add credentials-java to the file. We recommend that you use credentials-java of the latest version.
      <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>credentials-java</artifactId>
          <version>0.2.11</version>
      </dependency>
      Note You can use the Alibaba Cloud Credentials tool to manage your AccessKey pair. For information about how to use the Alibaba Cloud Credentials tool, see Configure credentials.
  • Authenticate an account.
    Before you can use an API operation to create a data synchronization node, you must run the following code to authenticate the Alibaba Cloud account that you want to use to access DataWorks. If the authentication succeeds, you can perform subsequent operations. If the authentication fails, an error is returned, and you must resolve the issue based on the error message.
    // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
    // For more information, see https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
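    If you use the Alibaba Cloud Credentials tool that is mentioned in the preceding note, you can let it resolve the AccessKey pair instead of reading the environment variables yourself. The following code is a minimal sketch that assumes credentials-java 0.2.x, in which the Client class reads the default credential provider chain and exposes getAccessKeyId and getAccessKeySecret; later versions expose the pair through getCredential instead.
    // Resolve credentials through credentials-java instead of calling System.getenv directly.
    com.aliyun.credentials.Client credentialClient = new com.aliyun.credentials.Client();
    IClientProfile profile = DefaultProfile.getProfile(
            "cn-shanghai",
            credentialClient.getAccessKeyId(),
            credentialClient.getAccessKeySecret());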

Procedure

  1. Create a data synchronization node in Data Integration.
    You can call the CreateDISyncTask operation to create a data synchronization node in Data Integration. The following code provides an example of the settings of some parameters. For more information, see CreateDISyncTask.
    public void createFile() throws ClientException {
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"Business Flow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");

            String regionId = "cn-hangzhou";
            // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
            // For more information, see https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
            IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
            IAcsClient client = new DefaultAcsClient(profile);

            CreateDISyncTaskResponse response = client.getAcsResponse(request);
            Gson gson = new Gson();
            System.out.println(gson.toJson(response));
    }
  2. Call the UpdateFile operation to configure scheduling dependencies for the node.
    The following list describes the request parameters of the operation. Each parameter is shown with its type, whether it is required, and an example value. A minimal sketch that sets the dependency-related parameters follows the list.

    Action (String, required)
      Example: UpdateFile
      The operation that you want to perform.

    FileFolderPath (String, optional)
      Example: Business Flow/1/Data Integration/Folder 1/Folder 2
      The path of the file.

    ProjectId (Long, optional)
      Example: 10000
      The ID of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace Management page to obtain the workspace ID.

    FileName (String, optional)
      Example: ods_user_info_d
      The name of the file. You can set the FileName parameter to a new value to change the file name. To do so, call the ListFiles operation to query the ID of the file whose name you want to change, set the FileId parameter to that ID, and then set the FileName parameter to the new name when you call the UpdateFile operation.

    FileDescription (String, optional)
      Example: File description
      The description of the file.

    Content (String, optional)
      Example: SELECT "1";
      The code of the file. The code format varies based on the file type. To view the code format for a specific file type, go to Operation Center, right-click a node of the file type, and then select View Code.

    AutoRerunTimes (Integer, required)
      Example: 3
      The number of automatic reruns that are allowed after an error occurs.

    AutoRerunIntervalMillis (Integer, optional)
      Example: 120000
      The interval between two consecutive automatic reruns after an error occurs. Unit: milliseconds. Maximum value: 1800000 (30 minutes).
      This parameter corresponds to the Rerun Interval parameter that is displayed after the Auto Rerun upon Error check box is selected in the Schedule section of the Properties tab in the DataWorks console. The interval that you specify in the console is measured in minutes; convert the value to milliseconds when you call the operation.

    RerunMode (String, optional)
      Example: ALL_ALLOWED
      Specifies whether the node that corresponds to the file can be rerun. Valid values:
      • ALL_ALLOWED: The node can be rerun regardless of whether it is successfully run or fails to run.
      • FAILURE_ALLOWED: The node can be rerun only after it fails to run.
      • ALL_DENIED: The node cannot be rerun regardless of whether it is successfully run or fails to run.
      This parameter corresponds to the Rerun parameter in the Schedule section of the Properties tab in the DataWorks console.

    Stop (Boolean, optional)
      Example: false
      Specifies whether to suspend the scheduling of the node. Valid values:
      • true: suspends the scheduling of the node.
      • false: does not suspend the scheduling of the node.
      This parameter corresponds to the Recurrence parameter in the Schedule section of the Properties tab in the DataWorks console.

    ParaValue (String, optional)
      Example: x=a y=b z=c
      The scheduling parameters of the node. This parameter corresponds to the Parameters section of the Properties tab in the DataWorks console. For more information, see Configure scheduling parameters.

    StartEffectDate (Long, optional)
      Example: 936923400000
      The start time of automatic scheduling. Set this parameter to a UNIX timestamp that represents the number of milliseconds that have elapsed since January 1, 1970, 00:00:00 UTC.
      This parameter corresponds to the start time specified for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

    EndEffectDate (Long, optional)
      Example: 4155787800000
      The end time of automatic scheduling. Set this parameter to a UNIX timestamp that represents the number of milliseconds that have elapsed since January 1, 1970, 00:00:00 UTC.
      This parameter corresponds to the end time specified for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

    CronExpress (String, optional)
      Example: 00 00-59/5 1-23 * * ?
      The CRON expression that represents the periodic scheduling policy of the node. This parameter corresponds to the Cron Expression parameter in the Schedule section of the Properties tab in the DataWorks console. After you configure the Scheduling Cycle and Run At parameters in the DataWorks console, DataWorks automatically generates a value for the Cron Expression parameter. Examples:
      • Run at 05:30 every day: 00 30 05 * * ?
      • Run at the fifteenth minute of each hour: 00 15 * * * ?
      • Run every 10 minutes: 00 00/10 * * * ?
      • Run every 10 minutes from 08:00 to 17:00 every day: 00 00-59/10 8-17 * * ?
      • Run at 00:20 on the first day of each month: 00 20 00 1 * ?
      • Run every three months starting from 00:10 on January 1: 00 10 00 1 1-12/3 ?
      • Run at 00:05 every Tuesday and Friday: 00 05 00 * * 2,5
      The scheduling system of DataWorks imposes the following limits on CRON expressions:
      • A node can be scheduled to run at a minimum interval of 5 minutes.
      • The earliest time a node can be scheduled to run every day is 05:00.

    CycleType (String, optional)
      Example: NOT_DAY
      The type of the scheduling cycle of the node. Valid values: NOT_DAY and DAY. The value NOT_DAY indicates that the node is scheduled to run by minute or hour. The value DAY indicates that the node is scheduled to run by day, week, or month.
      This parameter corresponds to the Scheduling Cycle parameter in the Schedule section of the Properties tab in the DataWorks console.

    DependentType (String, optional)
      Example: USER_DEFINE
      The type of the cross-cycle scheduling dependency of the node. Valid values:
      • SELF: The instance generated for the node in the current cycle depends on the instance generated for the node in the previous cycle.
      • CHILD: The instance generated for the node in the current cycle depends on the instances generated for the descendant nodes at the nearest level of the node in the previous cycle.
      • USER_DEFINE: The instance generated for the node in the current cycle depends on the instances generated for one or more specified nodes in the previous cycle.
      • NONE: No cross-cycle scheduling dependency type is specified for the node.

    DependentNodeIdList (String, optional)
      Example: 5,10,15,20
      The IDs of the nodes on which the node that corresponds to the file depends when the DependentType parameter is set to USER_DEFINE. Separate multiple IDs with commas (,).
      The value of this parameter corresponds to the IDs of the nodes that you specify after you select Previous Cycle and set Depend On to Other Nodes in the Dependencies section of the Properties tab in the DataWorks console.

    InputList (String, optional)
      Example: project_root,project.file1,project.001_out
      The output names of the parent files on which the current file depends. Separate multiple output names with commas (,).
      This parameter corresponds to the Output Name parameter below Parent Nodes in the Dependencies section of the Properties tab in the DataWorks console.

    ProjectIdentifier (String, optional)
      Example: dw_project
      The name of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace Management page to obtain the workspace name.
      You must configure this parameter or the ProjectId parameter to determine the DataWorks workspace to which the operation is applied.

    FileId (Long, required)
      Example: 100000001
      The ID of the file. You can call the ListFiles operation to obtain the ID.

    OutputList (String, optional)
      Example: dw_project.ods_user_info_d
      The output name of the file.
      This parameter corresponds to the Output Name parameter below Output in the Dependencies section of the Properties tab in the DataWorks console.

    ResourceGroupIdentifier (String, optional)
      Example: default_group
      The identifier of the resource group that is used to run the node. You can call the ListResourceGroups operation to query the resource groups that are available in the workspace.

    ConnectionName (String, optional)
      Example: odps_first
      The name of the data source that is used to run the node. You can call the ListDataSources operation to query the data sources that are available in the workspace.

    Owner (String, optional)
      Example: 18023848927592
      The ID of the file owner.

    AutoParsing (Boolean, optional)
      Example: true
      Specifies whether to enable the automatic parsing feature for the file. Valid values:
      • true: enables the automatic parsing feature for the file.
      • false: disables the automatic parsing feature for the file.
      This parameter corresponds to the Analyze Code parameter that is displayed after Same Cycle is selected in the Dependencies section of the Properties tab in the DataWorks console.

    SchedulerType (String, optional)
      Example: NORMAL
      The scheduling type of the node. Valid values:
      • NORMAL: The node is an auto triggered node.
      • MANUAL: The node is a manually triggered node. Manually triggered nodes cannot be automatically scheduled. You can go to the Manually Triggered Workflows pane to view manually triggered nodes.
      • PAUSE: The node is a paused node.
      • SKIP: The node is a dry-run node. Dry-run nodes are started as scheduled, but the system sets their status to successful when it starts to run them.

    AdvancedSettings (String, optional)
      Example: {"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
      The advanced configurations of the node. The value must be in the JSON format.
      This parameter is valid only for an EMR Spark Streaming node or an EMR Streaming SQL node. It corresponds to the Advanced Settings tab of the node in the DataWorks console.

    StartImmediately (Boolean, optional)
      Example: true
      Specifies whether to run the node immediately after it is deployed to the production environment. Valid values:
      • true: runs the node immediately after it is deployed.
      • false: does not run the node immediately after it is deployed.
      This parameter is valid only for an EMR Spark Streaming node or an EMR Streaming SQL node. It corresponds to the Start Method parameter in the Schedule section of the Configure tab in the DataWorks console.

    InputParameters (String, optional)
      Example: [{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
      The input parameters of the node. The value must be in the JSON format. For more information, see the InputContextParameterList parameter in the Response parameters section of the GetFile operation.
      This parameter corresponds to the Input Parameters table in the Input and Output Parameters section of the Properties tab in the DataWorks console.

    OutputParameters (String, optional)
      Example: [{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
      The output parameters of the node. The value must be in the JSON format. For more information, see the OutputContextParameterList parameter in the Response parameters section of the GetFile operation.
      This parameter corresponds to the Output Parameters table in the Input and Output Parameters section of the Properties tab in the DataWorks console.
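
    The following code is a minimal sketch of such a call. It assumes that client is the IAcsClient instance that is initialized in step 1; the file ID, node IDs, and output names are placeholder values that you replace with your own.
    public void updateFile() throws ClientException {
            UpdateFileRequest request = new UpdateFileRequest();
            request.setProjectId(181565L);
            // Set FileId to the file ID that is returned by CreateDISyncTask.
            request.setFileId(100000001L);
            // Same-cycle dependencies: the output names of the parent nodes.
            request.setInputList("project_root,project.001_out");
            // Cross-cycle dependencies on specific nodes.
            request.setDependentType("USER_DEFINE");
            request.setDependentNodeIdList("5,10,15,20");
            // Basic scheduling properties.
            request.setCronExpress("00 30 05 * * ?");
            request.setCycleType("DAY");
            request.setAutoRerunTimes(3);
            UpdateFileResponse response = client.getAcsResponse(request);
    }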

  3. Commit the node.
    Call the SubmitFile operation to commit the node to the development environment of the scheduling system. After you commit the node, the system returns a response that contains the ID of the related deployment task. You can call the GetDeployment operation to obtain the details of the deployment task based on the ID.
    public void submitFile() throws ClientException {
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
            // Set FileId to the file ID that is returned when you create the node.
            request.setFileId(501576542L);
            request.setComment("Comment");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
            // The returned data is the ID of the deployment task that is generated when you commit the node.
            Long deploymentId = acsResponse.getData();
            log.info(acsResponse.toString());
    }
    The preceding code provides an example of the settings of some parameters. For more information, see SubmitFile and GetDeployment.
  4. Deploy the node to the production environment.
    Call the DeployFile operation to deploy the node to the production environment.
    Note If your workspace is in standard mode, you must perform this step.
    public void deploy() throws ClientException {
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("Comment");
            // You must specify the NodeId or FileId parameter. The value of NodeId is the value of the Node ID parameter in the General section of the Properties tab on the configuration tab of the node.
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
            // The returned data is the ID of the deployment task that is generated when you deploy the node.
            Long deploymentId = acsResponse.getData();
            log.info(deploymentId.toString());
    }
    The preceding code provides an example of the settings of some parameters. For more information, see DeployFile.
  5. Query the status of the node.
    After you deploy the node, the system returns a response that contains the ID of the related deployment task. You can call the GetDeployment operation to obtain the details of the deployment task based on the ID. If the value of the Status parameter in the response of the GetDeployment operation is 1, the node is deployed.
    public void getDeployment() throws ClientException {
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
            // Set DeploymentId to the deployment task ID that is returned when you commit or deploy the node.
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
    }
    The preceding code provides an example of the settings of some parameters. For more information, see GetDeployment.
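    The following code is a minimal sketch that polls the deployment task until it succeeds. It assumes that the response model of the SDK exposes the Status field as acsResponse.getData().getDeployment().getStatus(); verify the accessor names against the SDK version that you use.
    public void waitForDeployment(Long deploymentId) throws ClientException, InterruptedException {
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setDeploymentId(deploymentId);
            while (true) {
                GetDeploymentResponse acsResponse = client.getAcsResponse(request);
                // A Status value of 1 indicates that the deployment task is successful.
                Integer status = acsResponse.getData().getDeployment().getStatus();
                if (status != null && status == 1) {
                    break;
                }
                Thread.sleep(5000L); // Poll every 5 seconds before the next query.
            }
    }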

Modify the configuration of the node

After you create and configure a data synchronization node in Data Integration, you can call the UpdateDISyncTask operation to modify the configuration of the node. You can also use the TaskParam parameter to change the exclusive resource group that is used by the node. After you modify the configuration of a data synchronization node, you must commit and deploy the node again. For more information, see Overview.
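
The following code is a minimal sketch of such a call. It assumes that UpdateDISyncTaskRequest accepts the same ProjectId, TaskType, and TaskParam parameters as CreateDISyncTaskRequest plus the file ID of the node; the resource group identifier is a placeholder.

public void updateDISyncTask(Long fileId) throws ClientException {
        UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
        request.setProjectId(181565L);
        request.setTaskType("DI_OFFLINE");
        request.setFileId(fileId);
        // Use TaskParam to change the exclusive resource group that the node uses.
        request.setTaskParam("{\"ResourceGroup\":\"S_res_group_XXX\"}");
        UpdateDISyncTaskResponse response = client.getAcsResponse(request);
        // Commit and deploy the node again for the change to take effect.
}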

Sample code

POM dependencies

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.1</version>
</dependency>

SDK for Java

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;

public class createofflineTask {
    static Long createTask(String fileName) throws Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_first\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        // ResourceGroup in TaskParam specifies the identifier of the exclusive resource group for Data Integration that the node uses.
        request.setTaskParam("{\"FileFolderPath\":\"Business Flow/test/Data Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        // Use the identifier of an exclusive resource group for scheduling.
        request.setResourceGroupIdentifier("S_res_group_XXX");
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
        // DependentNodeIdList takes effect only when DependentType is set to USER_DEFINE.
        request.setDependentType("USER_DEFINE");
        request.setDependentNodeIdList("5,10,15,20");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }


    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String regionId = "cn-shanghai";
        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
        // For more information, see https://www.alibabacloud.com/help/en/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
        IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // Create a data synchronization node in Data Integration.
        updateFile(fileId);   // Configure scheduling properties for the node.
    }
}