To run a task on a custom node, you must use a custom wrapper. Before you use a custom node, you must create a custom wrapper package, and then upload the package to DataWorks and deploy it. This topic describes how to create a custom wrapper package.

Background information

DataWorks custom nodes run through wrappers. When you develop a wrapper, you implement the following methods:
  • submitJob(String codeFilePath, List<String> args): submits the node that corresponds to the wrapper. Implement the business logic of the node in this method.
    Input parameters:
    • codeFilePath: the absolute path of the file that stores the code of the node.
    • args: the scheduling parameters that you configure for the node. Each element is a key=value pair, such as "key=value" and "key2=value2".
    Returned results:
    • 0: indicates that the task succeeded.
    • 2: indicates that the scheduling system needs to rerun the task.
    • 1, 4, or 3: indicates that the task is terminated.
    • Other values: indicate that the task failed.
  • killJob(): called when the task receives a termination signal. Implement the cleanup logic, such as stopping the job on the server, in this method. This method takes no input parameters and returns no value.
    Note: If killJob() does not finish within 9 seconds, the wrapper is forcibly terminated and the task fails.
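The return-code contract of submitJob can be restated as a small lookup. The following sketch is a hypothetical helper (ExitCodes and classify are illustrative names, not part of alisa-wrapper-face) that maps a returned value to the outcome described in the list above. In a real wrapper, return the values defined in the Constant class of alisa-wrapper-face instead of literals.

```java
// Hypothetical helper, not part of alisa-wrapper-face: it only restates the
// documented meaning of the values that submitJob returns.
final class ExitCodes {
    enum Outcome { SUCCEEDED, RERUN, TERMINATED, FAILED }

    private ExitCodes() {}

    static Outcome classify(int code) {
        switch (code) {
            case 0:
                return Outcome.SUCCEEDED;   // task succeeded
            case 2:
                return Outcome.RERUN;       // scheduling system reruns the task
            case 1:
            case 3:
            case 4:
                return Outcome.TERMINATED;  // task is terminated
            default:
                return Outcome.FAILED;      // any other value: task failed
        }
    }
}
```

For example, classify(2) returns RERUN and classify(5) returns FAILED.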

Download a dependency JAR package

Click alisa-wrapper-face-1.0.0.jar to download the dependency JAR package.

Package code

  1. Create a wrapper code project.
    Use an integrated development environment (IDE) to create a Maven project that meets the following requirements for the file structure and the pom.xml file:
    • File structure
      The following figure shows the required file structure. The lib folder contains alisa-wrapper-face-1.0.0.jar, which is the dependency JAR package that you downloaded. Add this package to the project as a local JAR file (a system-scope dependency in the pom.xml file).
      [Figure: Code project file structure]
    • pom.xml configuration
      The following excerpt shows an example pom.xml configuration. The <mainClass> element specifies the fully qualified name of the wrapper class that implements the submitJob method. Make sure that it matches the package and class name in your code.
      <groupId>org.example</groupId>
      <artifactId>dw-plugin</artifactId>
      <version>1.0-SNAPSHOT</version>
      
      <dependencies>
          <dependency>
              <groupId>com.alibaba.dw</groupId>
              <artifactId>alisa-wrapper-face</artifactId>
              <version>1.0.0</version>
              <scope>system</scope>
              <systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath>
          </dependency>
          <dependency>
              <groupId>commons-lang</groupId>
              <artifactId>commons-lang</artifactId>
              <version>2.4</version>
          </dependency>
          <dependency>
              <groupId>commons-io</groupId>
              <artifactId>commons-io</artifactId>
              <version>2.6</version>
          </dependency>
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.8.1</version>
          </dependency>
      </dependencies>
      
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>2.5.5</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.alibaba.dw.alisa.wrapper.DemoWrapper</mainClass>
                          </manifest>
                      </archive>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>single</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  2. Develop code.
    Example 1: Basic logic
    package com.alibaba.dw.alisa.wrapper;

    import java.util.List;

    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;

    // DwalisaWrapper is provided by the alisa-wrapper-face package.
    public class DemoWrapper extends DwalisaWrapper {

        /**
         * Implements the main logic of the wrapper. Put your business logic in this method.
         * codeFilePath: the full path of the file that stores the code.
         * args: the input parameters. args[0] is used as codeFilePath. If the task has no code, args[0] is the first parameter.
         * Return code 0 indicates that the task succeeded.
         * Return code 2 indicates that the scheduling system needs to rerun the task.
         * Return codes 1, 3, and 4 indicate that the task is terminated.
         * Other return codes indicate that the task failed.
         */
        @SuppressWarnings("deprecation")
        @Override
        public Integer submitJob(String codeFilePath, List<String> args) {
            try {
                System.err.println("your code->");
                // Implement the business code here. The task is finished when this method returns.
            } catch (Exception e) {
                System.err.println(e);
                // Report the failure instead of falling through to the success code.
                return Constant.FAILED_EXIT_CODE;
            }
            System.out.println("task finished...");
            return Constant.SUCCESSED_EXIT_CODE;
        }

        /**
         * Called after task termination is initiated. Use it to stop the work that submitJob started.
         * If this method does not return within 9 seconds, the system sends kill -9 to terminate the task.
         */
        @Override
        public void killJob() {
            System.err.println("Termination requested: perform the business operations that terminate the task on the server.");
        }
    }
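    Because killJob() has at most 9 seconds, a common pattern is for killJob() to signal the running work and for submitJob() to check that signal and return promptly. The following sketch shows this pattern in plain Java. It deliberately does not extend DwalisaWrapper so that it compiles on its own; CancellableJob, the volatile flag, the sleep-based work loop, and the literal return codes 0 and 1 (taken from the return-code list in Background information) are illustrative assumptions. In a real wrapper, use the Constant values from alisa-wrapper-face.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: same method shapes as a wrapper, but it does not extend
// DwalisaWrapper so that it compiles without alisa-wrapper-face.
class CancellableJob {
    static final int SUCCEEDED = 0;   // documented success code
    static final int TERMINATED = 1;  // one of the documented "terminated" codes

    // killJob() may be called on a different thread than submitJob(),
    // so the flag is volatile to make the update visible.
    private volatile boolean killed = false;
    private final CountDownLatch finished = new CountDownLatch(1);

    Integer submitJob(String codeFilePath, List<String> args) {
        try {
            for (int step = 0; step < 100; step++) {
                if (killed) {
                    return TERMINATED;  // stop at the next safe point
                }
                Thread.sleep(50);       // placeholder for one unit of real work
            }
            return SUCCEEDED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return TERMINATED;
        } finally {
            finished.countDown();       // lets killJob() stop waiting
        }
    }

    void killJob() {
        killed = true;
        try {
            // Wait at most 5 seconds, well under the 9-second limit.
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

    A real wrapper would replace the sleep loop with its own units of work, or release a blocking resource in killJob() as Example 2 does with its JDBC statement and connection.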
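    Example 2: A custom node that runs SQL statements on a MySQL data source
    Besides the dependencies in the preceding pom.xml file, this example uses fastjson (com.alibaba.fastjson.JSON), javacsv (com.csvreader.CsvWriter), and a MySQL JDBC driver that provides com.mysql.cj.jdbc.Driver. The SqlUtils helper class that MysqlWrapper imports is not included in this topic.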

    MysqlWrapper.java

    package com.alibaba.dw.alisa.wrapper;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;
    import java.util.List;
    
    import org.apache.commons.io.FileUtils;
    
    import com.alibaba.dw.alisa.wrapper.util.ConnectionManager;
    import com.alibaba.dw.alisa.wrapper.util.SqlUtils;
    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;
    import com.alibaba.dw.alisawrapper.utils.TaskDirUtils;
    import com.csvreader.CsvWriter;
    
    public class MysqlWrapper extends DwalisaWrapper {
    
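        // killJob() may run on a different thread from submitJob(), so these
        // fields are volatile to make the latest values visible to it.
        private volatile Connection conn = null;
        private volatile Statement stmt = null;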
    
        private static final int MAX_ROWS = 10000;
    
        /**
         * codeFilePath: the full path of the file that stores the code. args: the input parameters. args[0] is used as codeFilePath. If the task has no code, args[0] is the first parameter.
         * This method implements the main logic of the wrapper. Return code 0 indicates that the task succeeded. Return codes 1, 3, and 4 indicate that the task is terminated. Other return codes indicate that the task failed.
         * To access a MaxCompute project, read the following environment variables: the access ID (ODPS_ACCESSID), the access key (ODPS_ACCESSKEY), and the endpoint (ODPS_ENDPOINT).
         */
        @Override
        public Integer submitJob(String codeFilePath, List<String> args) {
            try {
                System.out.println("code-content: ");
                File file = new File(codeFilePath);
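                // Read the code file with an explicit encoding instead of the platform default.
                String sqlContent = FileUtils.readFileToString(file, StandardCharsets.UTF_8);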
                String execContent = getParaValueContent(sqlContent, args);
                System.out.println(execContent);
                executeJdbcSql(execContent);
            } catch (Exception e) {
                System.err.println(e);
                return Constant.FAILED_EXIT_CODE;
            }
            return Constant.SUCCESSED_EXIT_CODE;
        }
    
        private String getParaValueContent(String sqlContent, List<String> args) throws Exception {
            String content = sqlContent;
            if (args == null || args.size() <= 0) {
                return content;
            }
            // Replace ${...} parameters
            for (String keyValue : args) {
                System.out.println("param: " + keyValue);
                if (keyValue.contains("=")) {
                    // Split on the first '=' only so that values may themselves contain '='.
                    String[] param = keyValue.split("=", 2);
                    String target = "${" + param[0] + "}";
                    String replacement = param[1];
                    content = content.replace(target, replacement);
                } else {
                    System.err.println("param format is invalid, expected key=value: " + keyValue);
                    throw new Exception("param exception!");
                }
            }
            return content;
        }
    
        /**
         * Called after task termination is initiated. Cancels the running SQL statement and releases the JDBC resources.
         * If this method does not return within 9 seconds, the system sends kill -9 to terminate the task.
         */
        @Override
        public void killJob() {
            System.out.println("Accept kill signal...");
            try {
                if (stmt != null && !stmt.isClosed()) {
                    // Ask the JDBC driver to abort the statement that is running.
                    stmt.cancel();
                    stmt.close();
                }
                if (conn != null && !conn.isClosed()) {
                    conn.close();
                }
                System.out.println("Kill succeed");
            } catch (Exception e) {
                System.err.println("kill job error! " + e.getMessage());
            }
        }
    
        private void executeJdbcSql(String sqlContent) throws Exception {
            List<String> sqlList = SqlUtils.splitSql(sqlContent);
            try {
                /**
                 * (1) Perform authentication and obtain the connection string.
                 */
                System.out.println("Connecting to Server...");
                String jdbcConn = System.getenv("SKYNET_CONNECTION");
                // Obtain and parse the connection string to establish a connection. The string is in the JSON format.
                conn = ConnectionManager.getJDBCConnection(jdbcConn);
                System.out.println("Connected to Server!");
    
                stmt = conn.createStatement();
                stmt.setMaxRows(MAX_ROWS);
    
                for (String sql : sqlList) {
                    System.out.println("start run... sql: " + sql);
                    /**
                     * (2) Execute SQL statements.
                     */
                    boolean hasResult = stmt.execute(sql);
                    if (hasResult) {
                        /**
                         * (3) Generate results and store the results in a file.
                         */
                        storeResult(stmt.getResultSet());
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("sql execute failed! " + e.getMessage());
                throw e;
            } finally {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
                System.out.println("release sql connection...");
                System.out.println("Job Finished!");
            }
        }
    
        protected void storeResult(ResultSet rs) throws Exception {
            ResultSetMetaData rsmd = rs.getMetaData();
            int columnsNumber = rsmd.getColumnCount();
            // Obtain the path that is used to store the result file.
            String resultFilePath = TaskDirUtils.getDataFile();
            // Create the result file, or truncate it if it already exists.
            FileUtils.writeStringToFile(new File(resultFilePath), "", StandardCharsets.UTF_8);
            CsvWriter csvWriter = new CsvWriter(resultFilePath, ',', StandardCharsets.UTF_8);
            csvWriter.setTextQualifier('"');
            csvWriter.setUseTextQualifier(false);
            csvWriter.setForceQualifier(true);
            String[] headerList = new String[columnsNumber];
            for (int i = 0; i < columnsNumber; i++) {
                // getColumnLabel returns the alias from an AS clause if there is one, otherwise the column name.
                headerList[i] = rsmd.getColumnLabel(i + 1);
            }
            csvWriter.writeRecord(headerList);
            while (rs.next()) {
                String[] lineEles = new String[columnsNumber];
                for (int i = 0; i < columnsNumber; i++) {
                    lineEles[i] = rs.getString(i + 1);
                }
                csvWriter.writeRecord(lineEles, true);
            }
            csvWriter.flush();
            csvWriter.close();
            System.out.println("store result finished.");
        }
    }

    ConnectionManager.java

    package com.alibaba.dw.alisa.wrapper.util;

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.commons.lang3.StringUtils;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;

    public class ConnectionManager {

        /**
         * Parses the connection string (JSON that contains jdbcUrl, username, and password)
         * and opens a JDBC connection.
         */
        public static Connection getJDBCConnection(String connectionJson) throws Exception {
            if (StringUtils.isBlank(connectionJson)) {
                throw new Exception("member in connection is null!");
            }
            JSONObject connectionObj = JSON.parseObject(connectionJson);
            String jdbcUrl = connectionObj.getString("jdbcUrl");
            String userName = connectionObj.getString("username");
            String password = connectionObj.getString("password");

            String driverClassName = getDriverClassName(jdbcUrl);
            System.out.println("load driver class: " + driverClassName);
            try {
                Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                // Propagate the error so that submitJob() returns a failure code,
                // instead of calling System.exit() inside a helper.
                throw new Exception("acquire connection failed! JDBC driver not found: " + driverClassName, e);
            }
            DriverManager.setLoginTimeout(20);
            return DriverManager.getConnection(jdbcUrl, userName, password);
        }

        private static String getDriverClassName(String jdbcUrl) throws Exception {
            if (StringUtils.contains(jdbcUrl, "jdbc:mysql")) {
                return "com.mysql.cj.jdbc.Driver";
            }
            throw new Exception("unsupported jdbc driver");
        }
    }
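    MysqlWrapper calls SqlUtils.splitSql, whose source this topic does not include. The following is a minimal hypothetical sketch of such a helper, assuming that statements are separated by semicolons. It ignores semicolons inside single- or double-quoted literals but does not handle SQL comments, backtick identifiers, or stored-procedure bodies. To match the import in MysqlWrapper, place it in the com.alibaba.dw.alisa.wrapper.util package.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the SqlUtils helper referenced by MysqlWrapper.
// Assumption: statements are separated by ';'. Semicolons inside single- or
// double-quoted literals are ignored; comments are not handled.
final class SqlUtils {
    private SqlUtils() {}

    static List<String> splitSql(String sqlContent) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;  // 0 when outside a quoted literal
        for (int i = 0; i < sqlContent.length(); i++) {
            char c = sqlContent.charAt(i);
            if (quote != 0) {
                current.append(c);
                if (c == '\\' && i + 1 < sqlContent.length()) {
                    current.append(sqlContent.charAt(++i));  // keep the escaped character
                } else if (c == quote) {
                    quote = 0;  // closing quote
                }
            } else if (c == '\'' || c == '"') {
                quote = c;      // opening quote
                current.append(c);
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);  // trailing statement without ';'
        return statements;
    }

    private static void addIfNotBlank(List<String> statements, StringBuilder sb) {
        String sql = sb.toString().trim();
        if (!sql.isEmpty()) {
            statements.add(sql);
        }
    }
}
```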
  3. Store query results so that DataWorks can display them.
    If you store the result sets as described in this step, you can view the data that a custom node queries on the related page in the DataWorks console.
    After the code runs, write the result sets, such as queried table records, to a file whose path and format meet the following rules:
    • Path of the result file
      String dataFilePath = String.format("%s/%s.data", System.getenv(Constant.TASK_EXEC_PATH), System.getenv(Constant.ALISA_TASK_ID));
      System.getenv reads the value of an environment variable. For more information, see Appendix: Use environment variables to obtain the information of the related node.
    • Storage format
      • File format: CSV. Fields are separated by commas (,).
      • Row layout: The first row contains the field names, and each subsequent row contains field values.
      Example:
      "name","age"
      "Alice","12"
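  4. Package the Maven project.
    Run mvn package. The maven-assembly-plugin configured in the pom.xml file packages the project and its dependencies into one JAR package, for example target/dw-plugin-1.0-SNAPSHOT-jar-with-dependencies.jar.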
  5. Upload the wrapper package.
    If external dependencies such as Protobuf and Guava are used, you must compress the dependencies and the preceding JAR package into a ZIP package. Then, upload the ZIP package to DataWorks as a wrapper. For more information, see Create a wrapper.
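The result-file rules in step 3 (path and CSV layout) can be sketched with the JDK alone. In the following hypothetical helper, ResultFileWriter, dataFilePath, toCsv, and write are illustrative names. The execution directory and task ID are passed in as arguments; in a wrapper, they would come from System.getenv(Constant.TASK_EXEC_PATH) and System.getenv(Constant.ALISA_TASK_ID). Every field is quoted as in the step 3 example, embedded double quotes are doubled, null values become empty strings, and rows end with "\n". The null handling and line ending are assumptions, not documented requirements.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper that follows the documented result-file rules.
final class ResultFileWriter {
    private ResultFileWriter() {}

    // Builds <execPath>/<taskId>.data, following the documented path rule.
    static String dataFilePath(String execPath, String taskId) {
        return String.format("%s/%s.data", execPath, taskId);
    }

    // First row: field names. Following rows: field values.
    static String toCsv(List<String> header, List<List<String>> rows) {
        StringBuilder sb = new StringBuilder();
        appendRow(sb, header);
        for (List<String> row : rows) {
            appendRow(sb, row);
        }
        return sb.toString();
    }

    // Writes the CSV text to the result file as UTF-8.
    static void write(String path, List<String> header, List<List<String>> rows) throws IOException {
        Files.write(Paths.get(path), toCsv(header, rows).getBytes(StandardCharsets.UTF_8));
    }

    private static void appendRow(StringBuilder sb, List<String> fields) {
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(quote(fields.get(i)));
        }
        sb.append('\n');
    }

    // Quotes one field and doubles any embedded quotes; null becomes "".
    private static String quote(String value) {
        String v = value == null ? "" : value;
        return "\"" + v.replace("\"", "\"\"") + "\"";
    }
}
```

With the header name and age and one row Alice and 12, toCsv produces the two lines shown in the step 3 example.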

Appendix: Use environment variables to obtain the information of the related node

You can call System.getenv with a variable name, such as System.getenv("SKYNET_ONDUTY"), to obtain the values of the following environment variables:

  • SKYNET_ID: the ID of the node. This variable is available only when the node is scheduled to run.
  • SKYNET_BIZDATE: the data timestamp.
  • SKYNET_ONDUTY:
    • During temporary runs, data backfill, or tests: the ID of the operator.
    • In other cases: the ID (employee ID or baseId) of the node owner.
  • SKYNET_TASKID: the ID of the node instance.
  • SKYNET_SYSTEM_ENV: the environment in which the node runs, such as the development environment or the production environment.
  • SKYNET_CYCTIME: the time at which the node instance is scheduled to run.
  • SKYNET_CONNECTION: the connection string. In most cases, the string is in the JSON format.
  • SKYNET_TENANT_ID: the tenant ID.
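The following sketch collects these variable names in one place. NodeEnv, get, fromSystem, and isScheduledRun are hypothetical names. The class takes a Map so that it can be tested; in a wrapper, use fromSystem(), which passes System.getenv(). Missing and blank values are both reported as Optional.empty(). isScheduledRun relies only on the statement above that SKYNET_ID is available only when the node is scheduled to run.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for reading the node information listed above.
final class NodeEnv {
    static final String NODE_ID = "SKYNET_ID";
    static final String BIZDATE = "SKYNET_BIZDATE";
    static final String ON_DUTY = "SKYNET_ONDUTY";
    static final String TASK_ID = "SKYNET_TASKID";
    static final String SYSTEM_ENV = "SKYNET_SYSTEM_ENV";
    static final String CYC_TIME = "SKYNET_CYCTIME";
    static final String CONNECTION = "SKYNET_CONNECTION";  // usually JSON
    static final String TENANT_ID = "SKYNET_TENANT_ID";

    private final Map<String, String> env;

    NodeEnv(Map<String, String> env) {
        this.env = env;
    }

    // In a wrapper, read the real process environment.
    static NodeEnv fromSystem() {
        return new NodeEnv(System.getenv());
    }

    // Missing and blank values are both reported as empty.
    Optional<String> get(String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(value);
    }

    // Assumption: SKYNET_ID is set only for scheduled runs, as documented above.
    boolean isScheduledRun() {
        return get(NODE_ID).isPresent();
    }
}
```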