When you use Logstash to transfer data from a source to an Elasticsearch cluster, you may need to split data in the source into multiple values, assign the values to fields, and then write the fields to the Elasticsearch cluster in specific business scenarios. For example, if logs in the source contain data that is separated by vertical bars (|), you can use Logstash to split the data into multiple values by vertical bar (|), assign the values to fields, and then write the fields to the Elasticsearch cluster. This topic describes how to use Logstash to split data into values, assign the values to fields, and then write the fields to an Elasticsearch cluster.

Background information

logstash-filter-mutate is a filter plug-in that allows you to perform specific operations on fields in events. For example, you can use this plug-in to split, rename, delete, replace, and modify fields in events. For more information about the logstash-filter-mutate plug-in, see Mutate filter plugin. The following table lists the common configuration items supported by all filter plug-ins. The configuration items are optional. For more information, see Common Options.
Configuration item Data type of input
add_field hash
add_tag array
enable_metric boolean
id string
periodic_flush boolean
remove_field array
remove_tag array

Prerequisites

The following operations are performed:
  • An Elasticsearch cluster is created.

    For more information, see Create an Alibaba Cloud Elasticsearch cluster. In this topic, an Elasticsearch V7.10 cluster is used.

  • The Auto Indexing feature is enabled for the Elasticsearch cluster.
    For more information, see Configure the YML file.
    Note In this topic, the Auto Indexing feature is used only for testing. In actual business scenarios, this feature is not recommended because the indexes that are automatically created may not meet your business requirements. Instead, we recommend that you manually create an index in the Elasticsearch cluster before you use Logstash to transfer data to the index. For more information about how to create an index, see Getting started.
  • A Logstash cluster is created in the VPC where the Elasticsearch cluster resides.

    For more information, see Create an Alibaba Cloud Logstash cluster.

  • Test data is prepared.
    In this example, a data record named LogMessage in the logs collected by Beats is used as test data. For more information about how to configure Beats to collect logs, see Collect the logs of an ECS instance. The data in the LogMessage data record is separated by vertical bars (|). Logstash is used to split the data in the data record into multiple values by vertical bar (|). The values are sequentially assigned to the following fields: mobile, appName, type, timestamp, status, code, component, cid, serviceId, serviceName, serviceType, and param. Then, the fields are transferred to the Elasticsearch cluster.
    LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||

Procedure

  1. Log on to the Elasticsearch console.
  2. Navigate to the desired cluster.
    1. In the top navigation bar, select the region where the cluster resides.
    2. In the left-side navigation pane, click Logstash Clusters. On the Logstash Clusters page, find the cluster and click its ID.
  3. In the left-side navigation pane of the page that appears, click Pipelines.
  4. On the Pipelines page, click Create Pipeline.
  5. In the Create Task wizard, enter a pipeline ID and configure the pipeline.
    In this example, the following configurations are used for the pipeline:
    input {
        beats {
            port => 8001
        }
    }
    filter {
        mutate {
            gsub => ["message","\|","| "]
            split => ["message","|"]
            add_field => {
                "mobile" => "%{[message][1]}"
                "appName" => "%{[message][2]}"
                "type" => "%{[message][3]}"
                "timestamp" => "%{[message][4]}"
                "status" => "%{[message][5]}"
                "code" => "%{[message][6]}"
                "component" => "%{[message][7]}"
                "cid" => "%{[message][8]}"
                "serviceId" => "%{[message][9]}"
                "serviceName" => "%{[message][10]}"
                "serviceType" => "%{[message][11]}"
                "param" => "%{[message][12]}"
            }
        }
        mutate {
            strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"]
        }
    }
    output {
        elasticsearch {
            index => "<yourIndexName>"
            hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "<yourPassword>"
        }
    }                
    Notice
    • The input.beats.port parameter specifies the port number over which the logs collected by Beats are transferred to the Logstash pipeline. The value of this parameter must range from 8000 to 9000.
    • In the gsub => ["message","\|","| "] setting in the filter part, a whitespace is inserted after the second vertical bar (|).
    • When you use the preceding configurations, you must set the index parameter to the name of the index that you use in your business, the hosts parameter to the host of your Elasticsearch cluster, and the password parameter to the password of your elastic account in the output part.
    The following descriptions show how Logstash works after the pipeline is configured:
    1. The regular expression \| is used in the filter.mutate.gsub parameter to match the vertical bars (|) in the LogMessage data record and replace each of the matched vertical bars (|) with a vertical bar that is followed by a whitespace (| ). Replacement result:
      LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
    2. The filter.mutate.split parameter is used to split the data in the LogMessage data record by vertical bar (|).
    3. The filter.mutate.add_field parameter is used to sequentially assign the values obtained after the split to the fields. The following results are obtained after the values are assigned to the fields:
      "mobile":" 1390000****",
      "appName":" jop",
      "type":" byORP",
      "timestamp":" 2022-04-18T14:18:16.633",
      "status":" /log/cms/sen",
      "code":" 200",
      "component":" pluginNums=0,pluginStatus=0",
      "cid":" ",
      "serviceId":" ",
      "serviceName":" ",
      "serviceType":" ",
      "param":" "
    4. The filter.mutate.strip parameter is used to remove the whitespace before each value.

    For more information about pipeline configurations, see Use configuration files to manage pipelines and Logstash configuration files.

  6. Click Save or Save and Deploy.
    • Save: After you click this button, the system stores the pipeline settings and triggers a cluster change. However, the settings do not take effect. After you click Save, the Pipelines page appears. On the Pipelines page, find the created pipeline and click Deploy in the Actions column. Then, the system restarts the Logstash cluster to make the settings take effect.
    • Save and Deploy: After you click this button, the system restarts the Logstash cluster to make the settings take effect.

Verify the result

  1. Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
    For more information about how to log on to the Kibana console, see Log on to the Kibana console.
    Note In this example, an Elasticsearch V7.10.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
  2. In the upper-right corner of the page that appears, click Dev tools.
  3. On the Console tab of the page that appears, run the following command to query the index in the Elasticsearch cluster:
    GET <yourIndexName>/_search
    {
      "query": {
        "match_all": {}
      }
    }
    Note The value of the <yourIndexName> parameter must be the same as the value of the index parameter in the pipeline configurations.

    If the command is successfully run, the following result is returned:

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "<yourIndexName>",
            "_type" : "_doc",
            "_id" : "Lb1UWoAB-6Zo6en4luDi",
            "_score" : 1.0,
            "_source" : {
              "mobile" : "1390000****",
              "appName" : "jop",
              "type" : "byORP",
              "timestamp" : "2022-04-18T14:18:16.633",
              "status" : "/log/cms/sen",
              "code" : "200",
              "component" : "pluginNums=0,pluginStatus=0",
              "cid" : "",
              "serviceId" : "",
              "serviceName" : "",
              "serviceType" : "",
              "param" : ""
            }
          }
        ]
      }
    }