Elasticsearch: Use the logstash-filter-mutate plug-in to merge fields

Last Updated: May 10, 2023

Logstash provides the logstash-filter-mutate plug-in, which allows you to merge fields. For example, if you use Logstash to transfer data and want to merge field a and field b into a new field c and then remove field a and field b, you can use the configuration options provided by the logstash-filter-mutate plug-in to perform the operation. logstash-filter-mutate is a built-in plug-in and cannot be removed. This topic describes how to use the logstash-filter-mutate plug-in to merge fields.

Background information

The logstash-filter-mutate plug-in allows you to rename, delete, replace, or modify fields in events. The following table lists the configuration options supported by the logstash-filter-mutate plug-in. Configuration options in the same mutate block are executed in the fixed default order in which they are listed in the following table, regardless of the order in which you specify them. For more information, see Mutate filter plugin.

Configuration option    Data type of input
------------------------------------------
coerce                  hash
rename                  hash
update                  hash
replace                 hash
convert                 hash
gsub                    array
uppercase               array
capitalize              array
lowercase               array
strip                   array
remove_field            array
split                   hash
join                    hash
merge                   hash
copy                    hash
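
The difference between the two input types can be seen in a minimal mutate block. In the following sketch, the field names src_field and dst_field are illustrative: rename takes a hash that maps the old field name to the new one, and uppercase takes an array of field names.

    filter {
        mutate {
            rename => { "src_field" => "dst_field" }
            uppercase => [ "dst_field" ]
        }
    }

Because rename precedes uppercase in the default order, dst_field already exists by the time uppercase runs, even though both options are defined in the same block.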

Prerequisites

  • An Alibaba Cloud Elasticsearch cluster is created.

    For more information, see Create an Alibaba Cloud Elasticsearch cluster. In this example, an Elasticsearch V7.10 cluster is used.

  • The Auto Indexing feature is enabled for the destination Elasticsearch cluster.

    For more information, see Configure the YML file.

    Note

    We recommend that you disable Auto Indexing in production because automatically created indexes may not meet your business requirements. In this topic, the Auto Indexing feature is used only for testing. In actual business scenarios, we recommend that you create an index in the destination Elasticsearch cluster and use Logstash to transfer data to the index. For more information about how to create an index, see Getting started.
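
    If you create the destination index in advance instead of relying on Auto Indexing, you can run a command similar to the following in the Kibana console of the destination cluster. The mapping shown here is only an assumption based on the test data in this topic; adjust the field types to your business requirements.

    PUT yc_text_new
    {
      "mappings": {
        "properties": {
          "anger": { "type": "keyword" },
          "annual_rate": { "type": "keyword" },
          "describe": { "type": "text" }
        }
      }
    }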

  • An Alibaba Cloud Logstash cluster is created in the virtual private cloud (VPC) where the Elasticsearch cluster resides.

    For more information, see Create an Alibaba Cloud Logstash cluster.

  • Test data is prepared in the source Elasticsearch cluster.

    In this example, the following test data is used. The index in the source Elasticsearch cluster is yc_text. The fields that need to be merged are app.name and message.

    {
      "took" : 2,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app1",
              "annual_rate" : "31%",
              "describe" : "Select whether to push messages for returns",
              "message" : "10000"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app2",
              "annual_rate" : "35%",
              "describe" : "Push messages on a daily basis when returns are credited to your account",
              "message" : "10001"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app3",
              "annual rate" : "30",
              "describe" : "Push messages on a daily basis for returns",
              "message" : "10004"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app4",
              "annual_rate" : "38%",
              "describe" : "Push messages immediately on a daily basis when returns are credited to your account",
              "message" : "10002"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app5",
              "annual_rate" : "40%",
              "describe" : "Push messages on a daily basis when returns are credited to your account",
              "message" : "10003"
            }
          },
          {
            "_index" : "yc_text",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "app.name" : "app6",
              "annual_rate" : "33%",
              "describe" : "Push messages for returns by using text messages",
              "message" : "10005"
            }
          }
        ]
      }
    }

Procedure

  1. Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.
  2. Navigate to the desired cluster.
    1. In the top navigation bar, select the region where the cluster resides.
    2. On the Logstash Clusters page, find the cluster and click its ID.
  3. In the left-side navigation pane of the page that appears, click Pipelines.
  4. On the Pipelines page, click Create Pipeline.
  5. In the Create Task wizard, enter a pipeline ID and configure the pipeline.

    In this example, the following configurations are used for the pipeline. In the configurations, the source and destination Elasticsearch clusters are the same. You can specify different Elasticsearch clusters based on your business requirements.

    input {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text"
            docinfo => true
        }
    }
    filter {
        mutate {
            # Merge the values of the app.name field into the message field.
            # After the merge, message stores the values of both fields as an array.
            merge => { "message" => "app.name" }
        }

        mutate {
            # Rename the merged message field to anger.
            rename => [ "message", "anger" ]
        }

        mutate {
            # Remove the original app.name field to prevent duplicate data.
            remove_field => [ "app.name" ]
        }
    }
    output {
        elasticsearch {
            hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "yc_text_new"
            document_type => "%{[@metadata][_type]}"
            document_id => "%{[@metadata][_id]}"
        }
    }
                            

    The following descriptions show how Logstash works after the pipeline is configured:

    1. Logstash merges the app.name and message fields in the source index yc_text based on the setting of the filter.mutate.merge parameter. After the merge, the message field stores the values of the message and app.name fields as an array.

    2. Logstash renames the message field obtained after the merge based on the setting of the filter.mutate.rename parameter. The message field is renamed anger.

    3. After the preceding operations are complete, the app.name field still exists. To prevent duplicate data, Logstash removes the app.name field. Logstash removes fields based on the setting of the filter.mutate.remove_field parameter.

    4. Logstash transfers the anger field to the destination index yc_text_new.

    Note

    The logstash-filter-mutate plug-in executes the configuration options in the same mutate block in the default order. For more information about the default order, see Background information. To customize the execution order, place the configuration options in separate mutate blocks. For example, the preceding configuration uses three mutate blocks to define the merge, rename, and remove_field configuration options. This way, the options are executed in the expected order.
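
    For comparison, the following sketch shows the problematic single-block configuration. Because rename and remove_field both precede merge in the default order, the message field would be renamed and the app.name field would be removed before the merge runs, regardless of the order in which the options are listed:

    # Do not use this configuration: the options run in the default order, not the listed order.
    filter {
        mutate {
            merge => { "message" => "app.name" }
            rename => [ "message", "anger" ]
            remove_field => [ "app.name" ]
        }
    }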

    For more information about pipeline configurations, see Use configuration files to manage pipelines and Logstash configuration files.

    Warning After you configure the parameters, you must save the settings and deploy the pipeline. This triggers a restart of the Logstash cluster. Before you can proceed, make sure that the restart does not affect your services.
  6. Click Save or Save and Deploy.
    • Save: After you click this button, the system stores the pipeline settings and triggers a cluster change, but the settings do not take effect yet. On the Pipelines page that appears, find the created pipeline and click Deploy in the Actions column. The system then restarts the Logstash cluster to make the settings take effect.
    • Save and Deploy: After you click this button, the system restarts the Logstash cluster to make the settings take effect.

Verify the result

  1. Log on to the Kibana console of the destination Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. Go to the homepage of the Kibana console as prompted and click Dev tools in the upper-right corner.

  3. On the Console tab of the page that appears, run the following command to query the destination index in the Elasticsearch cluster:

    GET yc_text_new/_search
    {
      "query": {
        "match_all": {}
      }
    }

    If the command runs successfully, the following result is returned:

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "H5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Push messages on a daily basis when returns are credited to your account",
              "@version" : "1",
              "anger" : [
                "10001",
                "app2"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "35%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IZIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Push messages on a daily basis when returns are credited to your account",
              "@version" : "1",
              "anger" : [
                "10003",
                "app5"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "40%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "I5IduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Push messages for returns by using text messages",
              "@version" : "1",
              "anger" : [
                "10005",
                "app6"
              ],
              "@timestamp" : "2021-12-15T03:45:25.322Z",
              "annual_rate" : "33%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "HpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Select whether to push messages for returns",
              "@version" : "1",
              "anger" : [
                "10000",
                "app1"
              ],
              "@timestamp" : "2021-12-15T03:45:25.298Z",
              "annual_rate" : "31%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IJIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Push messages immediately on a daily basis when returns are credited to your account",
              "@version" : "1",
              "anger" : [
                "10002",
                "app4"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual_rate" : "38%"
            }
          },
          {
            "_index" : "yc_text_new",
            "_type" : "_doc",
            "_id" : "IpIduH0BWiRrY8Azn65i",
            "_score" : 1.0,
            "_source" : {
              "describe" : "Push messages on a daily basis for returns",
              "@version" : "1",
              "anger" : [
                "10004",
                "app3"
              ],
              "@timestamp" : "2021-12-15T03:45:25.321Z",
              "annual rate" : "30"
            }
          }
        ]
      }
    }

    The result shows that the anger field stores the values of the message and app.name fields as an array and that the app.name field is removed.
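
    The merged values are stored as an array. If your business requires a single string instead, you can insert an additional mutate block with the join configuration option between the merge and rename blocks, as shown in the following sketch. The underscore separator is only an example:

    filter {
        mutate {
            merge => { "message" => "app.name" }
        }
        mutate {
            # Join the array elements into one string, such as "10001_app2".
            join => { "message" => "_" }
        }
        mutate {
            rename => [ "message", "anger" ]
        }
        mutate {
            remove_field => [ "app.name" ]
        }
    }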