DataWorks自定义节点中运行任务时,需要调用自定义插件,因此在使用自定义节点前您需要创建好自定义插件包,并上传发布至DataWorks,便于使用自定义节点运行任务时使用。本文为您介绍如何创建自定义插件包。
背景信息
DataWorks的自定义节点运行时,插件开发使用过程中涉及以下2个接口。
submitJob(String codeFilePath, List args)
:提交插件任务的接口。
包含以下入参。
- codeFilePath:为页面开发的代码存储绝对路径。
- List args:为页面配置的调度参数,格式为
{"key"="value","key2"="value2"...}
。
返回参数如下。
- 0:表示任务运行成功。
- 2:表示告知调度系统重新运行任务。
- 1、4、或3:表示任务被终止。
- 其他数值:表示任务运行失败。
killJob()
:此接口主要用于监听任务终止(kill)信号,然后会触发该接口调用。此接口没有入参和返回参数。
说明 若业务层面在9秒内未处理完毕,整个插件进程会被强制终止,整个任务运行失败。
打包代码工程包
- 新建插件代码工程。
用IDE工具新建一个Maven工程,文件结构和pom.xml文件配置要求如下所示。
- 文件结构要求
文件结构要求参考下图,其中
lib文件夹下
alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。

- pom.xml文件配置要求
pom.xml文件示例如下,其中插件的类名(
<mainclass>
)为submitJob方法所在的类路径,如
com.alibaba.dw.wrapper
。
<groupId>org.example</groupId>
<artifactId>dw-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba.dw</groupId>
<artifactId>alisa-wrapper-face</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.alibaba.dw.wrapper.DemoWrapper</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 代码开发。
代码示例一:基本逻辑
package com.alibaba.dw.alisa.wrapper;
import java.io.File;
import java.util.List;
import com.alibaba.dw.alisawrapper.DwalisaWrapper;
import com.alibaba.dw.alisawrapper.constants.Constant;
/**
**DwalisaWrapper 在alisa-wrapper-face包
**/
public class DemoWrapper extends DwalisaWrapper {
/**
* codeFilePath: 代码存储的全路径。
* args:传入的参数。注意其中args[0]是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数。
* 该方法实现该插件的主要功能内容,业务逻辑都在次方法内部实现。
* 返回码0:表示任务成功。
* 返回码1、4、3:表示任务被终止。
* 返回码为其他数值:表示任务失败。
*/
@SuppressWarnings("deprecation")
@Override
public Integer submitJob(String codeFilePath, List<String> args) {
try {
System.err.println("your code->");
//此处实现业务代码,此方法执行完毕后任务执行结束。
} catch (Exception e) {
System.err.println(e);
}
System.out.println("task finished...");
return Constant.SUCCESSED_EXIT_CODE;
}
/**
* 为终止任务方法,一旦发起终止任务时,会调用方法作为一些业务上的处理之后再退出。
* 注意:一旦9秒未退出,则会直接返回kill-9,终止该任务进程。
*
*/
@Override
public void killJob() {
System.err.println("收到了终止信号,需要做一些业务操作把任务从服务端终止");
}
}
代码示例二:基于数据源开发的自定义节点
MysqlWrapper.java
package com.alibaba.dw.alisa.wrapper;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.List;
import org.apache.commons.io.FileUtils;
import com.alibaba.dw.alisa.wrapper.util.ConnectionManager;
import com.alibaba.dw.alisa.wrapper.util.SqlUtils;
import com.alibaba.dw.alisawrapper.DwalisaWrapper;
import com.alibaba.dw.alisawrapper.constants.Constant;
import com.alibaba.dw.alisawrapper.utils.TaskDirUtils;
import com.csvreader.CsvWriter;
public class MysqlWrapper extends DwalisaWrapper {
private Connection conn = null;
private Statement stmt = null;
private static final int MAX_ROWS = 10000;
/**
* codeFilePath: 代码存储的全路径 args:传入的参数;注意其中args[0] 是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数
* 该方法实现该插件的主要功能内容,业务逻辑都在次方法内部实现; 返回码0:表示任务成功; 返回码143:表示任务被kill; 返回码1:表示任务失败 获取odps的信息:环境变量获取: id:ODPS_ACCESSID
* key:ODPS_ACCESSKEY endpoint:ODPS_ENDPOINT
*/
@Override
public Integer submitJob(String codeFilePath, List<String> args) {
try {
System.out.println("code-content: ");
File file = new File(codeFilePath);
String sqlContent = FileUtils.readFileToString(file);
String execContent = getParaValueContent(sqlContent, args);
System.out.println(execContent);
executeJdbcSql(execContent);
} catch (Exception e) {
System.err.println(e);
return Constant.FAILED_EXIT_CODE;
}
return Constant.SUCCESSED_EXIT_CODE;
}
private String getParaValueContent(String sqlContent, List<String> args) throws Exception {
String content = sqlContent;
if (args == null || args.size() <= 0) {
return content;
}
// 替换${...}参数
for (String keyValue : args) {
System.out.println(args);
if (keyValue.contains("=")) {
String[] param = keyValue.split("=");
String target = "${" + param[0] + "}";
String replacement = param[1];
content = content.replace(target, replacement);
} else {
System.err.println("param format is invalid,key=value");
throw new Exception("param exception!");
}
}
return content;
}
/**
* 为kill任务方法,一旦发起kill任务时候,会调用方法作为一些业务上的处理之后再退出; 注意:一旦9秒未退出,则会直接kill-9该任务进程
*/
@Override
public void killJob() {
System.out.println("Accept kill signal...");
try {
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
System.out.println("Kill succeed");
} catch (Exception e) {
System.err.println("kill job error! " + e.getMessage());
}
}
private void executeJdbcSql(String sqlContent) throws Exception {
List<String> sqlList = SqlUtils.splitSql(sqlContent);
try {
/**
* (1)认证,获取连接
*/
System.out.println("Connecting to Server...");
String jdbcConn = System.getenv("SKYNET_CONNECTION");
// 获取连接串(json字符串)进行解析生成connection
conn = ConnectionManager.getJDBCConnection(jdbcConn);
System.out.println("Connected to Server!");
stmt = conn.createStatement();
stmt.setMaxRows(MAX_ROWS);
for (String sql : sqlList) {
System.out.println("start run... sql: " + sql);
/**
* (2) 执行sql
*/
boolean hasResult = stmt.execute(sql);
if (hasResult) {
/**
* (3) 写结果文件
*/
storeResult(stmt.getResultSet());
}
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("sql execute failed! " + e.getMessage());
throw e;
} finally {
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
System.out.println("release sql connection...");
System.out.println("Job Finished!");
}
}
protected void storeResult(ResultSet rs) throws Exception {
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
// 获取结果文件路径
String resultFilePath = TaskDirUtils.getDataFile();
FileUtils.writeStringToFile(new File(resultFilePath), "");
CsvWriter csvWriter = new CsvWriter(resultFilePath, ',', StandardCharsets.UTF_8);
csvWriter.setTextQualifier('"');
csvWriter.setUseTextQualifier(false);
csvWriter.setForceQualifier(true);
String[] headerList = new String[columnsNumber];
for (int i = 0; i < columnsNumber; i++) {
headerList[i] = rsmd.getColumnName(i + 1);
}
csvWriter.writeRecord(headerList);
while (rs.next()) {
String[] lineEles = new String[columnsNumber];
for (int i = 0; i < columnsNumber; i++) {
lineEles[i] = rs.getString(i + 1);
}
csvWriter.writeRecord(lineEles, true);
}
csvWriter.flush();
csvWriter.close();
System.out.println("store result finished.");
}
}
ConnectionManager.java
public class ConnectionManager {
public static Connection getJDBCConnection(String connectionJson) throws Exception {
Connection connection = null;
try {
String jdbcUrl = null;
String userName = null;
String password = null;
if(StringUtils.isNotBlank(connectionJson)){
JSONObject connectionObj = JSON.parseObject(connectionJson);
jdbcUrl = connectionObj.getString("jdbcUrl");
userName = connectionObj.getString("username");
password = connectionObj.getString("password");
}else{
throw new Exception("member in connection is null!");
}
String driverClassName = getDriverClassName(jdbcUrl);
System.out.println("load driver class: " + driverClassName);
Class.forName(driverClassName);
DriverManager.setLoginTimeout(20);
connection = DriverManager.getConnection(jdbcUrl, userName, password);
} catch (ClassNotFoundException e) {
System.err.println("acquire connection failed! " + e);
System.exit(1);
}
return connection;
}
private static String getDriverClassName(String jdbcUrl) throws Exception{
if(StringUtils.contains(jdbcUrl, "jdbc:mysql")){
return "com.mysql.cj.jdbc.Driver";
}else{
throw new Exception("unsupported jdbc driver");
}
}
- 设置数据查询结果集被页面展示。
如果您希望后续使用自定义节点时,进行数据查询等操作时,结果能在DataWorks的页面中直接展示,您需参考本步骤设置数据查询结果集被页面展示。
从submitJob接口业务运行代码后,需将获取到的结果集(比如查询表记录)按照以下规则存储到指定目录中:
- 代码开发完成后进行Maven构建打包。
将本身依赖的包打成JAR包。
- 后续步骤:上传插件包。
如果依赖了protobuf、guava的包,或者其他外部依赖JAR包,需将这些依赖的包和上述步骤打成的JAR包合并为ZIP包后,作为插件上传至DataWorks。上传插件的详细操作可参考
新增节点插件。
附录:通过环境变量获取对应的节点信息
获取环境变量值的方式:System.getenv("SKYNET_ONDUTY")
。
- SKYNET_ID:节点ID,只有在调度运行才生效,数据开发直接运行是无该参数的。
- SKYNET_BIZDATE:业务日期。
- SKYNET_ONDUTY:
- 临时运行、补数据、测试时:表示操作者ID。
- 日常调度:节点责任人ID(工号/baseid)。
- SKYNET_TASKID:节点实例。
- IDSKYNET_SYSTEM_ENV:对应的项目环境,包括:dev 、prod。
- SKYNET_CYCTIME:节点实例的定时时间。
- SKYNET_CONNECTION:连接串(一般是json)。
- SKYNET_TENANT_ID:租户ID。