SLS Indexing Service是E-MapReduce推出的一个Druid插件,用于从日志服务(Log Service,简称SLS)消费数据。

背景介绍

SLS Indexing Service优点如下:
  • 极为便捷的数据采集,可以利用SLS的多种数据采集方式实时将数据导入SLS。
  • 无需额外维护一个Kafka集群,省去了数据流的一个环节。
  • 支持Exactly-Once语义。

    因为SLS Indexing Service消费原理与Kafka Indexing Service类似,所以也支持Kafka Indexing Service一样的Exactly-Once语义。

  • 消费作业高可靠保证,例如,作业失败重试,集群重启等。

准备工作

  • 如果您还没有开通SLS服务,请先开通SLS服务,并配置好相应的Project和Logstore。
  • 准备好以下配置项内容:
    • SLS服务的Endpoint(请使用内网服务入口)。
    • 阿里云账号的AccessKey ID和对应的AccessKey Secret。

使用SLS Indexing Service

  1. 准备数据格式描述文件
    如果您熟悉 Kafka Indexing Service,那么 SLS Indexing Service 会非常简单。具体请参见Kafka Indexing Service的介绍,我们用同样的数据进行索引,那么数据源的数据格式描述文件如下(将其保存为 metrics-sls.json):
    {
        "type": "sls",
        "dataSchema": {
            "dataSource": "metrics-sls",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "timestampSpec": {
                        "column": "time",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": ["url", "user"]
                    },
                    "format": "json"
                }
            },
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "hour",
                "queryGranularity": "none"
            },
            "metricsSpec": [{
                    "type": "count",
                    "name": "views"
                },
                {
                    "name": "latencyMs",
                    "type": "doubleSum",
                    "fieldName": "latencyMs"
                }
            ]
        },
        "ioConfig": {
            "project": <your_project>,
            "logstore": <your_logstore>,
            "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com", (以杭州为例,注意使用内网服务入口)
            "accessKeyId": <your_access_key_id>,
            "accessKeySec": <your_access_key_secret>,
            "collectMode": "simple"/"other"
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
        },
        "tuningConfig": {
            "type": "sls",
            "maxRowsInMemory": "100000"
        }
    }
    对比Kafka Indexing Service一节中的介绍,我们发现两者基本上是一样的。这里简要列一下需要注意的字段:
    • type: sls。
    • dataSchema.parser.parseSpec.format:与ioConfig.consumerProperties.logtail.collection-mode有关,也就是与SLS日志的收集模式有关。如果是极简模式(simple)收集,那么该处原本文件是什么格式,就填什么格式。如果是非极简模式(other)收集,那么此处取值为json。
    • ioConfig.project:您要收集的日志的project。
    • ioConfig.logstore: 您要收集的日志的logstore。
    • ioConfig.consumerProperties.endpoint: SLS内网服务地址,例如杭州对应 cn-hangzhou-intranet.log.aliyuncs.com
    • ioConfig.consumerProperties.access-key-id:阿里云账号的AccessKey ID。
    • ioConfig.consumerProperties.access-key-secret: 阿里云账号的AccessKeySecret。
    • ioConfig.consumerProperties.logtail.collection-mode: SLS日志收集模式,极简模式填simple,其他情况填 other。
    注意 上述配置文件中的ioConfig 配置格式仅适用于EMR-3.20.0及之前版本。自EMR-3.21.0开始,ioConfig配置变更如下:
    "ioConfig": {
            "project": <your_project>,
            "logstore": <your_logstore>,
            "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com", (以杭州为例,注意使用内网服务入口)
            "accessKeyId": <your_access_key_id>,
            "accessKeySec": <your_access_key_secret>,
            "collectMode": "simple"/"other"
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
        },

    即,取消了 consumerProperties 层级、access-key-idaccess-key-secretlogtail.collection-mode 变更为 accessKeyIdaccessKeySeccollectMode

  2. 执行下述命令添加SLS supervisor。
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-sls.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor
    注意 其中--negotiate、-u、-b、-c等选项是针对安全Druid集群。
  3. 向SLS中导入数据。

    您可以采用多种方式向SLS中导入数据。具体请参见 SLS 文档。

  4. 在Druid端进行相关查询。