DataWorks自定义节点中运行任务时,需要调用自定义插件,因此在使用自定义节点前您需要创建好自定义插件包,并上传发布至DataWorks,便于使用自定义节点运行任务时使用。本文为您介绍如何创建自定义插件包。

背景信息

DataWorks的自定义节点运行时,插件开发使用过程中涉及以下2个接口。
  • submitJob(String codeFilePath, List args):提交插件任务的接口。
    包含以下入参。
    • codeFilePath:为页面开发的代码存储绝对路径。
    • List args:为页面配置的调度参数,格式为{"key"="value","key2"="value2"...}
    返回参数如下。
    • 0:表示任务运行成功。
    • 2:表示告知调度系统重新运行任务。
    • 14、或3:表示任务被终止。
    • 其他数值:表示任务运行失败。
  • killJob():此接口主要用于监听任务终止(kill)信号,然后会触发该接口调用。此接口没有入参和返回参数。
    说明 若业务层面在9秒内未处理完毕,整个插件进程会被强制终止,整个任务运行失败。

下载依赖JAR

点击以下链接下载依赖JAR包:alisa-wrapper-face-1.0.0.jar

打包代码工程包

  1. 新建插件代码工程。
    用IDE工具新建一个maven工程,文件结构和pom.xml文件配置要求如下所示。
    • 文件结构要求
      文件结构要求参考下图,其中lib文件夹下alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。代码工程文件结构
    • pom.xml文件配置要求
      pom.xml文件示例如下,其中插件的类名(<mainclass>)为submitJob方法所在的类路径,如com.alibaba.dw.wrapper
      <groupId>org.example</groupId>
      <artifactId>dw-plugin</artifactId>
      <version>1.0-SNAPSHOT</version>
      
      <dependencies>
          <dependency>
              <groupId>com.alibaba.dw</groupId>
              <artifactId>alisa-wrapper-face</artifactId>
              <version>1.0.0-SNAPSHOT</version>
              <scope>system</scope>
              <systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath>
          </dependency>
          <dependency>
              <groupId>commons-lang</groupId>
              <artifactId>commons-lang</artifactId>
              <version>2.4</version>
          </dependency>
          <dependency>
              <groupId>commons-io</groupId>
              <artifactId>commons-io</artifactId>
              <version>2.6</version>
          </dependency>
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.8.1</version>
          </dependency>
      </dependencies>
      
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>2.5.5</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.alibaba.dw.wrapper.DemoWrapper</mainClass>
                          </manifest>
                      </archive>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>assembly</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  2. 代码开发。
    代码示例Demo如下所示。
    package com.alibaba.dw.alisa.wrapper;
    import java.io.File;
    import java.util.List;
    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;
    /**
    **DwalisaWrapper 在alisa-wrapper-face包
    **/
    public class DemoWrapper extends DwalisaWrapper {
       /**
        * codeFilePath: 代码存储的全路径。
        * args:传入的参数。注意其中args[0]是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数。
        * 该方法实现该插件的主要功能内容,业务逻辑都在次方法内部实现。
        * 返回码0:表示任务成功。
        * 返回码1、4、3:表示任务被终止。
        * 返回码为其他数值:表示任务失败。
        */
       @SuppressWarnings("deprecation")
       @Override
       public Integer submitJob(String codeFilePath, List<String> args) {
           try {
               System.err.println("your code->");
               //此处实现业务代码,此方法执行完毕后任务执行结束。
           } catch (Exception e) {
               System.err.println(e);
           }
           System.out.println("task finished...");
           return Constant.SUCCESSED_EXIT_CODE;
       }
       /**
        * 为终止任务方法,一旦发起终止任务时,会调用方法作为一些业务上的处理之后再退出。
        * 注意:一旦9秒未退出,则会直接返回kill-9,终止该任务进程。
        *
        */
       @Override
       public void killJob() {
           System.err.println("收到了终止信号,需要做一些业务操作把任务从服务端终止");
       }
    }
  3. 设置数据查询结果集被页面展示。
    如果您希望后续使用自定义节点时,进行数据查询等操作时,结果能在DataWorks的页面中直接展示,您需参考本步骤设置数据查询结果集被页面展示。
    从submitJob接口业务运行代码后,需将获取到的结果集(比如查询表记录)按照以下规则存储到指定目录中:
    • 结果集文件获取方式
      String dataFilePath =String.format("%s/%s.data", System.getenv(Constant.TASK_EXEC_PATH), System.getenv(Constant.ALISA_TASK_ID));
      其中System.getenv为获取环境变量的方式,详细介绍可参考附录:通过环境变量获取对应的节点信息
    • 存储格式
      • 文件格式:CSV格式,字段间以逗号分隔。
      • 行分布:首行为列名称,后续各行为具体字段的数据。
      样例:
      "name","age"
      "李三","12"
  4. 代码开发完成后进行maven构建打包。
    将本身依赖的包打成JAR包。
  5. 后续步骤:上传插件包。
    如果依赖了protobuf、guava的包,或者其他外部依赖JAR包,需将这些依赖的包和上述步骤打成的JAR包合并为ZIP包后,作为插件上传至DataWorks。上传插件的详细操作可参考新增节点插件

附录:通过环境变量获取对应的节点信息

获取环境变量值的方式:System.getenv("SKYNET_ONDUTY")

  • SKYNET_ID:节点ID,只有在调度运行才生效,数据开发直接运行是无该参数的。
  • SKYNET_BIZDATE:业务日期。
  • SKYNET_ONDUTY:
    • 临时运行、补数据、测试时:表示操作者ID。
    • 日常调度:节点责任人ID(工号/baseid)。
  • SKYNET_TASKID:节点实例。
  • IDSKYNET_SYSTEM_ENV:对应的项目环境,包括:dev 、prod。
  • SKYNET_CYCTIME:节点实例的定时时间。
  • SKYNET_CONNECTION:连接串(一般是json)。
  • SKYNET_TENANT_ID:租户ID。