全部产品
Search
文档中心

大数据开发治理平台 DataWorks:MySQL数据源转PolarDB数据源的OpenAPI最佳实践

更新时间:Mar 08, 2024

本文为您介绍MySQL升级为PolarDB后,如何将之前从MySQL同步到MaxCompute的任务批量修改为从PolarDB同步到MaxCompute。

使用限制

目前仅适用于MySQL升级为PolarDB的场景,其他任何数据源替换场景均不适用。

说明

由于通过API直接修改同步任务的原始JSON配置存在风险,不建议将本文的方法用于其他业务目的,否则可能导致同步任务运行出错,甚至出现数据质量问题。

POM 依赖示例代码

<dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.4</version>
 </dependency>

Java SDK调用示例代码

package com.alibaba.eas;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class updateOfflineTask {

    /**
     * Looks up a file in project 1911 by folder path and file-name keyword.
     *
     * <p>The keyword filter may return more than one file (the ListFiles API
     * reference describes it as a fuzzy match — confirm against the API docs),
     * so a file whose name equals {@code fileName} exactly is preferred. When
     * no exact match exists the first returned file is used, which is what the
     * previous implementation always did.
     *
     * @param filePath folder path passed as the request's file folder path
     * @param fileName keyword used to filter files; also the preferred exact name
     * @return the matching file, or {@code null} when the response contains no files
     * @throws Exception if the SDK call fails
     */
    public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
        ListFilesRequest request = new ListFilesRequest();
        request.setProjectId(1911L);
        request.setFileFolderPath(filePath);
        request.setKeyword(fileName);
        ListFilesResponse response = client.getAcsResponse(request);

        // Guard against an empty or partially populated response instead of
        // failing with a NullPointerException.
        if (response.getData() == null || response.getData().getFiles() == null
                || response.getData().getFiles().isEmpty()) {
            return null;
        }

        List<ListFilesResponse.Data.File> files = response.getData().getFiles();
        for (ListFilesResponse.Data.File candidate : files) {
            if (fileName != null && fileName.equals(candidate.getFileName())) {
                return candidate;
            }
        }
        // No exact name match: keep the original "first hit" behavior.
        return files.get(0);
    }

    /**
     * Fetches the content of a file in project 1911.
     *
     * @param fileId ID of the file to read
     * @return the content string carried by the GetFile response
     * @throws Exception if the SDK call fails
     */
    public static String GetFiles(Long fileId) throws Exception {
        GetFileRequest fileRequest = new GetFileRequest();
        fileRequest.setFileId(fileId);
        fileRequest.setProjectId(1911L);
        return client.getAcsResponse(fileRequest).getData().getFile().getContent();
    }
    /**
     * Overwrites the configuration of a Data Integration sync task in project
     * 1911 with the given content, using task type {@code DI_OFFLINE}.
     *
     * <p>The response is inspected so that a rejected update is reported
     * instead of being silently followed by submit and deploy of the unchanged
     * file.
     *
     * @param fileId  ID of the sync task file to update
     * @param content new task configuration
     * @throws IllegalStateException if the response reports a failed update
     * @throws Exception if the SDK call fails
     */
    public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
        UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
        request.setProjectId(1911L);
        request.setFileId(fileId);
        request.setTaskContent(content);
        request.setTaskType("DI_OFFLINE");
        UpdateDISyncTaskResponse response = client.getAcsResponse(request);

        // The API reference documents Data.Status as "success" or "fail";
        // treat an explicit failure on either level as an error.
        UpdateDISyncTaskResponse.Data data = response.getData();
        boolean failed = Boolean.FALSE.equals(response.getSuccess())
                || (data != null && "fail".equalsIgnoreCase(data.getStatus()));
        if (failed) {
            String detail = (data != null && data.getMessage() != null) ? ": " + data.getMessage() : "";
            throw new IllegalStateException("UpdateDISyncTask failed for fileId=" + fileId + detail);
        }
    }

    /**
     * Sends a SubmitFile request for a file in project 1911.
     *
     * @param fileId ID of the file to submit
     * @return the value carried in the response data, which callers use as a deployment ID
     * @throws Exception if the SDK call fails
     */
    public static  Long submitFile(Long fileId) throws  Exception {
        SubmitFileRequest submitRequest = new SubmitFileRequest();
        submitRequest.setFileId(fileId);
        submitRequest.setProjectId(1911L);
        return client.getAcsResponse(submitRequest).getData();
    }

    /**
     * Queries a deployment in project 1911 and prints its status to standard output.
     *
     * @param deploymentId ID of the deployment to query
     * @throws Exception if the SDK call fails
     */
    public static  void getDeployment(Long deploymentId) throws Exception {
        GetDeploymentRequest query = new GetDeploymentRequest();
        query.setDeploymentId(deploymentId);
        query.setProjectId(1911L);
        GetDeploymentResponse result = client.getAcsResponse(query);
        System.out.println(result.getData().getDeployment().getStatus());
    }

    /**
     * Sends a DeployFile request for a file in project 1911.
     *
     * @param fileId ID of the file to deploy
     * @return the value carried in the response data, which callers use as a deployment ID
     * @throws Exception if the SDK call fails
     */
    public static  Long deploy(Long fileId) throws Exception {
        DeployFileRequest deployRequest = new DeployFileRequest();
        deployRequest.setFileId(fileId);
        deployRequest.setProjectId(1911L);
        return client.getAcsResponse(deployRequest).getData();
    }

    /**
     * Finds a node by name in the PROD environment of project 1911 and returns
     * the ID of the first node in the response.
     *
     * @param nodeName node name used as the request filter
     * @return ID of the first returned node
     * @throws IllegalStateException if the response contains no nodes
     * @throws Exception if the SDK call fails
     */
    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(1911L);
        request.setNodeName(nodeName);
        request.setProjectEnv("PROD");
        ListNodesResponse acsResponse = client.getAcsResponse(request);

        List<ListNodesResponse.Data.NodesItem> nodesItemList =
                acsResponse.getData() == null ? null : acsResponse.getData().getNodes();
        // Fail with a descriptive message instead of an opaque
        // IndexOutOfBoundsException / NullPointerException on get(0).
        if (nodesItemList == null || nodesItemList.isEmpty()) {
            throw new IllegalStateException("No node named '" + nodeName + "' found in PROD");
        }
        return nodesItemList.get(0).getNodeId();
    }

    /**
     * Sends a RunCycleDagNodes request named {@code rerun_job} for a single
     * node in the PROD environment.
     *
     * <p>The given node is both the root node and the only included node.
     * Start and end business dates are hard-coded to 2021-09-29 00:00:00 and
     * parallelism is disabled; adjust the dates before reuse.
     *
     * @param nodeId ID of the node to run; must not be {@code null}
     * @throws Exception if the SDK call fails
     */
    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setIncludeNodeIds(nodeId.toString());
        request.setName("rerun_job");
        request.setParallelism(false);
        request.setProjectEnv("PROD");
        request.setRootNodeId(nodeId);
        request.setStartBizDate("2021-09-29 00:00:00");
        request.setEndBizDate("2021-09-29 00:00:00");
        // The response carries nothing this helper uses, so it is not stored.
        client.getAcsResponse(request);
    }

  /*


  将下面配置中 Reader 的 stepType(mysql)和 datasource(mysql_from_polardb)分别替换为新的插件类型和新的 PolarDB 数据源名称:
  {
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "envType": 0,
                "datasource": "mysql_from_polardb",
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "tableComment": "配置表",
                "connection": [
                    {
                        "selectedDatabase": "polardb_db1",
                        "datasource": "mysql_from_polardb",
                        "table": [
                            "lcl_test_demo"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "pt=${bizdate}",
                "truncate": true,
                "datasource": "odps_source",
                "envType": 0,
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "emptyAsNull": false,
                "tableComment": "配置表",
                "table": "lcl_test_demo"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "locale": "zh_CN",
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

替换完之后的结果是
  {
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "polardb",
            "parameter": {
                "envType": 0,
                "datasource": "polardb",
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "tableComment": "配置表",
                "connection": [
                    {
                        "selectedDatabase": "polardb_db1",
                        "datasource": "polardb",
                        "table": [
                            "lcl_test_demo"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "id",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "pt=${bizdate}",
                "truncate": true,
                "datasource": "odps_source",
                "envType": 0,
                "column": [
                    "id",
                    "name",
                    "create_time",
                    "create_user"
                ],
                "emptyAsNull": false,
                "tableComment": "配置表",
                "table": "lcl_test_demo"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "locale": "zh_CN",
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

  */
    /**
     * Rewrites the reader side of a sync task configuration.
     *
     * <p>For every entry of the top-level {@code steps} array whose
     * {@code category} is {@code reader} and whose {@code stepType} is
     * {@code mysql}, the step type is replaced with {@code newStepType} and the
     * {@code datasource} of its {@code parameter} object, as well as of every
     * entry in {@code parameter.connection}, is replaced with
     * {@code newDatasource}. All other steps are left untouched.
     *
     * @param content       task configuration as a JSON string
     * @param newStepType   replacement value for the reader's {@code stepType}
     * @param newDatasource replacement value for the reader's {@code datasource} fields
     * @return the modified configuration serialized back to a JSON string
     */
    public static String modifyContent(String content, String newStepType, String newDatasource){
        JSONObject root = JSON.parseObject(content);
        JSONArray stepList = root.getJSONArray("steps");
        int stepCount = (stepList == null) ? 0 : stepList.size();

        for (int idx = 0; idx < stepCount; idx++) {
            JSONObject current = stepList.getJSONObject(idx);
            if (current == null) {
                continue;
            }
            // equals() on the literal already rejects a missing (null) value.
            if (!"reader".equals(current.getString("category"))) {
                continue;
            }
            if (!"mysql".equals(current.getString("stepType"))) {
                continue;
            }

            current.put("stepType", newStepType);

            JSONObject params = current.getJSONObject("parameter");
            if (params == null) {
                continue;
            }
            params.put("datasource", newDatasource);

            JSONArray links = params.getJSONArray("connection");
            if (links == null) {
                continue;
            }
            for (int k = 0; k < links.size(); k++) {
                JSONObject link = links.getJSONObject(k);
                if (link != null) {
                    link.put("datasource", newDatasource);
                }
            }
        }

        return root.toJSONString();
    }

    // SDK client shared by all helper methods in this class; it is assigned in
    // main() before any helper is invoked.
    static IAcsClient client;

    /**
     * End-to-end example: locates the sync task file, rewrites its reader from
     * MySQL to PolarDB, updates the task, submits and deploys the file, and
     * finally triggers a run of the corresponding PROD node.
     *
     * <p>Credentials are read from the environment variables
     * {@code ALIBABA_CLOUD_ACCESS_KEY_ID} and
     * {@code ALIBABA_CLOUD_ACCESS_KEY_SECRET} rather than being written into
     * the source.
     *
     * @param args unused
     * @throws IllegalStateException if credentials are missing or the file cannot be found
     * @throws Exception if any SDK call fails
     */
    public static void main(String[] args) throws Exception {
        String akId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String akSecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        if (akId == null || akId.isEmpty() || akSecret == null || akSecret.isEmpty()) {
            throw new IllegalStateException(
                    "Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET before running");
        }
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String folderPath = "业务流程/重兴/数据集成/";
        String filename = "mysql_to_odps";
        ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
        // ListFiles returns null when nothing matches; stop before touching any task.
        if (file == null) {
            throw new IllegalStateException("File '" + filename + "' not found under '" + folderPath + "'");
        }
        Long fileId = file.getFileId();
        System.out.println(fileId);

        String content = GetFiles(fileId);
        // "polardb" is the target reader type: this example replaces mysql with
        // polardb and points the reader at the "polardb_datasource" data source.
        String contentModified = modifyContent(content, "polardb", "polardb_datasource");
        UpdateDISyncTask(fileId, contentModified);

        // Submit, then print the deployment status immediately and again after 10 s.
        Long deployId = submitFile(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);
        getDeployment(deployId);

        // Deploy, with the same two status printouts.
        deployId = deploy(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);
        getDeployment(deployId);

        Long nodeId = listNode(filename);
        RunCycleDagNodes(nodeId);
    }
}