This topic describes how to change the source of a synchronization task whose destination type is MaxCompute from a MySQL data source to a PolarDB data source.
Limits
You can use this method only to change the source of a data synchronization task from a MySQL data source to a PolarDB data source. You cannot use this method to change the source of a data synchronization task between other types of data sources.
Risks may occur if you call an API to modify the JSON code in the original configuration file. Therefore, we recommend that you do not use the modified configuration file for other business purposes or call an API to modify other configuration files. If you use the modified configuration file for other business purposes, an error may occur when the data synchronization task is run, and data quality issues may occur.
Sample code of POM dependencies
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.20</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>3.4.4</version>
</dependency>
Sample code of the SDK for Java
package com.alibaba.eas;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
public class updateOfflineTask {
/**
 * Looks up a file in project 1911 by folder path and file-name keyword.
 *
 * <p>The keyword is only a filter, so the response may list more than one file
 * (NOTE(review): the ListFiles API documents Keyword as a fuzzy match - confirm).
 * A file whose name equals {@code fileName} exactly is preferred; if no file matches
 * exactly, the first file in the response is returned.
 *
 * @param filePath folder path sent as the request's file folder path
 * @param fileName file name sent as the request's keyword
 * @return the selected file, or {@code null} if the response contains no files
 * @throws Exception if the API call fails
 */
public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
    ListFilesRequest request = new ListFilesRequest();
    request.setProjectId(1911L);
    request.setFileFolderPath(filePath);
    request.setKeyword(fileName);
    ListFilesResponse response = client.getAcsResponse(request);

    // Guard against an empty or missing payload instead of failing with a NullPointerException.
    ListFilesResponse.Data data = response.getData();
    if (data == null || data.getFiles() == null || data.getFiles().isEmpty()) {
        return null;
    }

    List<ListFilesResponse.Data.File> files = data.getFiles();
    for (ListFilesResponse.Data.File candidate : files) {
        if (fileName != null && fileName.equals(candidate.getFileName())) {
            return candidate;
        }
    }
    // No exact match: fall back to the first result.
    return files.get(0);
}
/**
 * Fetches the content of a file in project 1911.
 *
 * @param fileId ID of the file to read
 * @return the content string carried by the GetFile response
 * @throws Exception if the API call fails
 */
public static String GetFiles(Long fileId) throws Exception {
    GetFileRequest getFileRequest = new GetFileRequest();
    getFileRequest.setFileId(fileId);
    getFileRequest.setProjectId(1911L);

    GetFileResponse fileResponse = client.getAcsResponse(getFileRequest);
    String content = fileResponse.getData().getFile().getContent();
    return content;
}
/**
 * Overwrites the configuration of an offline ("DI_OFFLINE") synchronization task
 * in project 1911.
 *
 * @param fileId  ID of the file that holds the synchronization task
 * @param content new task configuration to store
 * @throws IllegalStateException if the response explicitly reports a "fail" status
 * @throws Exception if the API call itself fails
 */
public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
    UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
    request.setProjectId(1911L);
    request.setFileId(fileId);
    request.setTaskContent(content);
    request.setTaskType("DI_OFFLINE");

    UpdateDISyncTaskResponse response = client.getAcsResponse(request);

    // Surface a rejected update instead of silently continuing to submit and deploy.
    // Only an explicit "fail" status is treated as an error; anything else is accepted.
    UpdateDISyncTaskResponse.Data data = response.getData();
    if (data != null && "fail".equalsIgnoreCase(data.getStatus())) {
        throw new IllegalStateException(
                "UpdateDISyncTask failed for file " + fileId + ": " + data.getMessage());
    }
}
/**
 * Submits a file of project 1911.
 *
 * @param fileId ID of the file to submit
 * @return the value carried in the SubmitFile response data, which the caller uses as a deployment ID
 * @throws Exception if the API call fails
 */
public static Long submitFile(Long fileId) throws Exception {
    SubmitFileRequest submitRequest = new SubmitFileRequest();
    submitRequest.setFileId(fileId);
    submitRequest.setProjectId(1911L);

    SubmitFileResponse submitResponse = client.getAcsResponse(submitRequest);
    return submitResponse.getData();
}
/**
 * Queries a deployment of project 1911 and prints its status to standard output.
 *
 * @param deploymentId ID of the deployment to query
 * @throws Exception if the API call fails
 */
public static void getDeployment(Long deploymentId) throws Exception {
    GetDeploymentRequest deploymentRequest = new GetDeploymentRequest();
    deploymentRequest.setDeploymentId(deploymentId);
    deploymentRequest.setProjectId(1911L);

    GetDeploymentResponse deploymentResponse = client.getAcsResponse(deploymentRequest);
    System.out.println(
            deploymentResponse.getData().getDeployment().getStatus());
}
/**
 * Deploys a file of project 1911.
 *
 * @param fileId ID of the file to deploy
 * @return the value carried in the DeployFile response data, which the caller uses as a deployment ID
 * @throws Exception if the API call fails
 */
public static Long deploy(Long fileId) throws Exception {
    DeployFileRequest deployRequest = new DeployFileRequest();
    deployRequest.setFileId(fileId);
    deployRequest.setProjectId(1911L);

    DeployFileResponse deployResponse = client.getAcsResponse(deployRequest);
    return deployResponse.getData();
}
/**
 * Finds a node by name in the PROD environment of project 1911.
 *
 * @param nodeName node name used as the request filter
 * @return the ID of the first node in the response
 * @throws IllegalStateException if the response contains no nodes
 * @throws Exception if the API call fails
 */
public static Long listNode(String nodeName) throws Exception {
    ListNodesRequest request = new ListNodesRequest();
    request.setProjectId(1911L);
    request.setNodeName(nodeName);
    request.setProjectEnv("PROD");

    ListNodesResponse response = client.getAcsResponse(request);
    List<ListNodesResponse.Data.NodesItem> nodes =
            response.getData() == null ? null : response.getData().getNodes();

    // Fail with a descriptive message rather than an IndexOutOfBoundsException from get(0).
    if (nodes == null || nodes.isEmpty()) {
        throw new IllegalStateException(
                "No node named '" + nodeName + "' was found in the PROD environment");
    }
    return nodes.get(0).getNodeId();
}
/**
 * Starts a run named "rerun_job" for a single node in the PROD environment.
 *
 * <p>The given node is used both as the root node and as the only included node.
 * The start and end business dates are both fixed to 2021-09-29 00:00:00.
 *
 * @param nodeId ID of the node to run
 * @throws Exception if the API call fails
 */
public static void RunCycleDagNodes(Long nodeId) throws Exception {
    RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
    request.setName("rerun_job");
    request.setProjectEnv("PROD");
    request.setRootNodeId(nodeId);
    request.setIncludeNodeIds(nodeId.toString());
    request.setParallelism(false);
    request.setStartBizDate("2021-09-29 00:00:00");
    request.setEndBizDate("2021-09-29 00:00:00");

    // The response is not used; the call is made for its side effect only.
    client.getAcsResponse(request);
}
/*
In the Reader step of the following code snippet, replace the value of the stepType parameter and every value of the datasource parameter (including the one under connection) with polardb:
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "mysql",
"parameter": {
"envType": 0,
"datasource": "mysql_from_polardb",
"column": [
"id",
"name",
"create_time",
"create_user"
],
"tableComment": "Test",
"connection": [
{
"selectedDatabase": "polardb_db1",
"datasource": "mysql_from_polardb",
"table": [
"lcl_test_demo"
]
}
],
"where": "",
"splitPk": "id",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "pt=${bizdate}",
"truncate": true,
"datasource": "odps_source",
"envType": 0,
"column": [
"id",
"name",
"create_time",
"create_user"
],
"emptyAsNull": false,
"tableComment": "Test",
"table": "lcl_test_demo"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""
},
"locale": "zh_CN",
"speed": {
"throttle": false,
"concurrent": 2
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
The following code snippet shows the replacement results:
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "polardb",
"parameter": {
"envType": 0,
"datasource": "polardb",
"column": [
"id",
"name",
"create_time",
"create_user"
],
"tableComment": "Test",
"connection": [
{
"selectedDatabase": "polardb_db1",
"datasource": "polardb",
"table": [
"lcl_test_demo"
]
}
],
"where": "",
"splitPk": "id",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "pt=${bizdate}",
"truncate": true,
"datasource": "odps_source",
"envType": 0,
"column": [
"id",
"name",
"create_time",
"create_user"
],
"emptyAsNull": false,
"tableComment": "Test",
"table": "lcl_test_demo"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""
},
"locale": "zh_CN",
"speed": {
"throttle": false,
"concurrent": 2
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
*/
/**
 * Rewrites the reader step of a synchronization task configuration so that it reads
 * from a different data source.
 *
 * <p>Only steps whose {@code category} is {@code "reader"} and whose {@code stepType}
 * is {@code "mysql"} are changed. For each such step, {@code stepType} is set to
 * {@code newStepType}, and {@code datasource} is set to {@code newDatasource} both in
 * the step's {@code parameter} object and in every entry of {@code parameter.connection}.
 * All other steps are left untouched.
 *
 * @param content       task configuration as a JSON string
 * @param newStepType   value to write to the reader step's {@code stepType}
 * @param newDatasource value to write to the reader step's {@code datasource} entries
 * @return the modified configuration serialized as a JSON string
 * @throws IllegalArgumentException if {@code content} does not parse to a JSON object
 */
public static String modifyContent(String content, String newStepType, String newDatasource) {
    JSONObject jsonObject = JSON.parseObject(content);
    if (jsonObject == null) {
        // parseObject yields null for null input; fail clearly instead of with an NPE below.
        throw new IllegalArgumentException("Task content is empty; there is nothing to modify");
    }

    JSONArray steps = jsonObject.getJSONArray("steps");
    if (steps == null) {
        return jsonObject.toJSONString();
    }

    for (int i = 0; i < steps.size(); ++i) {
        JSONObject step = steps.getJSONObject(i);
        // "literal".equals(x) is already null-safe, so no separate null checks are needed.
        if (step == null
                || !"reader".equals(step.getString("category"))
                || !"mysql".equals(step.getString("stepType"))) {
            continue;
        }

        step.put("stepType", newStepType);

        JSONObject parameter = step.getJSONObject("parameter");
        if (parameter == null) {
            continue;
        }
        parameter.put("datasource", newDatasource);

        JSONArray connections = parameter.getJSONArray("connection");
        if (connections == null) {
            continue;
        }
        for (int j = 0; j < connections.size(); ++j) {
            JSONObject connection = connections.getJSONObject(j);
            if (connection != null) {
                connection.put("datasource", newDatasource);
            }
        }
    }
    return jsonObject.toJSONString();
}
// SDK client shared by all helper methods of this class; main() assigns it before calling any of them.
static IAcsClient client;
/**
 * Runs the whole migration: finds the synchronization task file, rewrites its reader
 * from MySQL to PolarDB, saves the new configuration, submits and deploys the file,
 * and finally triggers a run of the deployed node.
 *
 * @param args unused
 * @throws IllegalStateException if the task file cannot be found
 * @throws Exception if any API call fails
 */
public static void main(String[] args) throws Exception {
    // Placeholders: replace with your own AccessKey pair. Avoid committing real credentials.
    String akId = "XXX";
    String akSecret = "XXX";
    String regionId = "cn-chengdu";
    IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
    DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
    client = new DefaultAcsClient(profile);

    String folderPath = "Business Flow/Test Workflow/Data Integration/";
    String filename = "mysql_to_odps";

    // ListFiles returns null when nothing matches; stop here with a clear message.
    ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
    if (file == null) {
        throw new IllegalStateException(
                "File '" + filename + "' was not found in folder '" + folderPath + "'");
    }
    Long fileId = file.getFileId();
    System.out.println(fileId);

    // "polardb" is the type of the new data source and "polardb_datasource" is the value
    // written to the datasource fields. The original MySQL data source is replaced by a
    // PolarDB data source.
    String content = GetFiles(fileId);
    String contentModified = modifyContent(content, "polardb", "polardb_datasource");
    UpdateDISyncTask(fileId, contentModified);

    // Submit the file, printing the deployment status right away and again after 10 seconds.
    Long deployId = submitFile(fileId);
    getDeployment(deployId);
    Thread.sleep(10000);
    getDeployment(deployId);

    // Deploy the file, printing the deployment status in the same way.
    deployId = deploy(fileId);
    getDeployment(deployId);
    Thread.sleep(10000);
    getDeployment(deployId);

    // Look up the node by the file name and trigger a run.
    Long nodeId = listNode(filename);
    RunCycleDagNodes(nodeId);
}
}