To run a task based on a custom node, you must use a custom wrapper. Before you use a custom node, you must create a custom wrapper package and upload and deploy the package to DataWorks. This topic describes how to create a custom wrapper package.

Background information

The following methods are used during the running of DataWorks custom nodes and the development and use of wrappers:
  • submitJob(String codeFilePath, List args): submits a node that corresponds to a wrapper.
    Input parameters:
    • codeFilePath: specifies the absolute path that is used to store the developed code.
    • List args: specifies the scheduling parameters that you want to configure. The value is in the {"key"="value","key2"="value2"...} format.
    Returned results:
    • 0: indicates that the task succeeded.
    • 2: indicates that the scheduling system needs to rerun the task.
    • 1, 4, or 3: indicates that the task is terminated.
    • Other values: indicates that the task failed.
  • killJob(): listens to the task termination indication and triggers an operation. This method does not have input parameters or provide returned results.
    Note If the task is not processed at the business layer within 9 seconds, the wrapper is forcibly terminated and the task fails.

Download a dependency JAR package

Click alisa-wrapper-face-1.0.0.jar to download the dependency JAR package.

Package code

  1. Create a wrapper code project.
    Use an integrated development environment (IDE) to create a Maven project. The following description provides the structure requirements for files and the configuration requirements for the pom.xml file:
    • Structure requirements for files
      The following figure shows the structure requirements for files. alisa-wrapper-face-1.0.0.jar in the lib folder is the downloaded dependency JAR package. This package is introduced to the project in the same way as an on-premises package. Code engineering file structure
    • Configuration requirements for the pom.xml file
      The following code provides an example for the configuration of the pom.xml file. The class name of the wrapper (indicated by <mainclass>) is the classpath of the submitJob method, such as com.alibaba.dw.wrapper.
      <groupId>org.example</groupId>
      <artifactId>dw-plugin</artifactId>
      <version>1.0-SNAPSHOT</version>
      
      <dependencies>
          <dependency>
              <groupId>com.alibaba.dw</groupId>
              <artifactId>alisa-wrapper-face</artifactId>
              <version>1.0.0-SNAPSHOT</version>
              <scope>system</scope>
              <systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath>
          </dependency>
          <dependency>
              <groupId>commons-lang</groupId>
              <artifactId>commons-lang</artifactId>
              <version>2.4</version>
          </dependency>
          <dependency>
              <groupId>commons-io</groupId>
              <artifactId>commons-io</artifactId>
              <version>2.6</version>
          </dependency>
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.8.1</version>
          </dependency>
      </dependencies>
      
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>2.5.5</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.alibaba.dw.wrapper.DemoWrapper</mainClass>
                          </manifest>
                      </archive>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>assembly</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  2. Develop code.
    Example: Basic logic
    package com.alibaba.dw.alisa.wrapper;
    import java.io.File;
    import java.util.List;
    import com.alibaba.dw.alisawrapper.DwalisaWrapper;
    import com.alibaba.dw.alisawrapper.constants.Constant;
    /**
    **DwalisaWrapper is in the alisa-wrapper-face package.
    **/
    public class DemoWrapper extends DwalisaWrapper {
       /**
        * codeFilePath: the full path that is used to store the code. 
        * args: the input parameters. args[0] is used as codeFilePath. If the task has no code, args[0] is the first parameter. 
        * This method implements the main features of the wrapper, and the business logic is implemented within the method. 
        * The return code 0 indicates that the task succeeded. 
        * Return codes 1, 4, and 3 indicate that the task is terminated. 
        * Other return codes indicate that the task failed. 
        */
       @SuppressWarnings("deprecation")
       @Override
       public Integer submitJob(String codeFilePath, List<String> args) {
           try {
               System.err.println("your code->");
               // Implement the business code. The task is finished after this method is run. 
           } catch (Exception e) {
               System.err.println(e);
           }
           System.out.println("task finished...");
           return Constant.SUCCESSED_EXIT_CODE;
       }
       /**
        * After task termination is initiated, the method is called to perform business processing before the method is terminated. 
        * If the method is not terminated within 9 seconds, the system returns kill-9 to terminate the task. 
        *
        */
       @Override
       public void killJob() {
           System.err.println("After the task termination indication is detected, some business operations are performed to terminate the task from the server.");
       }
    }
  3. Configure the system to display the query result sets on pages.
    After the configuration, you can view the data that is queried by using a custom node on the related page in the DataWorks console.
    After the code is run, you must store the obtained result sets, such as queried table records, in a specific directory in accordance with the following rules:
    • Method to obtain the result sets
      String.format("%s/%s.data", System.getenv("TASK_EXEC_PATH"), System.getenv("ALISA_TASK_ID"));
      System.getenv indicates the method that is used to obtain environment variables. For more information, see Appendix: Use environment variables to obtain the information of the related node.
    • Storage format
      • File format: CSV. The fields in the file are separated by commas (,).
      • Row distribution: The first row is field names, and other rows are field values.
      Example:
      "name","age"
      "Alice","12"
  4. After code is developed, package the Maven project.
    Compress the dependencies of the project into a JAR package.
  5. Upload the wrapper package.
    If external dependencies such as Protobuf and Guava are used, you must compress the dependencies and the preceding JAR package into a ZIP package. Then, upload the ZIP package to DataWorks as a wrapper. For more information, see Create a wrapper.

Appendix: Use environment variables to obtain the information of the related node

You can use System.getenv("SKYNET_ONDUTY") to obtain the values of the environment variables.

  • SKYNET_ID: the ID of the node. This variable is available only when the node is scheduled to run.
  • SKYNET_BIZDATE: the data timestamp.
  • SKYNET_ONDUTY:
    • Indicates the operator ID during temporary running, data backfill, or tests.
    • Indicates the ID (employee ID or baseId) of the node owner.
  • SKYNET_TASKID: the node instance.
  • IDSKYNET_SYSTEM_ENV: the environment where the node runs, such as the development and production environment.
  • SKYNET_CYCTIME: the time at which the node instance is scheduled to run.
  • SKYNET_CONNECTION: the connection string of the data source associated with the node. In most cases, the string is in the JSON format.
  • SKYNET_TENANT_ID: the tenant ID.