在使用Logstash传输数据时,在某些业务使用场景中,您可能需要切分源端数据并提取到字段中再写入目标端Elasticsearch集群。例如,源端Logs日志中存在以竖线(|)分隔的数据,此时您可以通过Logstash按照|切分数据并提取到字段中,再输出到目标端Elasticsearch集群。本文介绍如何通过Logstash切分数据并提取到字段中。

背景信息

logstash-filter-mutate插件(过滤器插件)支持对事件中的字段进行切分、重命名、删除、替换和修改等操作,详细信息请参见Mutate filter plugin。所有的过滤器插件都支持以下常见的可选配置项,详细信息请参见Common Option
配置项输入类型
add_fieldhash
add_tagarray
enable_metricboolean
idstring
periodic_flushboolean
remove_fieldarray
remove_tagarray

前提条件

您已完成以下操作:
  • 创建阿里云Elasticsearch实例。

    具体操作,请参见创建阿里云Elasticsearch实例,本文以7.10版本实例为例。

  • 开启目标Elasticsearch实例的自动创建索引功能。
    具体操作请参见配置YML参数
    说明 自动创建的索引可能不符合您的预期,不建议开启,本文仅供测试。在实际业务中,建议您先在目标Elasticsearch实例中创建索引,再通过Logstash传输数据。创建索引的具体操作请参见快速入门
  • 创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。

    具体操作,请参见创建阿里云Logstash实例

  • 准备测试数据。
    本文以Beats采集的Logs中的某一条数据为例,关于Beats采集数据的详细信息请参见采集ECS服务日志。如下测试数据中的LogMessage|分隔,并存在多个||||特殊符号。本文使用Logstash按照|切分数据,被切分的字段分别写入到对应的字段中:mobile、appName、type、timestamp、status、code、component、cid、serviceId、serviceName、serviceType、param,最后将字段输出到目标端Elasticsearch集群中。
    LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||

操作步骤

  1. 进入阿里云Elasticsearch控制台的Logstash页面
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理
  4. 单击创建管道
  5. 创建管道任务页面,输入管道ID并配置管道。
    本文使用的管道配置如下。
    input {
        beats {
            port => 8001
        }
    }
    filter {
        mutate {
            gsub => ["message","\|","| "]
            split => ["message","|"]
            add_field => {
                "mobile" => "%{[message][1]}"
                "appName" => "%{[message][2]}"
                "type" => "%{[message][3]}"
                "timestamp" => "%{[message][4]}"
                "status" => "%{[message][5]}"
                "code" => "%{[message][6]}"
                "component" => "%{[message][7]}"
                "cid" => "%{[message][8]}"
                "serviceId" => "%{[message][9]}"
                "serviceName" => "%{[message][10]}"
                "serviceType" => "%{[message][11]}"
                "param" => "%{[message][12]}"
            }
        }
        mutate {
            strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"]
        }
    }
    output {
        elasticsearch {
            index => "<yourIndexName>"
            hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "<yourPassword>"
        }
    }                
    重要
    • input.beat.port为Beats采集日志输入到当前Logstash管道的端口,需要使用8000~9000范围内的端口。
    • 以上管道配置中的gsub => ["message","\|","| "],第二个|后有一个空格。
    • 以上管道配置中的indexhostspassword参数值需要替换为您实际业务的索引名称、Elasticsearch集群的访问地址和集群的elastic账号对应的密码。
    以上管道配置的原理说明如下:
    1. 通过Logstash的filter.mutate.gsub参数,使用正则表达式\|去匹配LogMessage中的|,将|替换为| (即|+空格)。替换后的效果如下。
      LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
    2. 通过filter.mutate.split参数将LogMessage按照|进行切分。
    3. 通过filter.mutate.add_field参数添加字段,即将切分后的LogMessage一一添加到对应的字段中。添加后的效果如下。
      "mobile":" 1390000****",
      "appName":" jop",
      "type":" byORP",
      "timestamp":" 2022-04-18T14:18:16.633",
      "status":" /log/cms/sen",
      "code":" 200",
      "component":" pluginNums=0,pluginStatus=0",
      "cid":" ",
      "serviceId":" ",
      "serviceName":" ",
      "serviceType":" ",
      "param":" "
    4. 通过filter.mutate.strip参数去除字段空格。由于添加后的每个字段前面都有一个空格,因此需要去除这些空格。

    更多管道配置说明,请参见通过配置文件管理管道Logstash配置文件说明

    警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。
  6. 单击保存或者保存并部署
    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

验证结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 7.10.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 单击右上角的Dev tools
  3. Console中,执行以下脚本,查询目标索引中的信息。
    GET <yourIndexName>/_search
    {
      "query": {
        "match_all": {}
      }
    }
    说明 <yourIndexName>需要与管道配置中的index参数值保持一致。

    预期结果如下。

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "<yourIndexName>",
            "_type" : "_doc",
            "_id" : "Lb1UWoAB-6Zo6en4luDi",
            "_score" : 1.0,
            "_source" : {
              "mobile" : "1390000****",
              "appName" : "jop",
              "type" : "byORP",
              "timestamp" : "2022-04-18T14:18:16.633",
              "status" : "/log/cms/sen",
              "code" : "200",
              "component" : "pluginNums=0,pluginStatus=0",
              "cid" : "",
              "serviceId" : "",
              "serviceName" : "",
              "serviceType" : "",
              "param" : ""
            }
          }
        ]
      }
    }