Elasticsearch: Use the logstash-input-oss plug-in

Last Updated: Mar 26, 2026

The logstash-input-oss plug-in connects Alibaba Cloud Logstash to Object Storage Service (OSS) through Simple Message Queue (SMQ, formerly MNS). When an OSS object is created or updated, SMQ delivers a notification to the plug-in, which triggers Logstash to read the latest data from OSS.

logstash-input-oss is an open source plug-in. For more information, see the GitHub repository.

How it works

  1. An OSS object is created or updated (for example, via PutObject or AppendObject).

  2. OSS sends an event notification to an SMQ queue.

  3. The plug-in receives the SMQ notification and extracts the object key.

  4. Logstash reads all data in the object and sends it downstream through the pipeline.

Usage notes

  • After receiving an SMQ notification, Logstash synchronizes all data in the associated object, not only the data added since the last read.

  • Objects in .gz or .gzip format are processed as gzip. All other formats are processed as plain text.

  • Objects in binary formats (such as .jar or .bin) may be read as garbled characters.
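
Because binary objects may be read as garbled characters, one option is to keep them out of the pipeline altogether. The fragment below is a minimal sketch, assuming the exclude_pattern parameter listed under Optional parameters is matched against the object key. The pattern is an illustrative assumption, and the connection values are the masked placeholders from the pipeline example in this topic.

```
input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
    # Illustrative pattern (an assumption, not from this topic):
    # skip objects whose keys end in .jar or .bin
    exclude_pattern => "\.(jar|bin)$"
  }
}
```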

Prerequisites

Before you begin, make sure you have:

Create a pipeline

Create a pipeline using configuration files as described in Use configuration files to manage pipelines. Configure the parameters in the Parameters section, then save and deploy the pipeline.

The following example reads data from an OSS bucket and writes it to Alibaba Cloud Elasticsearch.

input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    prefix => "file-sample-prefix"
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
    codec => json {
      charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "aliyun-es-sample"
    user => "elastic"
    password => "changeme"
  }
}
Important

The SMQ endpoint must be an internal endpoint and must not include the http:// prefix. Using an external endpoint causes an error.

Parameters

Required parameters

| Parameter | Type | Description |
| --- | --- | --- |
| endpoint | string | The endpoint used to access OSS. See Regions, endpoints and open ports. |
| bucket | string | The name of the OSS bucket. |
| access_key_id | string | The AccessKey ID of your Alibaba Cloud account. |
| access_key_secret | string | The AccessKey secret of your Alibaba Cloud account. |
| mns_settings | hash | The SMQ configuration. See mns_settings sub-parameters. |

Optional parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| prefix | string | | Filters objects by name prefix. Logstash reads only objects whose names start with this value. Not a regular expression. Use this parameter to read from specific directories in the bucket. |
| additional_oss_settings | hash | | Additional OSS client settings. Supported sub-parameters: secure_connection_enabled (enable secure connections) and max_connections_to_oss (maximum number of connections to OSS). |
| delete | boolean | false | Whether to delete processed objects from the source OSS bucket after reading. |
| backup_to_bucket | string | | The name of the OSS bucket to which processed objects are backed up. |
| backup_to_dir | string | | The local directory to which processed files are backed up. |
| backup_add_prefix | string | | A prefix added to the object key after processing. The key is the full path including the object name in OSS. Use this parameter to store backups in a specific folder within the same or a different bucket. |
| include_object_properties | boolean | | Whether to include OSS object properties (last_modified, content_type, and metadata) in [@metadata][oss]. If not set, only [@metadata][oss][key] is available. |
| exclude_pattern | string | | A Ruby regular expression matching object keys to skip. For example, "\/logs\/debug\/" excludes all objects with /logs/debug/ in the path. |
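
The optional parameters can be combined in one input block. The following is a minimal sketch, assuming processed objects should be copied to a second bucket under a folder-like prefix and then removed from the source bucket. The backup bucket name, the prefix value, and the connection limit are hypothetical. The remaining values are the masked placeholders from the pipeline example in this topic.

```
input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
    # Read only objects whose names start with this prefix
    prefix => "file-sample-prefix"
    # Hypothetical backup target: copy processed objects to another bucket
    # and prepend "processed/" to each object key
    backup_to_bucket => "example-backup-bucket"
    backup_add_prefix => "processed/"
    # Remove processed objects from the source bucket (default is false)
    delete => true
    additional_oss_settings => {
      secure_connection_enabled => true
      # Hypothetical limit, chosen only for illustration
      max_connections_to_oss => 64
    }
  }
}
```

Setting delete to true removes data from the source bucket, so it may be worth confirming that backups arrive as expected before enabling it.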

mns_settings sub-parameters

| Sub-parameter | Required | Default | Description |
| --- | --- | --- | --- |
| endpoint | Yes | | The SMQ endpoint. Must be an internal endpoint and must not include http. |
| queue | Yes | | The name of the SMQ queue. |
| poll_interval_seconds | No | 10 | The maximum wait time (in seconds) for a ReceiveMessage request when the queue is empty. See ReceiveMessage. |
| wait_seconds | No | | The maximum polling wait time (in seconds) for a ReceiveMessage request. |
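
As a rough illustration of where the polling sub-parameters go, the sketch below sets both inside mns_settings. The poll_interval_seconds value repeats the documented default. The wait_seconds value is an arbitrary assumption, because this topic does not state a default or a valid range for it.

```
input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    mns_settings => {
      # Internal endpoint, without an http prefix
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
      # Documented default, written out explicitly
      poll_interval_seconds => 10
      # Illustrative value only (an assumption)
      wait_seconds => 3
    }
  }
}
```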

Object metadata

When include_object_properties is enabled, the plug-in exposes the following fields in [@metadata][oss]. Use these fields in your filter or output stages to enrich events or apply conditional logic.

| Field | Type | Description |
| --- | --- | --- |
| [@metadata][oss][key] | string | The full object key (path) in OSS. Always available, regardless of include_object_properties. |
| [@metadata][oss][last_modified] | string | The timestamp when the object was last modified. Available when include_object_properties is true. |
| [@metadata][oss][content_type] | string | The MIME type of the object. Available when include_object_properties is true. |
| [@metadata][oss][metadata] | hash | Custom user-defined metadata attached to the object. Available when include_object_properties is true. |
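
Fields under @metadata are not written by outputs, so a common approach is to copy the ones you need into regular event fields. The filter below is a minimal sketch using the standard mutate filter. The target field names oss_key and oss_content_type are hypothetical.

```
filter {
  # The object key is always available; copy it into the event
  mutate {
    add_field => { "oss_key" => "%{[@metadata][oss][key]}" }
  }

  # content_type exists only when include_object_properties is true
  # on the oss input, so guard the copy with a conditional
  if [@metadata][oss][content_type] {
    mutate {
      add_field => { "oss_content_type" => "%{[@metadata][oss][content_type]}" }
    }
  }
}
```

The same field references can be used in output conditionals, for example to route events from different key prefixes to different indexes.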

FAQ

Why is the plug-in based on SMQ instead of polling OSS directly?

OSS can send object update events directly to SMQ, which makes message-based notification a natural fit. The alternative, polling with the ListObjects API, requires recording in local storage which objects have already been processed, and ListObjects performance degrades as the number of objects grows. The broader object storage ecosystem (including the Amazon S3 open source community) has also moved away from ListObjects in favor of message notification mechanisms.

What happens if Logstash is triggered while OSS is still writing data to an object?

The SMQ queue records which objects have been written, and the plug-in transmits the data in those objects through the Logstash pipeline. OSS continues to write any data that has not yet been written. The next time Logstash is triggered, the plug-in reads the remaining data from OSS.
