
Best practices: End-to-end data ETL operations

Last Updated: Nov 20, 2025

This topic uses a case study to demonstrate how to use the DataWorks OpenAPI for data development.

Background information

Consider a scenario where a developer needs to synchronize data from an ApsaraDB RDS database to a partitioned MaxCompute table. The developer then wants to display reports based on the analyzed data on a page in a self-managed system. The DataWorks OpenAPI can be used to implement this entire process, as the following steps demonstrate.

Prerequisites

The DataWorks OpenAPI software development kit (SDK) is installed. For more information, see Use OpenAPI.

Note

In addition to Java, DataWorks provides SDKs for other programming languages, such as Python, PHP, C#, and Go. You must install the SDK that is appropriate for your development environment.

Precautions

By default, you do not need to explicitly specify the endpoint for the DataWorks OpenAPI. However, if you use an earlier version of aliyun-java-sdk-core, the SDK may fail to find the DataWorks OpenAPI endpoint. In this case, you can use the following code to send requests without upgrading the SDK.

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See https://www.alibabacloud.com/help/doc-detail/378657.html.
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
config.setEndpoint("dataworks.cn-hangzhou.aliyuncs.com");
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
com.aliyun.dataworks_public20200518.Client client = new com.aliyun.dataworks_public20200518.Client(config);

The preceding code explicitly specifies the DataWorks OpenAPI endpoint. An endpoint in the dataworks.${regionId}.aliyuncs.com format is accessible over the internet. However, some users need to call OpenAPI operations in a virtual private cloud (VPC) environment. To do this, change the domain name from dataworks.${regionId}.aliyuncs.com to dataworks-vpc.${regionId}.aliyuncs.com. This lets you send requests to the DataWorks OpenAPI from a VPC environment, even without an internet connection. For more information about region IDs, see Regions and zones.
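
For example, the following minimal sketch configures the same client to use the VPC endpoint. The region ID cn-hangzhou is carried over from the preceding example; replace it with the ID of the region that you use.

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.
com.aliyun.teaopenapi.models.Config vpcConfig = new com.aliyun.teaopenapi.models.Config();
vpcConfig.setEndpoint("dataworks-vpc.cn-hangzhou.aliyuncs.com"); // VPC endpoint for the China (Hangzhou) region.
vpcConfig.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
vpcConfig.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
com.aliyun.dataworks_public20200518.Client vpcClient = new com.aliyun.dataworks_public20200518.Client(vpcConfig);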

Step 1: Create an RDS data source

You can use the tenant-level APIs to create engines and data sources and to view information about projects. In this scenario, the partitioned table resides in a MaxCompute engine. When you create a MaxCompute workspace in the DataWorks console, a data source for the MaxCompute engine is automatically created. Therefore, you only need to call the CreateDataSource operation to create an RDS data source.

// Create the data source. The Content field holds the JSON-formatted connection configuration.
CreateDataSourceRequest createRequest = new CreateDataSourceRequest();
createRequest.setProjectId(-1L); // Replace with your DataWorks workspace ID.
createRequest.setName("TEST_CONNECTION");
createRequest.setDataSourceType("mysql");
createRequest.setEnvType(1); // 0: development environment. 1: production environment.
createRequest.setContent("{\"password\":\"12345\"}");
Long dataSourceId;

try {
    CreateDataSourceResponse createResponse = client.createDataSource(createRequest);
    Assert.assertNotNull(createResponse.getBody().getData());
    dataSourceId = createResponse.getBody().getData();

    UpdateDataSourceRequest updateRequest = new UpdateDataSourceRequest();
    updateRequest.setDataSourceId(dataSourceId);
    updateRequest.setDescription("1");
    updateRequest.setContent("{\"database\":\"xsaxsa\",\"username\":\"xsaxsa\",\"tag\":\"rds\",\"regionId\":\"cn-shanghai\",\"rdsOwnerId\":\"xasxsa\",\"password\":\"xsaxsa\",\"instanceName\":\"rm-xsaxsa\"}");
    UpdateDataSourceResponse updateDataSourceResponse = client.updateDataSource(updateRequest);
    Assert.assertTrue(updateDataSourceResponse.getBody().getData());

    DeleteDataSourceRequest deleteRequest = new DeleteDataSourceRequest();
    deleteRequest.setDataSourceId(dataSourceId);
    DeleteDataSourceResponse deleteResponse = client.deleteDataSource(deleteRequest);
    Assert.assertTrue(deleteResponse.getBody().getData());
} catch (Exception e) {
    e.printStackTrace();
    Assert.fail();
}

You can use the UpdateDataSource and DeleteDataSource operations to update and delete a data source, as shown in the preceding code.

Note

The set of APIs for managing project members includes CreateProjectMember, DeleteProjectMember, RemoveProjectMemberFromRole, and ListProjectMembers.
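
For example, the following minimal sketch adds a member to a workspace and grants the member a role. The user ID and role code are hypothetical placeholders; the available role codes depend on your workspace configuration.

// All values below are placeholders; replace them with your own.
CreateProjectMemberRequest memberRequest = new CreateProjectMemberRequest();
memberRequest.setProjectId(-1L); // Your DataWorks workspace ID.
memberRequest.setUserId("123456789"); // The Alibaba Cloud account ID of the member to add.
memberRequest.setRoleCode("role_project_guest"); // A hypothetical workspace role code.
CreateProjectMemberResponse memberResponse = client.createProjectMember(memberRequest);
LOGGER.info("Member added - RequestId[{}]", memberResponse.getBody().getRequestId());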

Step 2: Develop, deploy, and schedule tasks

You can use the Data Development APIs to manage, commit, and deploy files, which generates auto triggered tasks that run on a schedule. The FileType field determines the type of node that a file creates. Many FileType values are supported; to obtain the full list of supported node types, call the ListProgramTypeCount operation of Operation Center.
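
The following is a minimal sketch, assuming that ListProgramTypeCount accepts the workspace ID and environment; the response data is logged as JSON because its exact structure depends on the SDK version.

// Query the node types that are available in the workspace. The returned
// program types are the values that the FileType field accepts.
ListProgramTypeCountRequest typeCountRequest = new ListProgramTypeCountRequest();
typeCountRequest.setProjectId(-1L); // Replace with your DataWorks workspace ID.
typeCountRequest.setProjectEnv("PROD"); // PROD or DEV.
ListProgramTypeCountResponse typeCountResponse = client.listProgramTypeCount(typeCountRequest);
LOGGER.info("ProgramTypeCounts[{}]", new Gson().toJson(typeCountResponse.getBody().getData()));

The following code then creates an ODPS SQL file (FileType 10):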

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See https://www.alibabacloud.com/help/doc-detail/378657.html.
CreateFileRequest createFileRequest = new CreateFileRequest();
createFileRequest.setFileFolderPath("Workflow/POP API Test/MaxCompute/test_folder_3");
createFileRequest.setProjectId(-1L); // Replace with your DataWorks workspace ID.
createFileRequest.setFileName("create_file_name");
createFileRequest.setFileType(10); // 10: ODPS SQL node.
String projectIdentifier = "Enter your workspace name";
createFileRequest.setInputList(projectIdentifier + "_root"); // Depend on the workspace root node.
createFileRequest.setContent("SHOW TABLES;");
createFileRequest.setFileDescription("create file");
createFileRequest.setRerunMode("ALL_ALLOWED"); // The task can be rerun at any time.
CreateFileResponse createFileResponse = client.createFile(createFileRequest);

The Content field stores the code of SQL scripts, Shell scripts, and data synchronization scripts. For more information about the format of data synchronization scripts, see Configure a task in the code editor. After you use the CreateFile operation to create a script, you can use the UpdateFile and DeleteFile operations to manage it. As in the console, after you finish developing a file, you must commit and deploy it to generate recurring instances. Note that you must poll the deployment status: pass the DeploymentId returned by the SubmitFile operation to the GetDeployment operation. The commit is successful only when the returned status is finished (status.finished()).

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See https://www.alibabacloud.com/help/doc-detail/378657.html.
SubmitFileRequest request = new SubmitFileRequest();
request.setFileId(fileId); // The file ID returned by the CreateFile call above.
request.setComment("submit file");
SubmitFileResponse submitFileResponse = client.submitFile(request);

// Check the commit result by polling GetDeployment.
DeploymentStatus status = null;
GetDeploymentResponseBody.GetDeploymentResponseBodyDataDeployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
  GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
  getDeploymentRequest.setDeploymentId(submitFileResponse.getBody().getData());
  GetDeploymentResponse getDeploymentResponse = client.getDeployment(getDeploymentRequest);
  LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getBody().getRequestId());
  Assert.assertNotNull(getDeploymentResponse.getBody().getData());
  deployment = getDeploymentResponse.getBody().getData().getDeployment();
  Assert.assertNotNull(deployment);
  // baseId is defined by the surrounding test harness.
  Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
  Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
  Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
  Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
  Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
  Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
  status = Enums.find(DeploymentStatus.class, deployment.getStatus());
  Assert.assertNotNull(status);
  if (status.finished()) {
    LOGGER.info("Deployment finished - FinalStatus[{}]", status);
    break;
  }
  LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
  retryTimes++;
  SleepUtils.seconds(10L);
}

If you develop in a workspace in standard mode, you must also deploy the file after you commit it. Deployment submits the file to the scheduling system, where it becomes an auto triggered task. To deploy a file, use the DeployFile operation. As with the commit process, you must poll the deployment status by using the GetDeployment operation.

DeployFileRequest request = new DeployFileRequest();
request.setFileId(fileId);
request.setComment("deploy file");
DeployFileResponse deployFileResponse = client.deployFile(request);
Long deploymentId = deployFileResponse.getBody().getData();
// Check the deployment result by polling GetDeployment.
DeploymentStatus status = null;
GetDeploymentResponseBody.GetDeploymentResponseBodyDataDeployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
    GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
    getDeploymentRequest.setDeploymentId(deploymentId);
    GetDeploymentResponse getDeploymentResponse = client.getDeployment(getDeploymentRequest);
    LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getBody().getRequestId());
    Assert.assertNotNull(getDeploymentResponse.getBody().getData());
    deployment = getDeploymentResponse.getBody().getData().getDeployment();
    Assert.assertNotNull(deployment);
    LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                        deploymentId, new Gson().toJson(deployment));
    Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
    Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
    status = Enums.find(DeploymentStatus.class, deployment.getStatus());
    Assert.assertNotNull(status);
    if (status.finished()) {
        LOGGER.info("Deployment finished - FinalStatus[{}]", status);
        break;
    }
    LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
    retryTimes++;
    SleepUtils.seconds(10L);
}
Note

In addition to managing files, you can also use the Data Development APIs to manage folders, resources, and functions, as in the folder sketch below.
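
The following is a minimal sketch that creates a folder. The folder path is hypothetical and mirrors the FileFolderPath format used in the CreateFile example.

// Create a folder in the DataStudio directory tree.
CreateFolderRequest createFolderRequest = new CreateFolderRequest();
createFolderRequest.setProjectId(-1L); // Replace with your DataWorks workspace ID.
createFolderRequest.setFolderPath("Workflow/POP API Test/MaxCompute/test_folder_4"); // Hypothetical path.
CreateFolderResponse createFolderResponse = client.createFolder(createFolderRequest);
String folderId = createFolderResponse.getBody().getData(); // The ID of the new folder.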

Step 3: Configure O&M and monitoring

After you use the APIs to create auto triggered tasks, DataWorks generates and runs instances based on the configured schedule. You can use the Operation Center APIs to perform O&M on auto triggered tasks and recurring instances. For example, the GetNode, GetInstance, and ListInstances operations let you view auto triggered tasks and recurring instances and monitor their running status. The following code provides an example.

GetInstanceRequest request = new GetInstanceRequest();
request.setInstanceId(INSTANCE_ID);
request.setProjectEnv(PROJECT_ENV);
try {
  GetInstanceResponse response = client.getInstance(request);
  Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
  BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
  Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
  Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
  Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
  Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
  Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
  Assert.assertEquals("", bizInstanceDto.getParamValues());
  Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
  Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
  Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
  Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
  Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
} catch (Exception e) {
  e.printStackTrace();
  Assert.fail();
}
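
The preceding example queries a single instance. To monitor instances in bulk, you can call the ListInstances operation. The following is a minimal sketch; the node ID is a hypothetical value, and the response data is logged as JSON because its exact structure depends on the SDK version.

// List one page of recurring instances that belong to a node.
ListInstancesRequest listRequest = new ListInstancesRequest();
listRequest.setProjectId(-1L); // Replace with your DataWorks workspace ID.
listRequest.setProjectEnv("PROD"); // PROD or DEV.
listRequest.setNodeId(33115L); // Hypothetical node ID; see the GetInstance example above.
listRequest.setPageNumber(1);
listRequest.setPageSize(10);
ListInstancesResponse listResponse = client.listInstances(listRequest);
LOGGER.info("Instances[{}]", new Gson().toJson(listResponse.getBody().getData()));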

If an instance runs abnormally, you can use operations such as RestartInstance, SetSuccessInstance, SuspendInstance, and ResumeInstance to manage it, as in the sketch below. You can also use APIs such as CreateRemind and UpdateRemind to create custom alert rules. These rules help ensure that baseline data is produced smoothly every day: if an exception occurs, an alert notifies the relevant personnel so that they can intervene manually.
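
A minimal RestartInstance sketch follows. It reuses the INSTANCE_ID and PROJECT_ENV constants from the previous example; the Boolean return value is an assumption modeled on the other management operations in this topic.

// Rerun an abnormal instance. A return value of true means that the rerun was triggered.
RestartInstanceRequest restartRequest = new RestartInstanceRequest();
restartRequest.setInstanceId(INSTANCE_ID); // The ID of the abnormal instance.
restartRequest.setProjectEnv(PROJECT_ENV); // PROD or DEV.
RestartInstanceResponse restartResponse = client.restartInstance(restartRequest);
Assert.assertTrue(restartResponse.getBody().getData());

The following code then creates an alert rule that sends an email when a node-level error occurs: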

CreateRemindRequest createRemindRequest = new CreateRemindRequest();
createRemindRequest.setRemindName("REMIND_CREATE_TEST");
createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());    // Monitor at the node level.
createRemindRequest.setRemindType(RemindType.ERROR.name());      // Trigger the alert when a node fails.
createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());     // Send alerts to the specified users.
createRemindRequest.setDndEnd("08:00");                          // The do-not-disturb period ends at 08:00.
createRemindRequest.setNodeIds("-1");                            // Replace with the IDs of the nodes to monitor.
createRemindRequest.setMaxAlertTimes(1);                         // The maximum number of alerts.
createRemindRequest.setAlertInterval(1800);                      // The alert interval, in seconds.
createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name()); // Send alerts by email.
createRemindRequest.setAlertTargets(MosadConstants.POP_UID);     // The account IDs of the recipients.
try {
    CreateRemindResponse createResponse = client.createRemind(createRemindRequest);
    MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
    Assert.assertTrue(createResponse.getData() > 0);
} catch (Exception ex) {
    ex.printStackTrace();
    return;
}
Note

The Operation Center provides APIs related to auto triggered tasks, manually triggered workflows, baseline queries, and alert configurations and queries.

Step 4: Configure data quality monitoring

In this scenario, the APIs described in the preceding steps synchronize data from an RDS database to a MaxCompute table on a daily schedule. If you are concerned that dirty data or data loss may affect your online business, you can use the Data Quality APIs of DataWorks to monitor the table data. If an exception occurs while the table data is being generated, an alert is immediately sent to the rule subscribers.

CreateQualityRuleRequest request = new CreateQualityRuleRequest();
request.setBlockType(0);            // 0: a weak rule that does not block the task when it fails.
request.setComment("test-createTemplateRuleSuccess");
request.setCriticalThreshold("50"); // Threshold that triggers a critical alert.
request.setEntityId(entityId);      // The ID of the partition expression to monitor.
request.setOperator("abs");
request.setPredictType(0);
request.setProjectName(PROJECT_NAME);
request.setProperty("table_count");
request.setPropertyType("table");
request.setRuleName("createTemplateRuleSuccess");
request.setRuleType(0);
request.setTemplateId(7);           // The ID of the rule template to apply.
request.setWarningThreshold("10");  // Threshold that triggers a warning alert.
try {
  CreateQualityRuleResponse response = client.createQualityRule(request);
  Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
  Long templateRuleId = Long.parseLong(data.toString());
  Assert.assertTrue(templateRuleId > 0);
  return templateRuleId;
} catch (Exception e) {
  e.printStackTrace();
  Assert.fail();
  return null;
}
Note

You can use the Data Quality API set, which includes operations such as CreateQualityRule, GetQualityFollower, and CreateQualityRelativeNode, to manage data quality rules. For more information, see the related API documentation.
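
For example, you can query who subscribes to the alerts of a monitored entity. The following is a minimal GetQualityFollower sketch; the parameter names are assumptions, and the response data is logged as JSON.

// Query the subscribers (followers) of the quality rules on an entity.
GetQualityFollowerRequest followerRequest = new GetQualityFollowerRequest();
followerRequest.setProjectName(PROJECT_NAME); // The engine project that the rules belong to.
followerRequest.setEntityId(entityId);        // The monitored entity from the preceding example.
GetQualityFollowerResponse followerResponse = client.getQualityFollower(followerRequest);
LOGGER.info("QualityFollowers[{}]", new Gson().toJson(followerResponse.getBody().getData()));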

Step 5: Generate a DataService Studio API

You have now created a data source by using the tenant-level APIs, created files and auto triggered tasks by using the Data Development APIs, and configured monitoring by using the Operation Center and Data Quality APIs. The data for the partitioned MaxCompute table can now be generated smoothly. The final step is to use the DataService Studio APIs to generate a DataService Studio API from the data in the partitioned MaxCompute table. This API can then serve data to your self-managed system.

// tenantId, projectId, apiMode, and the other lowercase variables are placeholders;
// set them based on your environment.
CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
createRequest.setTenantId(tenantId);
createRequest.setProjectId(projectId);
createRequest.setApiMode(apiMode);
createRequest.setApiName(apiName);
createRequest.setApiPath(apiPath);
createRequest.setApiDescription("test");
createRequest.setGroupId(groupId);
createRequest.setVisibleRange(visibleRange);
createRequest.setTimeout(10000);
createRequest.setProtocols(protocols);
createRequest.setRequestMethod(requestMethod);
createRequest.setResponseContentType(responseType);

CreateDataServiceApiResponse createResponse = client.createDataServiceApi(createRequest);
Long apiId = createResponse.getBody().getData();
Assert.assertNotNull(apiId);

GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
getRequest.setTenantId(tenantId);
getRequest.setProjectId(projectId);
getRequest.setApiId(apiId);
GetDataServiceApiResponse getResponse = client.getDataServiceApi(getRequest);
GetDataServiceApiResponseBody.GetDataServiceApiResponseBodyData data = getResponse.getBody().getData();
Assert.assertEquals(apiId, data.getApiId());
Assert.assertEquals(0L, data.getFolderId().longValue());

You can use the CreateDataServiceApi and PublishDataServiceApi operations to expose table data as DataService Studio APIs; a publish sketch follows. This completes the entire data production pipeline. By using the preceding DataWorks OpenAPI operations, you can seamlessly connect your self-managed system to the cloud.
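
The CreateDataServiceApi call creates the API in the development state; to make it callable, you must publish it. The following is a minimal PublishDataServiceApi sketch that reuses the variables from the preceding example; the Boolean return value is an assumption modeled on the other examples in this topic.

// Publish the API that was created above so that it can be called online.
PublishDataServiceApiRequest publishRequest = new PublishDataServiceApiRequest();
publishRequest.setTenantId(tenantId);
publishRequest.setProjectId(projectId);
publishRequest.setApiId(apiId);
PublishDataServiceApiResponse publishResponse = client.publishDataServiceApi(publishRequest);
Assert.assertTrue(publishResponse.getBody().getData());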