本文为您介绍如何使用OpenAPI创建数据集成同步任务,同步来源端数据至去向端。

前提条件

使用限制

  • DataWorks当前仅支持使用OpenAPI创建数据集成离线同步任务。
  • 调用CreateDISyncTask创建数据集成同步任务,仅支持使用脚本模式配置同步任务内容,详情请参见通过脚本模式配置任务
  • DataWorks暂不支持使用OpenAPI创建业务流程,您需要使用现有的业务流程创建数据同步任务。

配置环境依赖及账号认证

  • 配置Maven依赖。
    1. 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-core
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
      </dependency>
    2. 打开Maven项目下的pom.xml文件,添加aliyun-java-sdk-dataworks-public
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.3.18</version>
      </dependency>
  • 客户端认证。
    使用OpenAPI创建数据同步任务前,需要调用如下语句对登录阿里云的账号相关信息进行认证。如果阿里云的账号信息认证通过,则继续执行后续任务,如果认证不通过,则该调用会报错,您需要根据实际报错处理相关问题。
     DefaultProfile profile = DefaultProfile.getProfile(
                               "regionId",            //DataWorks工作空间所在的地域,例如cn-hangzhou。
                               "<yourAccessKeyId>",   //登录DataWorks工作空间的阿里云账号的AccessKey ID。
                               "<yourAccessSecret>"); //登录DataWorks工作空间的阿里云账号的AccessKey Secret。
     IAcsClient client = new DefaultAcsClient(profile);
    您可以登录DataWorks控制台鼠标悬停至右上角的用户头像,单击AccessKey管理,进入AccessKey管理页面获取AccessKey ID和AccessKey Secret。

配置流程

完成上述配置环境依赖及账号认证后,您可以通过OpenAPI调用相关接口,创建数据同步任务,同步来源端数据至去向端。配置流程如下:
  1. 创建数据集成任务。
  2. 配置任务的调度依赖。
  3. 提交数据集成任务。
  4. 发布同步任务至生产环境。

配置步骤

  1. 创建数据集成任务。
    调用CreateDISyncTask接口,创建数据集成任务。如下代码仅示例部分参数的配置,更多参数详情请参见CreateDISyncTask
    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"业务流程/new_biz/数据集成\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");
    
    
            String akId = "XXX";
            String akSecret = "XXXX";
            String regionId = "cn-hangzhou";
            IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
            DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
            IAcsClient client;
    
            client = new DefaultAcsClient(profile);
    
            CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
            Gson gson1 = new Gson();
    
            System.out.println(gson1.toJson(response1));
    }
  2. 设置任务的调度依赖。
    调用UpdateFile接口,设置数据集成任务的调度依赖,参数详情请参见UpdateFile
  3. 提交数据集成任务。
    调用SubmitFile接口,提交数据集成任务至调度系统的开发环境。任务提交后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。
     public void submitFile() throws ClientException{
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          // 此节点ID为创建节点时返回的ID,对应数据库File表的file_id。
            request.setFileId(501576542L);
            request.setComment("备注");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
          //调用GetDeployment接口,获取本次发布的具体情况。
          Long deploymentId = acsResponse.getData();
            log.info(acsResponse.toString());
    }
    上述代码仅示例部分参数的配置,更多参数详情请参见SubmitFileGetDeployment
  4. 发布同步任务到生产环境。
    调用DeployFile接口,发布数据集成同步任务至生产环境。
    说明 仅标准模式的工作空间涉及执行该发布操作。
     public void deploy() throws ClientException{
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("备注");
            //NodeId和file_id二选一。NodeId的值为调度配置中基础属性的节点ID。
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
             //调用GetDeployment接口,获取本次发布的具体情况。
            Long deploymentId = acsResponse.getData();
            log.info(acsResponse.getData().toString());
        }
    上述代码仅示例部分参数的配置,更多参数详情请参见DeployFile
  5. 获取发布包详情。
    任务发布后,Response会返回deploymentId,您可以调用GetDeployment接口,通过deploymentId获取本次发布包的详细信息。当GetDeployment接口的返回参数Status取值为1时,则表示此次发布成功。
    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          //DeploymentId为提交或发布的返回值。
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }
    上述代码仅示例部分参数的配置,更多参数详情请参见GetDeployment

修改同步任务的相关配置

成功创建数据集成同步任务后,您可以调用UpdateDISyncTask接口更新任务的Content,或通过TaskParam来更新使用的独享资源组。更新后,您需要重新提交、发布同步任务,详情请参见配置流程