The logstash-input-oss plug-in connects Alibaba Cloud Logstash to Object Storage Service (OSS) through Simple Message Queue (SMQ, formerly MNS). When an OSS object is created or updated, SMQ delivers a notification to the plug-in, which triggers Logstash to read the latest data from OSS.
logstash-input-oss is an open source plug-in. For more information, see the GitHub repository.
How it works
1. An OSS object is created or updated (for example, via PutObject or AppendObject).
2. OSS sends an event notification to an SMQ queue.
3. The plug-in receives the SMQ notification and extracts the object key.
4. Logstash reads all data in the object and sends it downstream through the pipeline.
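The notification that drives these steps is a JSON message in the OSS event notification format. The following abbreviated sketch (field layout as we understand the OSS event notification format; bucket and key values are placeholders) shows where the object key that the plug-in extracts comes from:

```
{
  "events": [
    {
      "eventName": "ObjectCreated:PutObject",
      "eventSource": "acs:oss",
      "region": "cn-hangzhou",
      "oss": {
        "bucket": { "name": "example-bucket" },
        "object": {
          "key": "file-sample-prefix/app.log",
          "size": 10240
        }
      }
    }
  ]
}
```

The plug-in reads `events[].oss.object.key` and then fetches that object from the configured bucket.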
Usage notes
- After receiving an SMQ notification, Logstash synchronizes all data in the associated object, not just the delta.
- Objects in `.gz` or `.gzip` format are processed as gzip. All other formats are processed as plain text.
- Objects in binary formats (such as `.jar` or `.bin`) may be read as garbled characters.
Prerequisites
Before you begin, make sure you have:
- Installed the logstash-input-oss plug-in. See Install or remove a Logstash plug-in.
- Activated OSS and SMQ, with OSS buckets in the same region as the SMQ queues or topics. See Activate OSS and Activate SMQ and authorize RAM users to access SMQ.
- Configured event notification rules in OSS. See Configure event notification rules.
Create a pipeline
Create a pipeline using configuration files as described in Use configuration files to manage pipelines. Configure the parameters in the Parameters section, then save and deploy the pipeline.
The following example reads data from an OSS bucket and writes it to Alibaba Cloud Elasticsearch.
```
input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    prefix => "file-sample-prefix"
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
    codec => json {
      charset => "UTF-8"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "aliyun-es-sample"
    user => "elastic"
    password => "changeme"
  }
}
```
The SMQ endpoint must be an internal endpoint and must not be prefixed with http. Using an external endpoint causes an error.
Parameters
Required parameters
| Parameter | Type | Description |
|---|---|---|
| endpoint | string | The endpoint used to access OSS. See Regions, endpoints and open ports. |
| bucket | string | The name of the OSS bucket. |
| access_key_id | string | The AccessKey ID of your Alibaba Cloud account. |
| access_key_secret | string | The AccessKey secret of your Alibaba Cloud account. |
| mns_settings | hash | The SMQ configuration. See mns_settings sub-parameters. |
Optional parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| prefix | string | — | Filters objects by name prefix. Logstash reads only objects whose names start with this value. Not a regular expression. Use this parameter to read from specific directories in the bucket. |
| additional_oss_settings | hash | — | Additional OSS client settings. Supported sub-parameters: secure_connection_enabled (enable secure connections) and max_connections_to_oss (maximum number of connections to OSS). |
| delete | boolean | false | Whether to delete processed objects from the source OSS bucket after reading. |
| backup_to_bucket | string | — | The name of the OSS bucket to which processed objects are backed up. |
| backup_to_dir | string | — | The local directory to which processed files are backed up. |
| backup_add_prefix | string | — | A prefix added to the object key after processing. The key is the full path including the object name in OSS. Use this parameter to store backups in a specific folder within the same or a different bucket. |
| include_object_properties | boolean | — | Whether to include OSS object properties (last_modified, content_type, and metadata) in [@metadata][oss]. If not set, only [@metadata][oss][key] is available. |
| exclude_pattern | string | — | A Ruby regular expression matching object keys to skip. For example, "\/logs\/debug\/" excludes all objects with /logs/debug/ in the path. |
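To illustrate, the optional parameters above can be combined in a single input block. A sketch (bucket, queue, prefix, and backup-bucket names are placeholders, not values from your environment):

```
input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    prefix => "logs/app/"                   # read only objects under logs/app/
    exclude_pattern => "\/logs\/debug\/"    # skip objects with /logs/debug/ in the path
    delete => false                         # keep source objects after reading
    backup_to_bucket => "zl-backup****"     # back up processed objects to this bucket
    backup_add_prefix => "processed/"       # store backups under the processed/ folder
    include_object_properties => true       # expose last_modified, content_type, metadata
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
  }
}
```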
mns_settings sub-parameters
| Sub-parameter | Required | Default | Description |
|---|---|---|---|
| endpoint | Yes | — | The SMQ endpoint. Must be an internal endpoint and must not include http. |
| queue | Yes | — | The name of the SMQ queue. |
| poll_interval_seconds | No | 10 | The interval (in seconds) at which the plug-in polls the queue again when it is empty. |
| wait_seconds | No | — | The maximum polling wait time (in seconds) for a ReceiveMessage request. See ReceiveMessage. |
Object metadata
When include_object_properties is enabled, the plug-in exposes the following fields in [@metadata][oss]. Use these fields in your filter or output stages to enrich events or apply conditional logic.
| Field | Type | Description |
|---|---|---|
| [@metadata][oss][key] | string | The full object key (path) in OSS. Always available, regardless of include_object_properties. |
| [@metadata][oss][last_modified] | string | The timestamp when the object was last modified. Available when include_object_properties is true. |
| [@metadata][oss][content_type] | string | The MIME type of the object. Available when include_object_properties is true. |
| [@metadata][oss][metadata] | hash | Custom user-defined metadata attached to the object. Available when include_object_properties is true. |
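For example, because [@metadata] fields are not written to the output, a filter stage can copy the values you need into the event itself. A minimal sketch, assuming include_object_properties => true is set in the input:

```
filter {
  # Copy the object key and last-modified time into regular event fields,
  # so they survive into the Elasticsearch output.
  mutate {
    add_field => {
      "oss_key" => "%{[@metadata][oss][key]}"
      "oss_last_modified" => "%{[@metadata][oss][last_modified]}"
    }
  }
}
```

The same sprintf references can also drive conditionals, for example `if [@metadata][oss][content_type] == "application/json" { ... }`.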
FAQ
Why is the plug-in based on SMQ instead of polling OSS directly?
OSS object update events integrate seamlessly with SMQ, making message-based notification a natural fit. The alternative, polling the ListObjects API, requires tracking in local storage which objects have and have not been processed, and ListObjects performance degrades as the number of objects grows. The broader object storage ecosystem (including the Amazon S3 open source community) has also moved away from ListObjects in favor of message notification mechanisms.
What happens if Logstash is triggered while OSS is still writing data to an object?
The SMQ notification reflects the data that had already been written to the object when the event was generated; the plug-in reads that data and transmits it through the Logstash pipeline. Data that OSS is still writing is not lost: the next notification triggers Logstash again, and the plug-in reads the remaining data from OSS.