The logstash-filter-mutate plug-in lets you rename, delete, replace, and modify fields in Logstash events. A common use case is merging two fields into one, renaming the merged result, and removing the originals. This topic walks through that workflow using Alibaba Cloud Logstash and Elasticsearch.
Plugin reference
The logstash-filter-mutate plug-in is built in and cannot be removed. It supports the following configuration options, listed in their default execution order within a single mutate block.
| Configuration option | Input type | Description |
|---|---|---|
| coerce | hash | Set the default value of an existing field that is null |
| rename | hash | Rename a field |
| update | hash | Update an existing field with a new value |
| replace | hash | Replace the value of a field, or create the field if it does not exist |
| convert | hash | Convert a field value to a different type |
| gsub | array | Replace all matches of a regular expression in a string field |
| uppercase | array | Convert a string field to uppercase |
| capitalize | array | Capitalize a string field |
| lowercase | array | Convert a string field to lowercase |
| strip | array | Remove leading and trailing whitespace from a string field |
| remove_field | array | Remove one or more fields |
| split | hash | Split a field by a separator into an array |
| join | hash | Join an array field into a string using a separator |
| merge | hash | Merge two fields into one array |
| copy | hash | Copy an existing field to a new field |
For the full reference, see Mutate filter plugin.
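The array-oriented options are easiest to understand on a concrete event. The following Python sketch mimics the documented behavior of split, join, and merge on a plain dict. It is an illustration of the semantics only, not Logstash code, and the field names are examples:

```python
def mutate_split(event, field, sep):
    # split: turn a delimited string field into an array
    event[field] = event[field].split(sep)

def mutate_join(event, field, sep):
    # join: turn an array field into a single delimited string
    event[field] = sep.join(event[field])

def mutate_merge(event, dest, src):
    # merge: append the source field's value to the destination field,
    # coercing scalar values on either side to single-element arrays
    to_list = lambda v: v if isinstance(v, list) else [v]
    event[dest] = to_list(event[dest]) + to_list(event[src])

event = {"tags": "a,b", "message": "10001", "app.name": "app2"}
mutate_split(event, "tags", ",")            # tags becomes ["a", "b"]
mutate_merge(event, "message", "app.name")  # message becomes ["10001", "app2"]
print(event["message"])
```

Note that merge leaves the source field in place; removing the original afterward is what remove_field is for, as the walkthrough below shows.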
Processing order
Options in the same mutate block always execute in the order shown in the table above, regardless of the order you write them. To run options in a custom sequence, put each option in its own mutate block.
For example, the following pipeline uses three separate mutate blocks so that merge runs before rename, and rename runs before remove_field:
```
filter {
  mutate {
    merge => { "message" => "app.name" }
  }
  mutate {
    rename => { "message" => "anger" }
  }
  mutate {
    remove_field => [ "app.name" ]
  }
}
```

If you placed all three options in a single mutate block, rename would execute before merge (because rename precedes merge in the default order), producing unexpected results.
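The effect of the ordering can be sketched in a few lines of Python that simulate the three operations on a plain dict. This is simulated semantics, not Logstash itself; the field names are the ones used in this topic:

```python
def merge(event, dest, src):
    # simulated mutate merge: combine two scalar fields into one array
    to_list = lambda v: v if isinstance(v, list) else [v]
    event[dest] = to_list(event[dest]) + to_list(event[src])

def rename(event, old, new):
    # simulated mutate rename
    event[new] = event.pop(old)

def remove_field(event, field):
    # simulated mutate remove_field
    event.pop(field, None)

doc = {"app.name": "app2", "message": "10001"}

# Intended order (three separate blocks): merge, then rename, then remove.
merge(doc, "message", "app.name")
rename(doc, "message", "anger")
remove_field(doc, "app.name")
print(doc)  # {'anger': ['10001', 'app2']}
```

If rename ran first, as it would in a single block, merge would look for a `message` field that no longer exists and could not combine the two values as intended.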
Prerequisites
Before you begin, ensure that you have:
An Alibaba Cloud Elasticsearch cluster (V7.10 in this example). For more information, see Create an Alibaba Cloud Elasticsearch cluster.
Auto Indexing enabled on the destination Elasticsearch cluster. For more information, see Configure the YML file.
Note: Auto Indexing is used here for testing only. In production, create the destination index manually and disable Auto Indexing, because auto-created indexes may not match your schema. For more information, see Getting started.
An Alibaba Cloud Logstash cluster in the same virtual private cloud (VPC) as the Elasticsearch cluster. For more information, see Create an Alibaba Cloud Logstash cluster.
Test data in the source Elasticsearch index `yc_text`. The fields to merge are `app.name` and `message`. The following example shows the test data used in this topic:

```
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "HpIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app1",
          "annual_rate" : "31%",
          "describe" : "Select whether to push messages for returns",
          "message" : "10000"
        }
      },
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "H5IduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app2",
          "annual_rate" : "35%",
          "describe" : "Push messages on a daily basis when returns are credited to your account",
          "message" : "10001"
        }
      },
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "IpIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app3",
          "annual rate" : "30",
          "describe" : "Push messages on a daily basis for returns",
          "message" : "10004"
        }
      },
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "IJIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app4",
          "annual_rate" : "38%",
          "describe" : "Push messages immediately on a daily basis when returns are credited to your account",
          "message" : "10002"
        }
      },
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "IZIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app5",
          "annual_rate" : "40%",
          "describe" : "Push messages on a daily basis when returns are credited to your account",
          "message" : "10003"
        }
      },
      {
        "_index" : "yc_text",
        "_type" : "_doc",
        "_id" : "I5IduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "app.name" : "app6",
          "annual_rate" : "33%",
          "describe" : "Push messages for returns by using text messages",
          "message" : "10005"
        }
      }
    ]
  }
}
```
Create a pipeline to merge fields
Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.
In the top navigation bar, select the region where your cluster resides.
On the Logstash Clusters page, find your cluster and click its ID.
In the left-side navigation pane, click Pipelines.
On the Pipelines page, click Create Pipeline.
In the Create wizard, enter a pipeline ID and configure the pipeline. The following pipeline reads from the source index `yc_text`, merges `app.name` into `message`, renames the merged field to `anger`, removes the original `app.name` field, and writes the result to the destination index `yc_text_new`. In this example, the source and destination Elasticsearch clusters are the same. Adjust the hosts, credentials, and index names to match your setup.

Warning: Saving or deploying a pipeline restarts the Logstash cluster. Make sure the restart does not affect your business before proceeding.

```
input {
  elasticsearch {
    hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "your_password"
    index => "yc_text"
    docinfo => true
  }
}
filter {
  mutate {
    merge => { "message" => "app.name" }
  }
  mutate {
    rename => { "message" => "anger" }
  }
  mutate {
    remove_field => [ "app.name" ]
  }
}
output {
  elasticsearch {
    hosts => ["http://es-cn-tl3264bqv001d****.elasticsearch.aliyuncs.com:9200"]
    user => "elastic"
    password => "your_password"
    index => "yc_text_new"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}
```

The pipeline runs three mutate operations in sequence:

merge: Merges `app.name` into `message`. The result is stored in `message` as an array containing the values of both fields.
rename: Renames `message` to `anger`.
remove_field: Removes the `app.name` field to eliminate duplicated data.

After the pipeline runs, the `anger` field in `yc_text_new` holds the combined values, for example `["10001", "app2"]`. For more information about pipeline configuration, see Use configuration files to manage pipelines and Logstash configuration files.

Click Save or Save and Deploy.
Save: Stores the pipeline settings but does not apply them. On the Pipelines page, find the pipeline and click Deploy Now in the Actions column to restart the cluster and apply the settings.
Save and Deploy: Stores and immediately applies the settings by restarting the Logstash cluster.
Verify the result
Log on to the Kibana console of the destination Elasticsearch cluster. For more information, see Log on to the Kibana console.
Click Dev tools in the upper-right corner.
On the Console tab, run the following query to retrieve documents from the destination index:
```
GET yc_text_new/_search
{
  "query": {
    "match_all": {}
  }
}
```

A successful response looks similar to the following:

```
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "H5IduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Push messages on a daily basis when returns are credited to your account",
          "@version" : "1",
          "anger" : [ "10001", "app2" ],
          "@timestamp" : "2021-12-15T03:45:25.321Z",
          "annual_rate" : "35%"
        }
      },
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "IZIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Push messages on a daily basis when returns are credited to your account",
          "@version" : "1",
          "anger" : [ "10003", "app5" ],
          "@timestamp" : "2021-12-15T03:45:25.321Z",
          "annual_rate" : "40%"
        }
      },
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "I5IduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Push messages for returns by using text messages",
          "@version" : "1",
          "anger" : [ "10005", "app6" ],
          "@timestamp" : "2021-12-15T03:45:25.322Z",
          "annual_rate" : "33%"
        }
      },
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "HpIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Select whether to push messages for returns",
          "@version" : "1",
          "anger" : [ "10000", "app1" ],
          "@timestamp" : "2021-12-15T03:45:25.298Z",
          "annual_rate" : "31%"
        }
      },
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "IJIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Push messages immediately on a daily basis when returns are credited to your account",
          "@version" : "1",
          "anger" : [ "10002", "app4" ],
          "@timestamp" : "2021-12-15T03:45:25.321Z",
          "annual_rate" : "38%"
        }
      },
      {
        "_index" : "yc_text_new",
        "_type" : "_doc",
        "_id" : "IpIduH0BWiRrY8Azn65i",
        "_score" : 1.0,
        "_source" : {
          "describe" : "Push messages on a daily basis for returns",
          "@version" : "1",
          "anger" : [ "10004", "app3" ],
          "@timestamp" : "2021-12-15T03:45:25.321Z",
          "annual rate" : "30"
        }
      }
    ]
  }
}
```

Each document now has an `anger` field containing the merged values of `message` and `app.name` as an array. The original `app.name` field is absent.
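If you prefer to script this check, the following Python sketch validates a search response body offline. The sample data is trimmed from the output above; substitute the real response body retrieved with your HTTP client of choice:

```python
# A trimmed sample of the search response; replace with the real body.
response = {
    "hits": {"hits": [
        {"_id": "H5IduH0BWiRrY8Azn65i",
         "_source": {"anger": ["10001", "app2"], "annual_rate": "35%"}},
        {"_id": "HpIduH0BWiRrY8Azn65i",
         "_source": {"anger": ["10000", "app1"], "annual_rate": "31%"}},
    ]}
}

for hit in response["hits"]["hits"]:
    source = hit["_source"]
    # each document should hold the merged two-value array ...
    assert isinstance(source["anger"], list) and len(source["anger"]) == 2
    # ... and no leftover app.name field
    assert "app.name" not in source

print("all documents verified")
```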