Elasticsearch: Use Logstash to split data and extract it into fields

Last Updated: Mar 26, 2026

When log data arrives as a single pipe-delimited string, you can use the logstash-filter-mutate plugin to split that string into individual named fields before writing to Elasticsearch. This topic shows you how to configure the pipeline, explains how each mutate operation transforms the data, and shows you how to verify the result in Kibana.

Prerequisites

Before you begin, ensure that you have:

  • An Alibaba Cloud Elasticsearch cluster (V7.10 in this example). For setup instructions, see Create an Alibaba Cloud Elasticsearch cluster.

  • Auto Indexing enabled on the Elasticsearch cluster. For instructions, see Configure the YML file.

    Auto Indexing is used here for testing only. In production, create indexes manually before transferring data — auto-created indexes may not match your schema. See Getting started.
  • An Alibaba Cloud Logstash cluster in the same virtual private cloud (VPC) as the Elasticsearch cluster. For setup instructions, see Create an Alibaba Cloud Logstash cluster.

  • Test data prepared. This example uses a LogMessage data record collected by Beats, where fields are separated by vertical bars (|):

    LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||

    The pipeline splits this record and assigns values sequentially to: mobile, appName, type, timestamp, status, code, component, cid, serviceId, serviceName, serviceType, and param. For instructions on collecting logs with Beats, see Collect the logs of an ECS instance.

Background

The logstash-filter-mutate plugin lets you transform fields in log events — split, rename, delete, replace, and modify them. See the Mutate filter plugin reference for the full option list.

All filter plugins share a set of common configuration options:

Option            Data type
add_field         hash
add_tag           array
enable_metric     boolean
id                string
periodic_flush    boolean
remove_field      array
remove_tag        array

These options are optional. See Common options for details.

Create a pipeline

  1. Go to the Logstash Clusters page in the Alibaba Cloud Elasticsearch console.

  2. In the top navigation bar, select the region where your cluster resides. On the Logstash Clusters page, find your cluster and click its ID.

  3. In the left-side navigation pane, click Pipelines.

  4. On the Pipelines page, click Create Pipeline.

  5. In the Create wizard, enter a pipeline ID and configure the pipeline with the configuration shown in this step. A Logstash pipeline has three sections:

    • Input: defines the data source (Beats in this example)

    • Filter: transforms events before indexing

    • Output: sends processed data to its destination (Elasticsearch)

    How the filter transforms your data

    The filter applies four operations in sequence:

    1. gsub: Uses the regular expression \| to match each vertical bar (|) and replaces it with a vertical bar followed by a space (| ). The backslash escapes the pipe character, which has special meaning in regex. After gsub:

      LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
    2. split: Splits the message field on the vertical bar (|), producing an array of values. Element 0 is the empty string before the first vertical bar, so the values start at index 1.

    3. add_field: Assigns each array element to a named field by index. After this step, each value has a leading space:

      "mobile":" 1390000****",
      "appName":" jop",
      "type":" byORP",
      "timestamp":" 2022-04-18T14:18:16.633",
      "status":" /log/cms/send",
      "code":" 200",
      "component":" pluginNums=0,pluginStatus=0",
      "cid":" ",
      "serviceId":" ",
      "serviceName":" ",
      "serviceType":" ",
      "param":" "
    4. strip (second mutate block): Removes the leading space from each field value. The strip operation sits in its own mutate block because the fields created by add_field exist only after the first block completes.

    Important

    The input.beats.port value must be in the range 8000–9000.

    Warning

    Saving and deploying a pipeline triggers a restart of the Logstash cluster. Make sure the restart won't affect your running workloads before proceeding.

    Replace the following placeholders in the output section of the configuration:

    Placeholder                                                Description
    <yourIndexName>                                            The name of the destination index
    es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200    The host address of your Elasticsearch cluster
    <yourPassword>                                             The password for the elastic account

    input {
        beats {
            port => 8001
        }
    }
    filter {
        mutate {
            # Add a space after every vertical bar.
            gsub => ["message","\|","| "]
            # Split the message field into an array on the vertical bar.
            split => ["message","|"]
            # Copy array elements 1 to 12 into named fields.
            add_field => {
                "mobile" => "%{[message][1]}"
                "appName" => "%{[message][2]}"
                "type" => "%{[message][3]}"
                "timestamp" => "%{[message][4]}"
                "status" => "%{[message][5]}"
                "code" => "%{[message][6]}"
                "component" => "%{[message][7]}"
                "cid" => "%{[message][8]}"
                "serviceId" => "%{[message][9]}"
                "serviceName" => "%{[message][10]}"
                "serviceType" => "%{[message][11]}"
                "param" => "%{[message][12]}"
            }
        }
        # Trim the leading space from each new field. The fields exist only
        # after the first mutate block completes, so strip needs its own block.
        mutate {
            strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"]
        }
    }
    output {
        elasticsearch {
            index => "<yourIndexName>"
            hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "<yourPassword>"
        }
    }

    Replace the placeholders in the output section with your own values, as described in the placeholder table in this step. The numbered list in this step explains the four operations that the filter applies in sequence. For more information about pipeline configuration, see Use configuration files to manage pipelines and Logstash configuration files.
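
    The four filter operations in this step can be traced outside Logstash before you deploy the pipeline. The following is a minimal Python sketch, not part of the pipeline itself: the names FIELD_NAMES and simulate_filter are hypothetical, and Python's re.sub and str.split only stand in for the mutate plugin, so edge-case behavior (for example, how trailing empty values are handled) may differ from Logstash.

```python
import re

# Field names in the order the pipeline assigns them (array indexes 1 to 12).
FIELD_NAMES = [
    "mobile", "appName", "type", "timestamp", "status", "code",
    "component", "cid", "serviceId", "serviceName", "serviceType", "param",
]


def simulate_filter(message: str) -> dict:
    """Approximate the gsub, split, add_field, and strip steps (illustrative only)."""
    # gsub: replace every "|" with "| ", so each value gains a leading space.
    spaced = re.sub(r"\|", "| ", message)
    # split: break the string on "|"; element 0 is the empty text before the first bar.
    parts = spaced.split("|")
    # add_field: copy elements 1 to 12 into named fields, like %{[message][N]}.
    fields = {name: parts[index] for index, name in enumerate(FIELD_NAMES, start=1)}
    # strip: remove the surrounding whitespace from every field value.
    return {name: value.strip() for name, value in fields.items()}


sample = (
    "|1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|"
    "pluginNums=0,pluginStatus=0||||||"
)
event = simulate_filter(sample)
for name in FIELD_NAMES:
    print(f"{name}: {event[name]!r}")
```

    Running the sketch against the sample record prints one line per field, which makes it easy to compare the expected values with the documents that Logstash writes to the index.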

  6. Click Save or Save and Deploy.

    • Save — Stores the pipeline settings without applying them. On the Pipelines page, find the pipeline and click Deploy Now in the Actions column to apply the settings and restart the cluster.

    • Save and Deploy — Stores the settings and immediately restarts the Logstash cluster to apply them.

Verify the result

  1. Log on to the Kibana console of your Elasticsearch cluster. For instructions, see Log on to the Kibana console.

    This example uses Elasticsearch V7.10.0. Steps may differ for other versions — refer to the actual console.
  2. In the upper-right corner, click Dev tools.

  3. On the Console tab, run the following query to retrieve documents from the index:

    GET <yourIndexName>/_search
    {
      "query": {
        "match_all": {}
      }
    }

    Use the same index name as the index parameter in your pipeline configuration. A successful result looks like this:

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
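          {
            "_index" : "<yourIndexName>",
            "_type" : "_doc",
            "_id" : "Lb1UWoAB-6Zo6en4luDi",
            "_score" : 1.0,
            "_source" : {
              "mobile" : "1390000****",
              "appName" : "jop",
              "type" : "byORP",
              "timestamp" : "2022-04-18T14:18:16.633",
              "status" : "/log/cms/send",
              "code" : "200",
              "component" : "pluginNums=0,pluginStatus=0",
              "cid" : "",
              "serviceId" : "",
              "serviceName" : "",
              "serviceType" : "",
              "param" : ""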
              "code" : "200",
              "component" : "pluginNums=0,pluginStatus=0",
              "cid" : "",
              "serviceId" : "",
              "serviceName" : "",
              "serviceType" : "",
              "param" : ""
            }
          }
        ]
      }
    }
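
If you save the search response as JSON, a short script can confirm that every hit contains the twelve extracted fields and that no value kept its leading space. The following Python sketch assumes the response has already been parsed into a dictionary; EXPECTED_FIELDS and find_problems are hypothetical names, and the response literal is a hand-written example in the same shape as the result above, not output captured from a cluster.

```python
# Fields the pipeline is expected to add to every document.
EXPECTED_FIELDS = [
    "mobile", "appName", "type", "timestamp", "status", "code",
    "component", "cid", "serviceId", "serviceName", "serviceType", "param",
]


def find_problems(response: dict) -> list:
    """Return a description of each missing or untrimmed field in the hits."""
    problems = []
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        for name in EXPECTED_FIELDS:
            if name not in source:
                problems.append(f"{hit['_id']}: missing field {name}")
            elif source[name] != source[name].strip():
                problems.append(f"{hit['_id']}: {name} has surrounding whitespace")
    return problems


# Hand-written example shaped like a _search response (illustrative values).
response = {
    "hits": {
        "hits": [
            {
                "_id": "Lb1UWoAB-6Zo6en4luDi",
                "_source": {
                    "mobile": "1390000****",
                    "appName": "jop",
                    "type": "byORP",
                    "timestamp": "2022-04-18T14:18:16.633",
                    "status": "/log/cms/send",
                    "code": "200",
                    "component": "pluginNums=0,pluginStatus=0",
                    "cid": "",
                    "serviceId": "",
                    "serviceName": "",
                    "serviceType": "",
                    "param": "",
                },
            }
        ]
    }
}

problems = find_problems(response)
print(problems if problems else "all fields present and trimmed")
```

An empty problem list suggests that the split and strip steps ran as intended. A whitespace warning usually means the field name is missing from the strip list in the second mutate block.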