All Products
Search
Document Center

DataWorks:Call an API to change the source of a data synchronization task from a MySQL data source to a PolarDB data source

Last Updated:Mar 12, 2024

This topic describes how to change the source of a synchronization task whose destination type is MaxCompute from a MySQL data source to a PolarDB data source.

Limits

You can use this method only to change the source of a data synchronization task from a MySQL data source to a PolarDB data source. Changing the source between any other types of data sources is not supported.

Note

Calling an API to modify the JSON code in the original configuration file carries risks. We recommend that you do not use the modified configuration file for other business purposes and that you do not call an API to modify other configuration files. If you use the modified configuration file for other business purposes, errors may occur when the data synchronization task runs, and data quality issues may result.

Sample code of POM dependencies

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.4</version>
</dependency>

Sample code of the SDK for Java

package com.alibaba.eas;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class updateOfflineTask {

    /**
     * Queries the files of project 1911 by folder path and keyword and returns the first hit.
     *
     * <p>NOTE(review): the keyword is handed to the service unchanged; if the service matches
     * it loosely, the first hit is not guaranteed to be the file with exactly that name —
     * confirm against the ListFiles API reference.
     *
     * @param filePath folder path set as the request's file folder path
     * @param fileName keyword set on the request to filter files
     * @return the first file in the response, or {@code null} if the response contains no files
     * @throws Exception if the SDK call fails
     */
    public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
        ListFilesRequest request = new ListFilesRequest();
        request.setProjectId(1911L);
        request.setFileFolderPath(filePath);
        request.setKeyword(fileName);
        ListFilesResponse response = client.getAcsResponse(request);

        // Treat a response without a data section or file list the same as "nothing found"
        // so that callers only have to handle the null return value.
        ListFilesResponse.Data data = response.getData();
        if (data == null) {
            return null;
        }
        List<ListFilesResponse.Data.File> files = data.getFiles();
        if (files == null || files.isEmpty()) {
            return null;
        }
        return files.get(0);
    }

    /**
     * Fetches a file of project 1911 and returns its content.
     *
     * @param fileId ID of the file to fetch
     * @return the content string carried by the file in the response
     * @throws Exception if the SDK call fails
     */
    public static String GetFiles(Long fileId) throws Exception {
        GetFileRequest fileRequest = new GetFileRequest();
        fileRequest.setFileId(fileId);
        fileRequest.setProjectId(1911L);

        GetFileResponse fileResponse = client.getAcsResponse(fileRequest);
        return fileResponse
                .getData()
                .getFile()
                .getContent();
    }
    /**
     * Sends new task content for a file of project 1911 as a {@code DI_OFFLINE} task update.
     *
     * @param fileId  ID of the file whose task content is updated
     * @param content task content to send
     * @throws Exception if the SDK call fails
     */
    public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
        UpdateDISyncTaskRequest updateRequest = new UpdateDISyncTaskRequest();
        updateRequest.setTaskType("DI_OFFLINE");
        updateRequest.setTaskContent(content);
        updateRequest.setFileId(fileId);
        updateRequest.setProjectId(1911L);

        // The response carries nothing this sample needs, so it is not kept.
        client.getAcsResponse(updateRequest);
    }

    /**
     * Submits a file of project 1911.
     *
     * @param fileId ID of the file to submit
     * @return the data value of the submit response, which the caller uses as a deployment ID
     * @throws Exception if the SDK call fails
     */
    public static  Long submitFile(Long fileId) throws  Exception {
        SubmitFileRequest submitRequest = new SubmitFileRequest();
        submitRequest.setFileId(fileId);
        submitRequest.setProjectId(1911L);

        SubmitFileResponse submitResponse = client.getAcsResponse(submitRequest);
        return submitResponse.getData();
    }

    /**
     * Queries a deployment of project 1911 and prints its status to standard output.
     *
     * @param deploymentId ID of the deployment to query
     * @throws Exception if the SDK call fails
     */
    public static  void getDeployment(Long deploymentId) throws Exception {
        GetDeploymentRequest statusRequest = new GetDeploymentRequest();
        statusRequest.setDeploymentId(deploymentId);
        statusRequest.setProjectId(1911L);

        GetDeploymentResponse statusResponse = client.getAcsResponse(statusRequest);
        System.out.println(
                statusResponse.getData().getDeployment().getStatus());
    }

    /**
     * Deploys a file of project 1911.
     *
     * @param fileId ID of the file to deploy
     * @return the data value of the deploy response, which the caller uses as a deployment ID
     * @throws Exception if the SDK call fails
     */
    public static  Long deploy(Long fileId) throws Exception {
        DeployFileRequest deployRequest = new DeployFileRequest();
        deployRequest.setFileId(fileId);
        deployRequest.setProjectId(1911L);

        DeployFileResponse deployResponse = client.getAcsResponse(deployRequest);
        return deployResponse.getData();
    }

    /**
     * Looks up a node by name in the PROD environment of project 1911 and returns the ID of
     * the first node in the response.
     *
     * @param nodeName node name set on the request
     * @return the ID of the first node returned
     * @throws IllegalStateException if the response contains no nodes
     * @throws Exception if the SDK call fails
     */
    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(1911L);
        request.setNodeName(nodeName);
        request.setProjectEnv("PROD");
        ListNodesResponse acsResponse = client.getAcsResponse(request);

        ListNodesResponse.Data data = acsResponse.getData();
        List<ListNodesResponse.Data.NodesItem> nodesItemList = data == null ? null : data.getNodes();
        // Fail with a message that names the missing node instead of a bare
        // IndexOutOfBoundsException / NullPointerException.
        if (nodesItemList == null || nodesItemList.isEmpty()) {
            throw new IllegalStateException(
                    "No node named '" + nodeName + "' was returned for the PROD environment of project 1911");
        }
        return nodesItemList.get(0).getNodeId();
    }

    /**
     * Sends a RunCycleDagNodes request named {@code rerun_job} for a single node in the PROD
     * environment, with the node as both the root node and the only included node.
     *
     * <p>The business date range is fixed to {@code 2021-09-29 00:00:00} for both start and end;
     * adjust these literals before reusing the sample.
     *
     * @param nodeId ID of the node to run; must not be {@code null}
     * @throws Exception if the SDK call fails
     */
    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setName("rerun_job");
        request.setProjectEnv("PROD");
        request.setRootNodeId(nodeId);
        request.setIncludeNodeIds(nodeId.toString());
        request.setParallelism(false);
        request.setStartBizDate("2021-09-29 00:00:00");
        request.setEndBizDate("2021-09-29 00:00:00");

        // The response is not inspected by this sample.
        client.getAcsResponse(request);
    }

  /*


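  In the reader step of the following code snippet, replace the value of the stepType parameter with polardb, and replace the values of the datasource parameters (in "parameter" and in each "connection" entry) with the name of the PolarDB data source, which is polardb in this example: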
  {
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "envType": 0,
                "datasource": "mysql_from_polardb",
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "tableComment": "Test",
                "connection": [
                    {
                        "selectedDatabase": "polardb_db1",
                        "datasource": "mysql_from_polardb",
                        "table": [
                            "lcl_test_demo"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "pt=${bizdate}",
                "truncate": true,
                "datasource": "odps_source",
                "envType": 0,
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "emptyAsNull": false,
                "tableComment": "Test",
                "table": "lcl_test_demo"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "locale": "zh_CN",
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

The following code snippet shows the replacement results:
  {
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "polardb",
            "parameter": {
                "envType": 0,
                "datasource": "polardb",
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "tableComment": "Test",
                "connection": [
                    {
                        "selectedDatabase": "polardb_db1",
                        "datasource": "polardb",
                        "table": [
                            "lcl_test_demo"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "pt=${bizdate}",
                "truncate": true,
                "datasource": "odps_source",
                "envType": 0,
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "emptyAsNull": false,
                "tableComment": "Test",
                "table": "lcl_test_demo"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "locale": "zh_CN",
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

  */
    /**
     * Rewrites the reader steps of a task configuration so that MySQL readers use a new step
     * type and data source name.
     *
     * <p>For every entry of {@code steps} whose {@code category} is {@code reader} and whose
     * {@code stepType} is {@code mysql}, the method sets {@code stepType} to
     * {@code newStepType}, sets {@code parameter.datasource} to {@code newDatasource}, and sets
     * {@code datasource} on every non-null entry of {@code parameter.connection}. All other
     * steps are left untouched.
     *
     * @param content       task configuration as a JSON object string
     * @param newStepType   value written to {@code stepType} of matching reader steps
     * @param newDatasource value written to the {@code datasource} fields of matching reader steps
     * @return the configuration serialized back to a JSON string
     */
    public static String modifyContent(String content, String newStepType, String newDatasource){
        JSONObject root = JSON.parseObject(content);
        JSONArray stepList = root.getJSONArray("steps");
        if (stepList == null) {
            return root.toJSONString();
        }

        for (int s = 0; s < stepList.size(); s++) {
            JSONObject current = stepList.getJSONObject(s);
            if (current == null) {
                continue;
            }
            // Only MySQL reader steps are retargeted.
            if (!"reader".equals(current.getString("category"))) {
                continue;
            }
            if (!"mysql".equals(current.getString("stepType"))) {
                continue;
            }

            current.put("stepType", newStepType);

            JSONObject params = current.getJSONObject("parameter");
            if (params == null) {
                continue;
            }
            params.put("datasource", newDatasource);

            JSONArray links = params.getJSONArray("connection");
            if (links == null) {
                continue;
            }
            for (int c = 0; c < links.size(); c++) {
                JSONObject link = links.getJSONObject(c);
                if (link != null) {
                    link.put("datasource", newDatasource);
                }
            }
        }

        return root.toJSONString();
    }

    // Shared SDK client used by every helper method in this class. It is assigned in main()
    // and is null until then, so the helpers must not be called before main() has set it.
    static IAcsClient client;

    /**
     * Runs the sample end to end: locates the synchronization task file, rewrites its reader
     * from MySQL to PolarDB, updates the task, submits and deploys the file, and finally
     * triggers a run of the corresponding node.
     *
     * @param args unused
     * @throws IllegalStateException if the task file cannot be found
     * @throws Exception if any SDK call fails
     */
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with your own AccessKey pair. Do not commit real credentials.
        String akId = "XXX";
        String akSecret = "XXX";
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String folderPath = "Business Flow/Test Workflow/Data Integration/";
        String filename = "mysql_to_odps";
        ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
        // ListFiles returns null when nothing matches; stop with a clear message instead of
        // failing with a NullPointerException on the next line.
        if (file == null) {
            throw new IllegalStateException(
                    "No file matching '" + filename + "' was found under '" + folderPath + "'");
        }
        Long fileId = file.getFileId();
        System.out.println(fileId);
        String content = GetFiles(fileId);
        // "polardb" is the step type of the new data source; "polardb_datasource" is the name
        // of the PolarDB data source that replaces the original MySQL data source.
        String contentModified = modifyContent(content, "polardb", "polardb_datasource");
        UpdateDISyncTask(fileId, contentModified);

        // Submit the file and print the deployment status immediately and again after 10 seconds.
        Long deployId = submitFile(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);
        getDeployment(deployId);

        // Deploy the file and print the deployment status immediately and again after 10 seconds.
        deployId = deploy(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);
        getDeployment(deployId);

        // Look up the node by the file name and trigger a run of it.
        Long nodeId = listNode(filename);
        RunCycleDagNodes(nodeId);
    }
}