本文为您介绍开源Flink 1.11如何实时写入数据至Hologres。
前提条件
背景信息
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.13</artifactId>
<version>1.0.1</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
Flink SQL写入数据至Hologres代码示例
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");
更多详尽的代码示例请参见hologres-connector-flink-examples,包括如下示例。- FlinkSQLToHoloExample:一个使用纯Flink SQL接口实现的应用,将数据写入至Hologres。
- FlinkDSAndSQLToHoloExample:一个混合Flink DataStream以及SQL 接口实现的应用,写入Hologres前,将DataStream转换成Table,之后再用Flink SQL写入Hologres。
- FlinkDataStreamToHoloExample:一个使用纯Flink DataStream接口实现的应用,将数据写入至Hologres。
- FlinkRoaringBitmapAggJob:一个使用Flink及RoaringBitmap,结合Hologres维表,实现实时去重统计UV的应用,并将统计结果写入Hologres。
Hologres Flink Connector参数说明
参数 | 是否必填 | 说明 |
---|---|---|
connector | 是 | 结果表类型,固定值为hologres。 |
dbname | 是 | Hologres的数据库名称。 |
tablename | 是 | Hologres接收数据的表名称。 |
username | 是 | 当前阿里云账号的AccessKey ID。 您可以单击AccessKey 管理,获取AccessKey ID。 |
password | 是 | 当前阿里云账号的AccessKey Secret。 您可以单击AccessKey 管理,获取AccessKey Secret。 |
endpoint | 是 | Hologres的VPC网络地址。进入Hologres管理控制台的实例详情页,从实例配置获取Endpoint。 说明 endpoint需包含端口号,格式为 ip:port 同一个区域使用VPC网络地址,跨区域请使用公共网络。 |
- 连接参数
参数 是否必填 说明 connectionSize 否 单个Flink Hologres Task所创建的JDBC连接池大小。 默认值:3,和吞吐成正比。
connectionPoolName 否 连接池名称,同一个TaskManager中,表配置同名的连接池名称可以共享连接池。 无默认值,每个表默认使用自己的连接池。如果设置连接池名称,则所有表的connectionSize需要相同
fixedConnectionMode 否 写入和点查不占用连接数(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3) 默认值:false
jdbcRetryCount 否 当连接故障时,写入和查询的重试次数。 默认值:10。
jdbcRetrySleepInitMs 否 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。 默认值:1000ms。
jdbcRetrySleepStepMs 否 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs。 默认值:5000ms。
jdbcConnectionMaxIdleMs 否 写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放。 默认值:60000ms。
jdbcMetaCacheTTL 否 TableSchema信息的本地缓存时间。 默认值:60000ms。
jdbcMetaAutoRefreshFactor 否 当TableSchema cache剩余存活时间短于 metaCacheTTL/metaAutoRefreshFactor
将自动刷新cache。默认值:-1,表示不自动刷新。
- 写入参数
参数 是否必填 说明 mutatetype 否 数据写入模式,详情请参见流式语义。 默认值:insertorignore。
ignoredelete 否 是否忽略撤回消息。通常Flink的Group By会产生回撤消息,回撤消息到Hologres connector会产生Delete请求。 默认值:true,仅在使用流式语义时生效。
createparttable 否 当写入分区表时,是否自动根据分区值自动创建分区表。Flink 全托管2.1.x及以上版本支持自动创建分区表。 - false(默认值):不会自动创建。
- true:自动创建。
ignoreNullWhenUpdate 否 当 mutatetype='insertOrUpdate'
时,是否忽略更新写入数据中的Null值。默认值:false。
jdbcWriteBatchSize 否 Hologres Sink节点数据攒批的最大批大小。 默认值:256
jdbcWriteBatchByteSize 否 Hologres Sink节点单个线程数据攒批的最大字节大小。 默认值:2097152(2 * 1024 * 1024),2MB。
jdbcWriteBatchTotalByteSize 否 Hologres Sink节点所有数据攒批的最大字节大小。 默认值:20971520(20 * 1024 * 1024),20MB。
jdbcWriteFlushInterval 否 Hologres Sink节点数据攒批的最长Flush等待时间。 默认值:10000,即10秒。
jdbcEnableDefaultForNotNullColumn 否 设置为true时,not null且未在表上设置default的字段传入null时,将以默认值写入。String类型默认为"",Number类型默认为0,Date、timestamp、timestamptz 类型默认为1970-01-01 00:00:00。 默认值:true。
jdbcCopyWriteMode 否 是否使用fixed copy方式写入。fixed copy是hologres1.3新增的能力,相比insert方法,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批),但不支持回撤。 默认值:false。
jdbcCopyWriteFormat 否 底层是否走二进制协议。 - binary(默认值):表示使用二进制模式,二进制会更快。
- text:为文本模式。
jdbcCopyWriteDirectConnect 否 copy模式是否直连。copy的瓶颈往往是VIP endpoint的网络吞吐,因此copy写入模式下会测试当前环境能否直连holo fe,支持的话默认使用直连。此参数设置为false则不进行直连。 默认值:true。
- 点查参数
参数 是否必填 说明 jdbcReadBatchSize 否 维表点查最大批次大小。 默认值:128。
jdbcReadBatchQueueSize 否 维表点查请求缓冲队列大小。 默认值:256。
async 否 是否采用异步方式同步数据。 默认值:false。异步模式可以并发地处理多个请求和响应,从而连续的请求之间不需要阻塞等待,提高查询的吞吐。但在异步模式下,无法保证请求的绝对顺序。
cache 否 缓存策略。 默认值:None。Hologres仅支持以下两种缓存策略:None:无缓存。LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。
cachesize 否 缓存大小。 默认值:10000。选择LRU缓存策略后,可以设置缓存大小。
cachettlms 否 更新缓存的时间间隔,单位为毫秒。 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。
cacheempty 否 是否缓存join结果为空的数据。 默认值:true,表示缓存join结果为空的数据。false:表示不缓存join结果为空的数据。
- 数据类型映射
当前Flink全托管与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射。