本文为您介绍如何使用IntelliJ IDEA开发实时计算Flink版自定义函数,包括搭建开发环境和实时计算Flink版作业中引用自定义函数。
配置Maven
- 下载Maven。
- 登录Maven官网下载页面。
- 下载apache-maven-3.5.3-bin.tar.gz。
- 解压下载的安装包到指定目录,例如:/Users/<userName>/Documents/maven。
- 配置环境变量。
- 在Terminal中,执行
vim ~/.bash_profile
命令。
- 在.bash_profile文件中添加如下命令。
export M2_HOME=/Users/<userName>/Documents/maven/apache-maven-3.5.3
export PATH=$PATH:$M2_HOME/bin
- 保存并关闭.bash_profile文件。
- 执行
source ~/.bash_profile
命令使配置生效。
- 执行
mvn -v
命令查看配置是否生效。如果打印如下信息,则配置生效。
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00)
Maven home: /Users/<userName>/Documents/maven/apache-maven-3.5.0
Java version: 1.8.0_121, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.12.6", arch: "x86_64", family: "mac"
搭建开发环境
- 下载UDX示例。
- 在Linux环境下解压下载文件。
tar xzvf RealtimeCompute-udxDemo.gz
- 打开IntelliJ IDEA,单击Open打开下载的UDX示例。

实时计算Flink版作业引用JAR包
- 创建Package。
- 右键单击。

- 在New Package中输入Package名称。本文以
com.hjc.test.blink.sql.udx
为例。
- 单击OK。
- 创建Class。
- 右键单击。
- 在Create New Class中,输入Class名称。
- 单击OK。
- 在Class中输入以下代码。
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class StringLengthUdf extends ScalarFunction {
// 可选,open方法可以不编写。
// 如果编写open方法,需要声明'import org.apache.flink.table.functions.FunctionContext;'。
@Override
public void open(FunctionContext context) {
}
public long eval(String a) {
return a == null ? 0 : a.length();
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
//可选,close方法可以不编写。
@Override
public void close() {
}
}
- 在Terminal中,执行
mvn package
或mvn assembly:assembly
,将项目写入JAR包。
说明
- 如果需要将第三方依赖写入JAR包,请使用
mvn assembly:assembly
。
- 编译后的JAR包为RealtimeCompute-udxDemo/target/RTCompute-udx-1.0-SNAPSHOT.jar或RealtimeCompute-udxDemo/target/RTCompute-udx-1.0-SNAPSHOT-jar-with-dependencies.jar(将第三方依赖写入JAR包)。
- 实时计算Flink版作业引用JAR包。
- 登录实时计算控制台。
- 在顶部菜单中,单击开发。
- 在左侧的导航栏中,单击资源引用。
- 在资源引用页签的右上角,单击新建资源。
参数名称 |
说明 |
上传方式 |
实时计算控制台上仅支持本地上传。
说明 本地上传JAR包的大小上限为300 MB。如果JAR包大小超过300 MB,请在集群绑定的OSS控制台上,或通过OpenAPI的方式上传JAR包。
|
资源选择 |
单击选择资源,选择需要引用的资源。
|
资源名称 |
输入资源名称。 |
资源备注 |
输入资源备注信息。 |
资源类型 |
选择引用资源类型,JAR、DICTIONARY或PYTHON。 |
- 在资源引用页签中,鼠标悬停在对应作业的右侧的更多上。
- 在下拉列表中,选择引用。
- 在作业的编辑窗口顶部,输入自定义函数声明,示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';