本文为您介绍如何创建实时计算Flink版Elasticsearch(ES)维表,以及创建维表时使用的WITH参数和CACHE参数。

注意 本文仅适用于Blink 3.2.2及以上版本。

DDL定义

实时计算Flink版支持使用ES作为维表,示例代码如下。
 CREATE TABLE es_stream_sink(
  field1 LONG, 
  field2 VARBINARY, 
  field3 VARCHAR,
  PRIMARY KEY(field1),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type ='elasticsearch',
  endPoint = '<yourEndPoint>',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
说明 ES维表支持根据ES的PRIMARY KEY进行PRIMARY KEYUPDATE,且PRIMARY KEY只能为1个字段。

WITH参数

参数 说明 默认值 是否必选
type 维表类型 elasticsearch
endPoint Server地址,例如:http://127.0.0.1:9211
accessId 创建ES时的登录名。
说明 如果您通过Kibana插件操作ES,请填写Kibana登录ID。
accessKey 创建ES时的登录密码 。
说明 如果您通过Kibana插件操作ES,请填写Kibana登录密码。
index 索引名称,类似于数据库Database的名称。
typeName Type名称,类似于数据库的Table名称。
maxRetryTimes 异常重试次数 30
timeout 读取超时时长,单位为毫秒。 600000
discovery 是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。 false
compression 是否使用GZIP压缩Request Bodies。 true
multiThread 是否开启JestClient多线程。 true

CACHE参数

参数 说明 备注
cache 缓存策略
  • None(默认值):无缓存。
  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
cacheSize 缓存大小 选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs 缓存更新时间间隔 默认缓存不超时,单位为毫秒。不同缓存策略下的功能如下:
  • LRU:设置缓存失效的超时时长。
  • ALL:设置缓存加载的间隔时长,默认不重新加载。