This topic describes how to use API operations to create, configure, and manage a data synchronization node in Data Integration.

Limits

  • In DataWorks, you can call the CreateDISyncTask API operation to create only batch data synchronization nodes.
  • You can use only the code editor to configure data synchronization nodes that are created by calling the CreateDISyncTask API operation. For more information, see Create a sync node by using the code editor.
  • In DataWorks, you cannot use an API operation to create a workflow. You can call the CreateDISyncTask API operation to create data synchronization nodes only in existing workflows.

Preparations

  • Configure Maven dependencies.
    1. Open the pom.xml file of your Maven project and add aliyun-java-sdk-core to the file.
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
      </dependency>
    2. Open the pom.xml file of your Maven project and add aliyun-java-sdk-dataworks-public to the file.
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.4.2</version>
      </dependency>
  • Authenticate an account.
    Before you use an API operation to create a data synchronization node, run the following code to authenticate the Alibaba Cloud account that you use to log on to DataWorks. You can perform subsequent operations only after the account passes the authentication. If the authentication fails, an error is returned, and you must resolve the issue based on the error.
     DefaultProfile profile = DefaultProfile.getProfile(
                               "regionId",            // The ID of the region where your DataWorks workspace resides, such as cn-hangzhou. 
                               "<yourAccessKeyId>",   // The AccessKey ID of the Alibaba Cloud account that you want to use to access the DataWorks workspace. 
                               "<yourAccessSecret>"); // The AccessKey secret of the Alibaba Cloud account that you want to use to access the DataWorks workspace. 
     IAcsClient client = new DefaultAcsClient(profile);
    To obtain the AccessKey ID and AccessKey secret of your Alibaba Cloud account, log on to the DataWorks console, move the pointer over your profile picture, and then click AccessKey Management.

Procedure

  1. Create a data synchronization node in Data Integration.
    Call the CreateDISyncTask operation to create a data synchronization node in Data Integration. The following code provides an example of the settings of some parameters. For more information, see CreateDISyncTask.
    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"Business Flow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");

            String akId = "XXX";        // The AccessKey ID of your Alibaba Cloud account.
            String akSecret = "XXXX";   // The AccessKey secret of your Alibaba Cloud account.
            String regionId = "cn-hangzhou";
            IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
            DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks.cn-hangzhou.aliyuncs.com");
            IAcsClient client = new DefaultAcsClient(profile);

            CreateDISyncTaskResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
    }
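    The TaskContent value above is one long escaped JSON string, which is error-prone to edit by hand. If you build with Java 15 or later, a text block makes the same job definition easier to maintain. The following sketch is illustrative only (the helper name and parameters are not part of the SDK); it assembles an equivalent MySQL-to-MaxCompute script that you can pass to setTaskContent:

```java
// Illustrative helper (not part of the DataWorks SDK): build the TaskContent
// JSON for a MySQL reader and a MaxCompute (odps) writer from a text block.
// Requires Java 15+ for text blocks and String.formatted.
public class TaskContentBuilder {
    static String buildTaskContent(String sourceDatasource, String sourceTable,
                                   String destDatasource, String destTable) {
        return """
            {"type":"job","version":"2.0",
             "steps":[
               {"stepType":"mysql",
                "parameter":{"envType":1,"datasource":"%s",
                             "column":["id","name"],
                             "connection":[{"datasource":"%s","table":["%s"]}],
                             "where":"","splitPk":"id","encoding":"UTF-8"},
                "name":"Reader","category":"reader"},
               {"stepType":"odps",
                "parameter":{"partition":"pt=${bizdate}","truncate":true,
                             "datasource":"%s","envType":1,
                             "column":["id","name"],"emptyAsNull":false,"table":"%s"},
                "name":"Writer","category":"writer"}],
             "setting":{"errorLimit":{"record":""},
                        "speed":{"throttle":false,"concurrent":2}},
             "order":{"hops":[{"from":"Reader","to":"Writer"}]}}
            """.formatted(sourceDatasource, sourceDatasource, sourceTable,
                          destDatasource, destTable);
    }

    public static void main(String[] args) {
        System.out.println(buildTaskContent("dh_mysql", "same", "odps_first", "same"));
    }
}
```

    The resulting string describes the same kind of job as the escaped TaskContent in the example above, and request.setTaskContent(...) accepts it unchanged.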
  2. Configure scheduling dependencies for the node.
    Call the UpdateFile operation to configure scheduling dependencies for the node.
    public static void updateFile(Long fileId) throws Exception {
            UpdateFileRequest request = new UpdateFileRequest();
            request.setProjectId(2043L);
            request.setFileId(fileId);
            request.setAutoRerunTimes(3);
            request.setRerunMode("FAILURE_ALLOWED");
            request.setCronExpress("00 30 05 * * ?");
            request.setCycleType("DAY");
            // Use an exclusive resource group for scheduling.
            request.setResourceGroupIdentifier("S_res_group_XXX");
            request.setInputList("dataworks_di_autotest_root");
            UpdateFileResponse response1 = client.getAcsResponse(request);
        }
    The preceding code provides an example of the settings of some parameters. For more information, see UpdateFile.
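    Note that CronExpress is a six-field, Quartz-style expression whose leftmost field is seconds, so "00 30 05 * * ?" schedules the node every day at 05:30:00. As a quick sanity check, this small helper (illustrative only, not part of the SDK) labels the fields:

```java
// Illustrative helper: split a Quartz-style DataWorks cron expression
// (seconds minutes hours day-of-month month day-of-week) into labeled fields.
public class CronFields {
    static String describe(String cron) {
        String[] f = cron.trim().split("\\s+");
        return String.format("at %s:%s:%s (day=%s month=%s weekday=%s)",
                f[2], f[1], f[0], f[3], f[4], f[5]);
    }

    public static void main(String[] args) {
        System.out.println(describe("00 30 05 * * ?")); // → at 05:30:00 (day=* month=* weekday=?)
    }
}
```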
  3. Commit the node.
    Call the SubmitFile operation to commit the node to the development environment of the scheduling system.
     public void submitFile() throws ClientException{
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
            // The ID is the value of the FileId parameter that is returned when you create the node. 
            request.setFileId(501576542L);
            request.setComment("Comment");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
    }
    The preceding code provides an example of the settings of some parameters. For more information, see SubmitFile.
  4. Query the status of the node.
    After you commit the node, the ID of the deployment task of the node is returned. You can call the GetDeployment operation to query the details about the deployment task based on the ID. If the value of the Status parameter in the response of the GetDeployment operation is 1, the deployment task is successful, and the node is committed. You can deploy a node only after the node is committed. If your node fails to be committed, you must handle the failure based on the error that is returned.
    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
            // The ID is the deployment task ID that is returned after you commit the node. 
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }
    The preceding code provides an example of the settings of some parameters. For more information, see GetDeployment.
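    Because the deployment task runs asynchronously, a single GetDeployment call may still report an in-progress status. One way to wait is a bounded polling loop; the sketch below is SDK-agnostic (the java.util.function.IntSupplier is assumed to wrap a GetDeployment call like the one above and return the Status value; the names are illustrative):

```java
import java.util.function.IntSupplier;

// Sketch of a bounded polling loop: retry the status lookup until it returns 1
// (the deployment task succeeded) or the retry budget is exhausted. The lookup
// is injected, so the loop itself carries no SDK types.
public class DeploymentPoller {
    static boolean waitUntilDeployed(IntSupplier statusLookup, int maxAttempts,
                                     long sleepMillis) throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if (statusLookup.getAsInt() == 1) {
                return true;            // Status 1: the deployment task succeeded.
            }
            Thread.sleep(sleepMillis);  // Still in progress; back off and retry.
        }
        return false;                   // Gave up without observing success.
    }
}
```

    The complete example at the end of this topic approximates the same effect with a fixed Thread.sleep followed by a second getDeployment call.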
  5. Deploy the node to the production environment.
    Call the DeployFile operation to deploy the node to the production environment.
    Note If your workspace is in standard mode, you must perform this step.
     public void deploy() throws ClientException{
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("Comment");
            // You must specify the NodeId or FileId parameter. The value of NodeId is the value of the Node ID parameter in the General section of the Properties tab on the configuration tab of the node. 
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
        }
    The preceding code provides an example of some parameter settings. For more information, see DeployFile.
  6. Query the status of the node.
    After you deploy the node, the ID of the deployment task of the node is returned. You can call the GetDeployment operation to query the details about the deployment task based on the ID. If the value of the Status parameter in the response of the GetDeployment operation is 1, the deployment task is successful, and the node is deployed.
    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
            // The ID is the deployment task ID that is returned after you deploy the node. 
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }
    The preceding code provides an example of the settings of some parameters. For more information, see GetDeployment.

Manage a data synchronization node

After you create and configure a data synchronization node in Data Integration, you can perform the following operations on the node:
  • Modify the configuration of the node: You can call the UpdateDISyncTask operation to modify the configuration of the node. You can also use the TaskParam parameter to change the exclusive resource group that is used by the node. For more information, see UpdateDISyncTask.
    Note After you modify the configuration of a data synchronization node, you must commit and deploy the node again. For more information, see Overview.
  • Backfill data for the node: You can call the RunCycleDagNodes operation to backfill data for the node. This operation allows you to backfill data for multiple nodes in a workflow at the same time. For more information, see RunCycleDagNodes.
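For example, to switch a node to a different exclusive resource group for Data Integration, the UpdateDISyncTask call carries a TaskParam JSON with the new resource group identifier. A minimal sketch of assembling that JSON (the identifier shown is a placeholder, and the surrounding UpdateDISyncTaskRequest call follows the same client pattern as the earlier steps):

```java
// Illustrative helper: build the TaskParam JSON that tells UpdateDISyncTask
// which exclusive resource group for Data Integration the node should use.
public class ResourceGroupParam {
    static String taskParam(String resourceGroupIdentifier) {
        return "{\"ResourceGroup\":\"" + resourceGroupIdentifier + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(taskParam("S_res_group_XXX")); // → {"ResourceGroup":"S_res_group_XXX"}
    }
}
```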

Example

The following example shows the complete code that is used to create and configure a batch data synchronization node in Data Integration:
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class CreateOfflineTask {
    static Long createTask(String fileName) throws Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_first\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        // ResourceGroup specifies an exclusive resource group for Data Integration. 
        request.setTaskParam("{\"FileFolderPath\":\"Business Flow/test/Data Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        // Use an exclusive resource group for scheduling.
        request.setResourceGroupIdentifier("S_res_group_XXX");
        request.setInputList("dataworks_di_autotest_root");
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }

    public static Long submitFile(Long fileId) throws Exception {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    public static void getDeployment(Long deploymentId) throws Exception {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(2043L);
        request.setDeploymentId(deploymentId);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        System.out.println(acsResponse.getData().getDeployment().getStatus());
    }

    public static Long deploy(Long fileId) throws Exception {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(2043L);
        request.setNodeName(nodeName);
        request.setProjectEnv("PROD");
        ListNodesResponse acsResponse = client.getAcsResponse(request);
        List<ListNodesResponse.Data.NodesItem> nodesItemList = acsResponse.getData().getNodes();
        return nodesItemList.get(0).getNodeId();
    }

    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setIncludeNodeIds(nodeId.toString());
        request.setName("rerun_job");
        request.setParallelism(false);
        request.setProjectEnv("PROD");
        request.setRootNodeId(nodeId);
        request.setStartBizDate("2021-09-29 00:00:00");
        request.setEndBizDate("2021-09-29 00:00:00");
        RunCycleDagNodesResponse acsResponse = client.getAcsResponse(request);
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String akId = "XX";     // Specify the AccessKey ID of the Alibaba Cloud account that you use to access your DataWorks workspace.
        String akSecret = "XX"; // Specify the AccessKey secret of the Alibaba Cloud account that you use to access your DataWorks workspace.
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // Create a data synchronization node in Data Integration. 
        updateFile(fileId);   // Configure scheduling properties for the node. 
        Long deployId = submitFile(fileId); // Commit the node. 
        getDeployment(deployId);  // Query the status of the node. 
        Thread.sleep(10000); // Wait until the node is committed. 
        getDeployment(deployId);  // Query the status of the node. 
        deployId = deploy(fileId);  // Deploy the node to the production environment. 
        getDeployment(deployId);    // Query the status of the node. 
        Thread.sleep(10000);        // Wait until the node is deployed. 
        getDeployment(deployId);    // Query the status of the node. 
        Long nodeId = listNode(taskName);  // Query the ID of the node. 
        RunCycleDagNodes(nodeId);   // Backfill data for the node. 
    }
}