Elasticsearch: Use Logstash to rename a field

Last Updated: Mar 25, 2026

Field names that contain special characters — such as the at sign (@) — can block downstream tools like DataWorks from migrating data. This topic shows you how to use the Logstash mutate filter plugin to rename such a field in an Alibaba Cloud Elasticsearch index.

Choose a method

Two methods are available for renaming a field:

| Method | Best for | Complexity |
| --- | --- | --- |
| Logstash `mutate` filter plugin (this topic) | Ongoing or large-scale migrations that require a configurable pipeline | Higher |
| Reindex API with a Painless script | One-time, small-scale migrations | Lower |

For a one-time migration, the Reindex API is simpler. Submit a single request to copy data and rename the field in one step:

POST _reindex
{
  "source": { "index": "product_info" },
  "dest": { "index": "product_info2" },
  "script": {
    "source": "ctx._source.ctxt_user_info = ctx._source.remove('@ctxt_user_info')"
  }
}
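
If you need the same request for other indexes or field names, the body can be generated instead of edited by hand. The following Python sketch is illustrative only: the function name `build_reindex_body` and the script parameters `old_name` and `new_name` are hypothetical, and the `containsKey` guard is an addition that the request above does not have. The guard leaves documents that lack the old field unchanged instead of writing a null value into the new field.

```python
import json

# Painless script with a guard: rename only when the old field exists.
# The guard and the params names are assumptions added for illustration.
RENAME_SCRIPT = (
    "if (ctx._source.containsKey(params.old_name)) {"
    " ctx._source[params.new_name] = ctx._source.remove(params.old_name);"
    " }"
)


def build_reindex_body(source_index, dest_index, old_name, new_name):
    """Return the JSON body for POST _reindex as a Python dict."""
    return {
        "source": {"index": source_index},
        "dest": {"index": dest_index},
        "script": {
            "lang": "painless",
            "source": RENAME_SCRIPT,
            "params": {"old_name": old_name, "new_name": new_name},
        },
    }


body = build_reindex_body(
    "product_info", "product_info2", "@ctxt_user_info", "ctxt_user_info"
)
# Paste the printed JSON below "POST _reindex" in the Kibana console.
print(json.dumps(body, indent=2))
```

Passing the field names through `params` keeps the script text identical across runs, so only the parameter values change between migrations.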

Prerequisites

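Before you begin, ensure that you have:

  • An Alibaba Cloud Elasticsearch cluster that you can access from its Kibana console.

  • An Alibaba Cloud Logstash cluster that can connect to the Elasticsearch cluster.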

Prepare test data

  1. Log on to the Kibana console of the Elasticsearch cluster. For more information, see Log on to the Kibana console.

  2. In the upper-left corner, click the menu icon and choose Management > Dev Tools.

  3. On the Console tab, run the following command to create a source index named product_info:

    PUT /product_info
    {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "@ctxt_user_info": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }
  4. Run the following command to insert test data into the source index:

    POST /product_info/_doc/_bulk
    {"index":{}}
    {"@ctxt_user_info":"test1"}
    {"index":{}}
    {"@ctxt_user_info":"test1"}
  5. Run the following command to verify that the source index contains the @ctxt_user_info field:

    GET /product_info/_search

    The response confirms the field name includes an at sign (@):

    {
      "took" : 16,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "product_info",
            "_type" : "_doc",
            "_id" : "3BDMUZMBh7mRlA4aS0Nq",
            "_score" : 1.0,
            "_routing" : "74873",
            "_source" : {
              "@ctxt_user_info" : "test1"
            }
          },
          {
            "_index" : "product_info",
            "_type" : "_doc",
            "_id" : "3RDMUZMBh7mRlA4aS0Nq",
            "_score" : 1.0,
            "_routing" : "74873",
            "_source" : {
              "@ctxt_user_info" : "test1"
            }
          }
        ]
      }
    }
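
The bulk request in step 4 uses newline-delimited JSON: each document is preceded by an action line, and the body ends with a newline. For larger test sets, a body in that shape can be generated. The following Python sketch assumes a hypothetical helper named `build_bulk_body`; it only builds the text and does not send it to the cluster.

```python
import json


def build_bulk_body(docs):
    """Build a newline-delimited bulk body that indexes each document."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {}}))  # action line, ID auto-generated
        lines.append(json.dumps(doc))            # document source line
    # The bulk body must end with a newline character.
    return "\n".join(lines) + "\n"


bulk_body = build_bulk_body([
    {"@ctxt_user_info": "test1"},
    {"@ctxt_user_info": "test1"},
])
print(bulk_body, end="")
```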

Rename a field using Logstash

Step 1: Create the destination index

If Auto Indexing is enabled for your cluster, you can skip this step. However, automatically created indexes may not match your requirements, so we recommend that you disable Auto Indexing and create the index manually when you need precise control over index mappings.

On the Dev Tools page of the Kibana console, run the following command to create a destination index named product_info2:

PUT /product_info2
{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "ctxt_user_info": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            }
        }
    }
}

Step 2: Create and configure a Logstash pipeline

  1. Go to the Logstash Clusters page of the Alibaba Cloud Elasticsearch console.

  2. In the top navigation bar, select the region where your cluster resides. On the Logstash Clusters page, find the cluster and click its ID.

  3. In the left-side navigation pane, click Pipelines.

  4. On the Pipelines page, click Create Pipeline.

  5. On the Create page, enter a pipeline ID and configure the pipeline with the following settings:

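    input {
        elasticsearch {
            # Endpoint and credentials of the source cluster
            hosts => ["http://es-cn-tl32gid**********.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "product_info"
            # Make each document's _index, _type, and _id available as event metadata
            docinfo => true
        }
    }
    filter {
        mutate {
            # Rename the field whose name contains the at sign (@)
            rename => { "@ctxt_user_info" => "ctxt_user_info" }
        }
    }
    output {
        elasticsearch {
            # Endpoint and credentials of the destination cluster
            hosts => ["http://es-cn-tl32gid**********.elasticsearch.aliyuncs.com:9200"]
            user => "elastic"
            password => "your_password"
            index => "product_info2"
            # Reuse the type and ID of each source document
            document_type => "%{[@metadata][_type]}"
            document_id => "%{[@metadata][_id]}"
        }
    }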

    The filter.mutate.rename parameter renames the field as data flows from the source index to the destination index. Be aware of these behaviors:

    • If the destination field already exists, its value is overwritten.

    • If the source field does not exist, no action is taken for that field — this is not treated as an error.

    Warning

    Saving and deploying the pipeline restarts the Logstash cluster. Make sure the restart does not affect your business before proceeding.

  6. Click Next.

  7. Configure pipeline parameters.

  8. Click Save or Save and Deploy:

    • Save: Stores the pipeline settings and triggers a cluster change, but the settings do not take effect immediately. After saving, go to the Pipelines page, find the pipeline, and click Deploy Now in the Actions column to restart the cluster and apply the settings.

    • Save and Deploy: Restarts the Logstash cluster immediately to apply the settings.
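
The two rename behaviors listed in step 5 can be checked against a small model before you deploy the pipeline. The following Python sketch is not Logstash code: `mutate_rename` is a hypothetical function that applies the documented rules to a plain dictionary standing in for an event.

```python
def mutate_rename(event, renames):
    """Apply the rename rules described in step 5 to a dict-based event."""
    result = dict(event)  # copy so the input event is left untouched
    for old_name, new_name in renames.items():
        if old_name not in result:
            continue  # source field missing: no action, not an error
        # An existing destination field is overwritten.
        result[new_name] = result.pop(old_name)
    return result


renames = {"@ctxt_user_info": "ctxt_user_info"}

# Normal case: the field is renamed.
renamed = mutate_rename({"@ctxt_user_info": "test1"}, renames)

# The destination field already exists: its value is overwritten.
overwritten = mutate_rename(
    {"@ctxt_user_info": "new", "ctxt_user_info": "old"}, renames
)

# The source field does not exist: the event is unchanged.
untouched = mutate_rename({"other_field": "x"}, renames)
```

The overwrite case is the one to watch: if the source index already contains documents with a `ctxt_user_info` field, the value from `@ctxt_user_info` replaces it.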

Step 3: Verify the result

On the Dev Tools page of the Kibana console, run the following command to query the destination index:

GET product_info2/_search

Confirm that the field is now named ctxt_user_info — without the @ prefix:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_info2",
        "_type" : "_doc",
        "_id" : "r5N7fn0BKQKHRO31rK6C",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-12-03T04:14:26.872Z",
          "@version" : "1",
          "ctxt_user_info" : "test1"
        }
      },
      {
        "_index" : "product_info2",
        "_type" : "_doc",
        "_id" : "rpN7fn0BKQKHRO31rK6C",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-12-03T04:14:26.871Z",
          "@version" : "1",
          "ctxt_user_info" : "test2"
        }
      }
    ]
  }
}
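
For indexes with more than a few documents, reading the response by eye does not scale. The following Python sketch shows one way to check a search response programmatically. The function name `find_unrenamed_hits` is hypothetical, and the sample is a trimmed copy of the response above. The sketch assumes that the response has already been parsed into a dictionary.

```python
def find_unrenamed_hits(response, old_name, new_name):
    """Return the _id of each hit that still has old_name or lacks new_name."""
    bad_ids = []
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        if old_name in source or new_name not in source:
            bad_ids.append(hit["_id"])
    return bad_ids


# Trimmed copy of the search response shown above.
sample_response = {
    "hits": {
        "hits": [
            {
                "_id": "r5N7fn0BKQKHRO31rK6C",
                "_source": {
                    "@timestamp": "2021-12-03T04:14:26.872Z",
                    "@version": "1",
                    "ctxt_user_info": "test1",
                },
            },
            {
                "_id": "rpN7fn0BKQKHRO31rK6C",
                "_source": {
                    "@timestamp": "2021-12-03T04:14:26.871Z",
                    "@version": "1",
                    "ctxt_user_info": "test2",
                },
            },
        ]
    }
}

unrenamed = find_unrenamed_hits(
    sample_response, "@ctxt_user_info", "ctxt_user_info"
)
print(unrenamed)
```

A search request returns only the first 10 hits by default, so a check like this covers a sample of the index unless you page through all results.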