本文为您介绍如何创建实时计算Flink版自定义结果表,自定义结果表可以满足您各种差异化的输出需求。
注意
- 本文仅适用于Blink 1.4.5及以上版本。
- 本文仅适用于独享模式。
搭建环境
您可以通过以下两种方式搭建自定义结果表的开发环境:- 直接使用示例中的环境。
为了便于您快速开发业务,实时计算Flink版提供如下自定义结果表示例:说明 示例中已为您配置对应版本的开发环境,您无需搭建环境。
- 下载JAR包自行搭建环境
说明 Maven工程中引用以下依赖包时,Scope设置为
<scope>provided</scope>
。- 实时计算Flink版3.0版本
- JAR包下载
请在POM文件中添加如下信息,完成
flink-table_2.11
JAR包的自动下载。<profiles> <profile> <id>allow-snapshots</id> <activation><activeByDefault>true</activeByDefault></activation> <repositories> <repository> <id>snapshots-repo</id> <url>https://oss.sonatype.org/content/repositories/snapshots</url> <releases><enabled>false</enabled></releases> <snapshots><enabled>true</enabled></snapshots> </repository> </repositories> </profile> </profiles>
- 依赖包
<dependencies> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-common</artifactId> <version>blink-3.2.1-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-custom</artifactId> <version>blink-3.2.1-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-table_2.11</artifactId> <version>blink-3.2.1-SNAPSHOT</version> <scope>provided</scope> </dependency> </dependencies>
- JAR包下载
- 实时计算Flink版2.0版本
- JAR包下载
- 依赖包
<dependencies> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-table</artifactId> <version>blink-2.2.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>blink-2.2.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>blink-2.2.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-common</artifactId> <version>blink-2.2.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-custom</artifactId> <version>blink-2.2.4-SNAPSHOT</version> <scope>provided</scope> </dependency> </dependencies>
- 实时计算Flink版1.0版本
- JAR包下载
- 依赖包
<dependencies> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-common</artifactId> <version>blink-1.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-connector-custom</artifactId> <version>blink-1.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>blink-1.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>blink-1.4-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>blink-table</artifactId> <version>blink-1.4-SNAPSHOT</version> <scope>provided</scope> </dependency> </dependencies>
- 实时计算Flink版3.0版本
接口说明
自定义结果表Class需要继承自定义Sink插件的基类CustomSinkBase,并使用如下方法实现。
protected Map<String,String> userParamsMap;// userParamsMap是自定义SQL的WITH语句中定义的键值对,所有的键均为小写。
protected Set<String> primaryKeys;// primaryKeys是自定义的主键字段名。
protected List<String> headerFields;// headerFields是标记为header的字段列表。
protected RowTypeInfo rowTypeInfo;//字段类型和名称。
/**
* 初始化方法。每次初始建立和Failover的时候会调用一次。
*
* @param taskNumber taskNumber为当前节点的编号。
* @param numTasks numTasks为Sink节点的总数。
* @throws IOException
*/
public abstract void open(int taskNumber,int numTasks) throws IOException;
/**
* close方法,释放资源。
*
* @throws IOException
*/
public abstract void close() throws IOException;
/**
* 处理插入单行数据。
*
* @param row
* @throws IOException
*/
public abstract void writeAddRecord(Row row) throws IOException;
/**
* 处理删除单行数据。
*
* @param row
* @throws IOException
*/
public abstract void writeDeleteRecord(Row row) throws IOException;
/**
* 如果进行批量插入,该方法需要把线程中缓存的数据全部刷入下游存储;如果不进行批量插入,可以不使用该方法。
*
* @throws IOException
*/
public abstract void sync() throws IOException;
/**
* 返回类名。
*/
public String getName();
自定义Redis结果表示例
下载实时计算Flink版3.0版本示例,进入blink_customersink_3x目录,执行
mvn clean package
命令,再在实时计算Flink版开发控制台上传刚编译成功后的JAR包blink_customersink_3x/target/blink-customersink-3.x-1.0-SNAPSHOT-jar-with-dependencies.jar,引用资源之后,对于自定义的Sink插件,需要指明type = 'custom'
,并且指明实现接口的Class。
注意 本示例仅作为自定义结果表开发参考,不适合直接作为生产使用。
create table in_table(
kv varchar
)with(
type = 'random'
);
create table out_table(
`key` varchar,
`value` varchar
)with(
type = 'custom',
class = 'com.alibaba.blink.customersink.RedisSink',
-- 1. 可以定义更多自定义参数, 在open函数中通过userParamsMap获取。
-- 2. with参数里key大小写不敏感。在实时计算Flink版中,参数key的值直接处理为全小写。建议您在引用数据存储的DDL中使用小写声明key。
host = 'r-uf****.redis.rds.aliyuncs.com',
port = '6379',
db = '0',
batchsize = '10',
password = '<yourHostPassword>'
);
insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
Redis Sink插件的参数说明如下。
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
host | Redis实例内网连接地址(host) | 是 | 无 |
port | Redis实例端口号 | 是 | 无 |
password | Redis连接密码 | 是 | 无 |
db | Redis Database编号 | 否 | 默认值为0,表示db0。 |
batchsize | 每次批量写入的条数 | 否 | 默认值为1,表示不批量写入。 |