本文为您介绍日志服务SLS源表的DDL定义、WITH参数、类型映射、属性字段和代码示例。
什么是日志服务
日志服务SLS是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。
前提条件
已创建日志服务Project和Logstore,详情请参见步骤二:创建Project和Logstore。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持日志服务SLS Connector。
DDL定义
create table sls_source(
a int,
b int,
c varchar
) with (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessKey>',
'startTime' = '<yourStartTime>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'consumerGroup' = '<yourConsumerGroupName>'
);
说明
- SLS暂不支持MAP类型的数据。
- SLS对于不存在字段会置为Null。
- DDL定义中字段顺序可以为无序,建议和物理表中字段顺序保持一致。
WITH参数
参数 | 说明 | 是否必选 | 备注 |
---|---|---|---|
connector | 源表类型。 | 是 | 固定值为sls 。
|
endPoint | 消费端点信息。 | 是 | 服务入口。 |
accessId | AccessKey ID。 | 是 | 无。 |
accessKey | AccessKey Secret。 | 是 | 无。 |
project | SLS项目名称。 | 是 | 无。 |
logStore | LogStore名称。 | 是 | 无。 |
startTime | 消费日志的开始时间。 | 否 | 格式为yyyy-MM-dd hh:mm:ss 。默认从当前时间开始消费。
|
stopTime | 消费日志的结束时间。 | 否 | 格式为yyyy-MM-dd hh:mm:ss 。
|
consumerGroup | 消费组名称。 | 否 | 您可以自定义消费组名(没有固定格式)。 |
batchGetSize | 单次读取logGroup的条数。 | 否 | 默认值为100。
说明
|
exitAfterFinish | 在数据消费完成后,Flink程序是否退出。 | 否 | 参数取值如下:
说明 仅实时计算引擎VVR 4.0.13版本支持该参数。
|
类型映射
日志服务和Flink字段类型对应关系如下。建议您使用该对应关系进行DDL声明。
日志服务字段类型 | Flink字段类型 |
---|---|
STRING | VARCHAR |
属性字段
字段名 | 字段类型 | 说明 |
---|---|---|
__source__ |
STRING METADATA VIRTUAL | 消息源。 |
__topic__ |
STRING METADATA VIRTUAL | 消息主题。 |
__timestamp__ |
BIGINT METADATA VIRTUAL | 日志时间。 |
__tag__ |
MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。对于属性"__tag__:__receive_time__":"1616742274",'__receive_time__'和'1616742274'会被作为KV对,记录在Map中,在SQL中通过__tag__['__receive_time__']的方式访问。 |
说明 仅在VVR 3.0.1版本及以后版本支持获取以上SLS属性字段。
代码示例
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessid' ='xx',
'accesskey' ='xxx',
'starttime' = '2001-08-01 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE print_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING ,
`__source__` STRING ,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'print',
'logger'='true'
);
INSERT INTO print_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;