在使用Logstash传输数据时,如果您需要通过合并字段来整合数据,例如将a字段和b字段合并为一个新的c字段,然后移除a字段和b字段,可以通过logstash-filter-mutate插件的多个模块实现。此插件为系统默认安装插件,无须再安装,且不支持卸载。本文介绍如何通过logstash-filter-mutate插件实现多字段合并。

背景信息

logstash-filter-mutate插件支持对事件中的字段进行重命名、删除、替换和修改操作。配置文件中的mutate按照下表中的顺序执行,详细信息请参见Mutate filter plugin
模块 输入类型
coerce hash
rename hash
update hash
replace hash
convert hash
gsub array
uppercase array
capitalize array
lowercase array
strip array
remove_field array
split hash
join hash
merge hash
copy hash

前提条件

  • 创建阿里云Elasticsearch实例。

    具体操作,请参见创建阿里云Elasticsearch实例,本文以7.10版本实例为例。

  • 开启目标Elasticsearch实例的自动创建索引功能。
    具体操作请参见配置YML参数
    说明 自动创建的索引可能不符合您的预期,不建议开启,本文仅供测试。在实际业务中,建议您先在目标Elasticsearch实例中创建索引,再通过Logstash传输数据。创建索引的具体操作请参见快速入门
  • 创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。

    具体操作,请参见创建阿里云Logstash实例

  • 在源阿里云Elasticsearch中准备测试数据。
    本文使用的测试数据如下。其中源索引的名称为yc_text,待合并的字段为app.namemessage
    {
      "took" : 2,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app1",
              "annual_rate" : "31%",
              "describe" : "可以自助选择消息推送",
              "message" : "10000"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app2",
              "annual_rate" : "35%",
              "describe" : "每天收益到账消息推送",
              "message" : "10001"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app3",
              "annual rate" : "30",
              "describe" : "每天收益会消息推送",
              "message" : "10004"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app4",
              "annual_rate" : "38%",
              "describe" : "每天收益立即到账消息推送",
              "message" : "10002"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app5",
              "annual_rate" : "40%",
              "describe" : "每天收益到账消息推送",
              "message" : "10003"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app6",
              "annual_rate" : "33%",
              "describe" : "通过短信提示获取收益消息",
              "message" : "10005"
            }
          }
        ]
      }
    }

操作步骤

  1. 登录阿里云Elasticsearch控制台
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. 在左侧导航栏,单击Logstash实例,然后在Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理
  4. 管道列表区域,单击创建管道
    创建管道
  5. 创建管道任务页面,输入管道ID并配置管道。
    本文使用的管道配置如下。
    input {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text"
            docinfo => true
        }
    }
    filter {
        mutate {
            merge =>  { "message" => "app.name" }
        }
    
        mutate {
            rename => [ "message","anger" ]
        }
    
        mutate {
            remove_field => [ "app.name" ]
        }
    }
    output {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text_new"
            document_type => "%{[@metadata][_type]}"
            document_id => "%{[@metadata][_id]}"
        }
    }
                            
    以上管道配置的原理说明如下:
    1. 通过Logstash的filter.mutate.merge参数合并源索引yc_text中的app.namemessage两个字段。合并后,Logstash会使用message字段存放原messageapp.name两个字段的数据。
    2. 通过filter.mutate.rename参数将合并后的message字段重命名为anger
    3. 前两步完成后,app.name字段还会继续存在。为了避免出现重复数据,管道配置中使用filter.mutate.remove_fieldapp.name字段移除。
    4. 最后将合并后的字段anger字段传输到yc_text_new索引中。
    说明 logstash-filter-mutate插件会按照优先级执行mutate块中的定义的操作,详细信息请参见本文的背景信息。但是您可以通过使用不同的mutate块来控制这个顺序,例如以上管道配置中使用了三个mutate块来控制执行顺序为renamemergeremove_field

    更多管道配置说明,请参见通过配置文件管理管道Logstash配置文件说明

  6. 单击保存或者保存并部署
    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。
    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

验证结果

  1. 登录目标阿里云Elasticsearch的Kibana控制台。
    具体操作请参见登录Kibana控制台
  2. 根据页面提示进入Kibana主页,单击右上角的Dev tools
  3. Console页签,执行以下脚本,查询目标索引中的信息。
    GET yc_text_new/_search
    {
      "query": {
        "match_all": {}
      }
    }

    返回结果如下。

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益到账消息推送",
              "@version" : "1",
              "anger" : [
                "10001",
                "app2"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "35%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益到账消息推送",
              "@version" : "1",
              "anger" : [
                "10003",
                "app5"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "40%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "通过短信提示获取收益消息",
              "@version" : "1",
              "anger" : [
                "10005",
                "app6"
              ],
              "@timestamp" : "2021-12-15T03:45:25.322Z",
              "annual_rate" : "33%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "可以自助选择消息推送",
              "@version" : "1",
              "anger" : [
                "10000",
                "app1"
              ],
              "@timestamp" : "2021-12-15T03:45:25.298Z",
              "annual_rate" : "31%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益立即到账消息推送",
              "@version" : "1",
              "anger" : [
                "10002",
                "app4"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "38%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "每天收益会消息推送",
              "@version" : "1",
              "anger" : [
                "10004",
                "app3"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual rate" : "30"
            }
          }
        ]
      }
    }

    根据结果可以看到,新字段anger已经整合了旧字段messageapp.name的数据,且app.name字段已被移除。