本文为您介绍日志服务SLS源表的DDL定义、WITH参数、类型映射、属性字段和代码示例。

什么是日志服务

日志服务SLS是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。

前提条件

已创建日志服务Project和Logstore,详情请参见步骤二:创建Project和Logstore

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持日志服务SLS Connector。

DDL定义

create table sls_source(
  a int,
  b int,
  c varchar
) with (
  'connector' = 'sls',  
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '<yourStartTime>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'consumerGroup' = '<yourConsumerGroupName>'
);
说明
  • SLS暂不支持MAP类型的数据。
  • SLS对于不存在字段会置为Null。
  • DDL定义中字段顺序可以为无序,建议和物理表中字段顺序保持一致。

WITH参数

参数 说明 是否必选 备注
connector 源表类型。 固定值为sls
endPoint 消费端点信息。 服务入口
accessId AccessKey ID。 无。
accessKey AccessKey Secret。 无。
project SLS项目名称。 无。
logStore LogStore名称。 无。
startTime 消费日志的开始时间。 格式为yyyy-MM-dd hh:mm:ss。默认从当前时间开始消费。
stopTime 消费日志的结束时间。 格式为yyyy-MM-dd hh:mm:ss
consumerGroup 消费组名称。 您可以自定义消费组名(没有固定格式)。
batchGetSize 单次读取logGroup的条数。 默认值为100。
说明
  • batchGetSize设置不能超过1000,否则会报错。
  • batchGetSize设置单次读取logGroup的条数。如果单条logItem的大小和batchGetSize都很大,则可能会导致频繁的垃圾回收(Garbage Collection),您可以适当减小batchGetSize参数值。
exitAfterFinish 在数据消费完成后,Flink程序是否退出。 参数取值如下:
  • true:数据消费完后,Flink程序退出。
  • false(默认值):数据消费完后,Flink程序不退出。
说明 仅实时计算引擎VVR 4.0.13版本支持该参数。

类型映射

日志服务和Flink字段类型对应关系如下。建议您使用该对应关系进行DDL声明。
日志服务字段类型 Flink字段类型
STRING VARCHAR

属性字段

字段名 字段类型 说明
__source__ STRING METADATA VIRTUAL 消息源。
__topic__ STRING METADATA VIRTUAL 消息主题。
__timestamp__ BIGINT METADATA VIRTUAL 日志时间。
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL 消息TAG。对于属性"__tag__:__receive_time__":"1616742274"'__receive_time__''1616742274'会被作为KV对,记录在Map中,在SQL中通过__tag__['__receive_time__']的方式访问。
说明 仅在VVR 3.0.1版本及以后版本支持获取以上SLS属性字段。

代码示例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessid' ='xx',
  'accesskey' ='xxx',
  'starttime' = '2001-08-01 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE print_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING ,
  `__source__` STRING ,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'print',
  'logger'='true'
);

INSERT INTO print_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;