DataWorks自定义节点中运行任务时,需要调用自定义插件,因此在使用自定义节点前您需要创建好自定义插件包,并上传发布至DataWorks,便于使用自定义节点运行任务时使用。本文为您介绍如何创建自定义插件包。

背景信息

DataWorks的自定义节点运行时,插件开发使用过程中涉及以下2个接口。
  • submitJob(String codeFilePath, List args):提交插件任务的接口。
    包含以下入参。
    • codeFilePath:为页面开发的代码存储绝对路径。
    • List args:为页面配置的调度参数,格式为{"key"="value","key2"="value2"...}
    返回参数如下。
    • 0:表示任务运行成功。
    • 2:表示告知调度系统重新运行任务。
    • 14、或3:表示任务被终止。
    • 其他数值:表示任务运行失败。
  • killJob():此接口主要用于监听任务终止(kill)信号,然后会触发该接口调用。此接口没有入参和返回参数。
    说明 若业务层面在9秒内未处理完毕,整个插件进程会被强制终止,整个任务运行失败。

下载依赖JAR

点击以下链接下载依赖JAR包:alisa-wrapper-face-1.0.0.jar

打包代码工程包

  1. 新建插件代码工程。
    用IDE工具新建一个Maven工程,文件结构和pom.xml文件配置要求如下所示。
    • 文件结构要求
      文件结构要求参考下图,其中lib文件夹下alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。代码工程文件结构
    • pom.xml文件配置要求
      pom.xml文件示例如下,其中插件的类名(<mainclass>)为submitJob方法所在的类路径,如com.alibaba.dw.wrapper
      <groupId>org.example</groupId>
      <artifactId>dw-plugin</artifactId>
      <version>1.0-SNAPSHOT</version>
      
      <dependencies>
          <dependency>
              <groupId>com.alibaba.dw</groupId>
              <artifactId>alisa-wrapper-face</artifactId>
              <version>1.0.0-SNAPSHOT</version>
              <scope>system</scope>
              <systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath>
          </dependency>
          <dependency>
              <groupId>commons-lang</groupId>
              <artifactId>commons-lang</artifactId>
              <version>2.4</version>
          </dependency>
          <dependency>
              <groupId>commons-io</groupId>
              <artifactId>commons-io</artifactId>
              <version>2.6</version>
          </dependency>
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.8.1</version>
          </dependency>
      </dependencies>
      
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>2.5.5</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.alibaba.dw.wrapper.DemoWrapper</mainClass>
                          </manifest>
                      </archive>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>assembly</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  2. 代码开发。
    代码示例一:基本逻辑
    package com.alibaba.dw.alisa.wrapper;
    import java.io.File;
    import java.util.List;
    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;
    /**
    **DwalisaWrapper 在alisa-wrapper-face包
    **/
    public class DemoWrapper extends DwalisaWrapper {
       /**
        * codeFilePath: 代码存储的全路径。
        * args:传入的参数。注意其中args[0]是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数。
        * 该方法实现该插件的主要功能内容,业务逻辑都在次方法内部实现。
        * 返回码0:表示任务成功。
        * 返回码1、4、3:表示任务被终止。
        * 返回码为其他数值:表示任务失败。
        */
       @SuppressWarnings("deprecation")
       @Override
       public Integer submitJob(String codeFilePath, List<String> args) {
           try {
               System.err.println("your code->");
               //此处实现业务代码,此方法执行完毕后任务执行结束。
           } catch (Exception e) {
               System.err.println(e);
           }
           System.out.println("task finished...");
           return Constant.SUCCESSED_EXIT_CODE;
       }
       /**
        * 为终止任务方法,一旦发起终止任务时,会调用方法作为一些业务上的处理之后再退出。
        * 注意:一旦9秒未退出,则会直接返回kill-9,终止该任务进程。
        *
        */
       @Override
       public void killJob() {
           System.err.println("收到了终止信号,需要做一些业务操作把任务从服务端终止");
       }
    }
    代码示例二:基于数据源开发的自定义节点

    MysqlWrapper.java

    package com.alibaba.dw.alisa.wrapper;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;
    import java.util.List;
    
    import org.apache.commons.io.FileUtils;
    
    import com.alibaba.dw.alisa.wrapper.util.ConnectionManager;
    import com.alibaba.dw.alisa.wrapper.util.SqlUtils;
    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;
    import com.alibaba.dw.alisawrapper.utils.TaskDirUtils;
    import com.csvreader.CsvWriter;
    
    public class MysqlWrapper extends DwalisaWrapper {
    
        private Connection conn = null;
        private Statement stmt = null;
    
        private static final int MAX_ROWS = 10000;
    
        /**
         * codeFilePath: 代码存储的全路径 args:传入的参数;注意其中args[0] 是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数
         * 该方法实现该插件的主要功能内容,业务逻辑都在次方法内部实现; 返回码0:表示任务成功; 返回码143:表示任务被kill; 返回码1:表示任务失败 获取odps的信息:环境变量获取: id:ODPS_ACCESSID
         * key:ODPS_ACCESSKEY endpoint:ODPS_ENDPOINT
         */
        @Override
        public Integer submitJob(String codeFilePath, List<String> args) {
            try {
                System.out.println("code-content: ");
                File file = new File(codeFilePath);
                String sqlContent = FileUtils.readFileToString(file);
                String execContent = getParaValueContent(sqlContent, args);
                System.out.println(execContent);
                executeJdbcSql(execContent);
            } catch (Exception e) {
                System.err.println(e);
                return Constant.FAILED_EXIT_CODE;
            }
            return Constant.SUCCESSED_EXIT_CODE;
        }
    
        private String getParaValueContent(String sqlContent, List<String> args) throws Exception {
            String content = sqlContent;
            if (args == null || args.size() <= 0) {
                return content;
            }
            // 替换${...}参数
            for (String keyValue : args) {
                System.out.println(args);
                if (keyValue.contains("=")) {
                    String[] param = keyValue.split("=");
                    String target = "${" + param[0] + "}";
                    String replacement = param[1];
                    content = content.replace(target, replacement);
                } else {
                    System.err.println("param format is invalid,key=value");
                    throw new Exception("param exception!");
                }
            }
            return content;
        }
    
        /**
         * 为kill任务方法,一旦发起kill任务时候,会调用方法作为一些业务上的处理之后再退出; 注意:一旦9秒未退出,则会直接kill-9该任务进程
         */
        @Override
        public void killJob() {
            System.out.println("Accept kill signal...");
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
                System.out.println("Kill succeed");
            } catch (Exception e) {
                System.err.println("kill job error! " + e.getMessage());
            }
        }
    
        private void executeJdbcSql(String sqlContent) throws Exception {
            List<String> sqlList = SqlUtils.splitSql(sqlContent);
            try {
                /**
                 * (1)认证,获取连接
                 */
                System.out.println("Connecting to Server...");
                String jdbcConn = System.getenv("SKYNET_CONNECTION");
                // 获取连接串(json字符串)进行解析生成connection
                conn = ConnectionManager.getJDBCConnection(jdbcConn);
                System.out.println("Connected to Server!");
    
                stmt = conn.createStatement();
                stmt.setMaxRows(MAX_ROWS);
    
                for (String sql : sqlList) {
                    System.out.println("start run... sql: " + sql);
                    /**
                     * (2) 执行sql
                     */
                    boolean hasResult = stmt.execute(sql);
                    if (hasResult) {
                        /**
                         * (3) 写结果文件
                         */
                        storeResult(stmt.getResultSet());
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("sql execute failed! " + e.getMessage());
                throw e;
            } finally {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
                System.out.println("release sql connection...");
                System.out.println("Job Finished!");
            }
        }
    
        protected void storeResult(ResultSet rs) throws Exception {
            ResultSetMetaData rsmd = rs.getMetaData();
            int columnsNumber = rsmd.getColumnCount();
            // 获取结果文件路径
            String resultFilePath = TaskDirUtils.getDataFile();
            FileUtils.writeStringToFile(new File(resultFilePath), "");
            CsvWriter csvWriter = new CsvWriter(resultFilePath, ',', StandardCharsets.UTF_8);
            csvWriter.setTextQualifier('"');
            csvWriter.setUseTextQualifier(false);
            csvWriter.setForceQualifier(true);
            String[] headerList = new String[columnsNumber];
            for (int i = 0; i < columnsNumber; i++) {
                headerList[i] = rsmd.getColumnName(i + 1);
            }
            csvWriter.writeRecord(headerList);
            while (rs.next()) {
                String[] lineEles = new String[columnsNumber];
                for (int i = 0; i < columnsNumber; i++) {
                    lineEles[i] = rs.getString(i + 1);
                }
                csvWriter.writeRecord(lineEles, true);
            }
            csvWriter.flush();
            csvWriter.close();
            System.out.println("store result finished.");
        }
    }

    ConnectionManager.java

    public class ConnectionManager {
    
        public static Connection getJDBCConnection(String connectionJson) throws Exception {
            Connection connection = null;
            try {
                String jdbcUrl = null;
                String userName = null;
                String password = null;
                if(StringUtils.isNotBlank(connectionJson)){
                    JSONObject connectionObj = JSON.parseObject(connectionJson);
                    jdbcUrl = connectionObj.getString("jdbcUrl");
                    userName = connectionObj.getString("username");
                    password = connectionObj.getString("password");
                }else{
                    throw new Exception("member in connection is null!");
                }
                String driverClassName = getDriverClassName(jdbcUrl);
                System.out.println("load driver class: " + driverClassName);
                Class.forName(driverClassName);
                DriverManager.setLoginTimeout(20);
                connection = DriverManager.getConnection(jdbcUrl, userName, password);
            } catch (ClassNotFoundException e) {
                System.err.println("acquire connection failed! " + e);
                System.exit(1);
            }
            return connection;
        }
    
            private static String getDriverClassName(String jdbcUrl) throws Exception{
    
            if(StringUtils.contains(jdbcUrl, "jdbc:mysql")){
                return "com.mysql.cj.jdbc.Driver";
            }else{
                throw new Exception("unsupported jdbc driver");
            }
    
        }
  3. 设置数据查询结果集被页面展示。
    如果您希望后续使用自定义节点时,进行数据查询等操作时,结果能在DataWorks的页面中直接展示,您需参考本步骤设置数据查询结果集被页面展示。
    从submitJob接口业务运行代码后,需将获取到的结果集(比如查询表记录)按照以下规则存储到指定目录中:
    • 结果集文件获取方式
      String dataFilePath =String.format("%s/%s.data", System.getenv(Constant.TASK_EXEC_PATH), System.getenv(Constant.ALISA_TASK_ID));
      其中System.getenv为获取环境变量的方式,详细介绍可参考附录:通过环境变量获取对应的节点信息
    • 存储格式
      • 文件格式:CSV格式,字段间以逗号分隔。
      • 行分布:首行为列名称,后续各行为具体字段的数据。
      样例:
      "name","age"
      "李三","12"
  4. 代码开发完成后进行Maven构建打包。
    将本身依赖的包打成JAR包。
  5. 后续步骤:上传插件包。
    如果依赖了protobuf、guava的包,或者其他外部依赖JAR包,需将这些依赖的包和上述步骤打成的JAR包合并为ZIP包后,作为插件上传至DataWorks。上传插件的详细操作可参考新增节点插件

附录:通过环境变量获取对应的节点信息

获取环境变量值的方式:System.getenv("SKYNET_ONDUTY")

  • SKYNET_ID:节点ID,只有在调度运行才生效,数据开发直接运行是无该参数的。
  • SKYNET_BIZDATE:业务日期。
  • SKYNET_ONDUTY:
    • 临时运行、补数据、测试时:表示操作者ID。
    • 日常调度:节点责任人ID(工号/baseid)。
  • SKYNET_TASKID:节点实例。
  • IDSKYNET_SYSTEM_ENV:对应的项目环境,包括:dev 、prod。
  • SKYNET_CYCTIME:节点实例的定时时间。
  • SKYNET_CONNECTION:连接串(一般是json)。
  • SKYNET_TENANT_ID:租户ID。