When Logstash runs inside a virtual private cloud (VPC), it can produce messages directly to an ApsaraMQ for Kafka instance in the same VPC. This guide walks you through configuring a Logstash output pipeline, from obtaining the broker endpoint to verifying that messages arrive in a Kafka topic.
Prerequisites
Before you begin, make sure that you have:
- An ApsaraMQ for Kafka instance deployed in a VPC. See Purchase and deploy a VPC-connected instance.
- Logstash installed on a server in the same VPC. See Download Logstash.
- Java Development Kit (JDK) 8 installed. See Java 8.
Step 1: Get the VPC endpoint
Logstash connects to ApsaraMQ for Kafka through a VPC endpoint. ApsaraMQ for Kafka provides two types of VPC endpoints:
| Endpoint type | Port | Description |
|---|---|---|
| Default | 9092 | No authentication required |
| SASL | 9094 | Requires authentication. You must enable the access control list (ACL) feature first. See Enable the ACL feature |
For a full comparison, see Comparison among endpoints.
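
If you use the SASL endpoint on port 9094, the Kafka output plugin must also be configured for SASL authentication. The following is a minimal sketch, not a complete configuration; the JAAS file path is an assumption, and the username and password are the values from the Configuration Information section of the console:

```
output {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint>:9094"
    topic_id => "<your-topic-name>"
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    # Hypothetical path; point this at your own JAAS file.
    jaas_path => "/etc/logstash/kafka_client_jaas.conf"
  }
}
```

The referenced JAAS file would contain the instance credentials:

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};
```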
To get the endpoint:
1. Log on to the ApsaraMQ for Kafka console.
2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
3. On the Instances page, click the instance name.
4. On the Instance Details page, copy the endpoint from the Endpoint Information section. If you plan to use SASL authentication, also note the Username and Password from the Configuration Information section.

Step 2: Create a topic
Create a topic in the ApsaraMQ for Kafka console to store the messages that Logstash sends.
You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance that is deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.
1. Log on to the ApsaraMQ for Kafka console.
2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
3. On the Instances page, click the instance name.
4. In the left-side navigation pane, click Topics.
5. On the Topics page, click Create Topic.
6. In the Create Topic panel, specify the properties of the topic and click OK.
| Parameter | Description | Example |
|---|---|---|
| Name | The topic name. | demo |
| Description | The topic description. | demo test |
| Partitions | The number of partitions in the topic. | 12 |
| Storage Engine | The type of the storage engine that is used to store messages in the topic. Valid values: Cloud Storage (uses Alibaba Cloud disks, stores data in three replicas in distributed mode; features low latency, high performance, long durability, and high reliability) and Local Storage (uses the in-sync replicas (ISR) algorithm of open source Apache Kafka, stores data in three replicas in distributed mode). You can specify the storage engine type only if you use a non-serverless Professional Edition instance. For other types of instances, Cloud Storage is selected by default. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage. | Cloud Storage |
| Message Type | The message type of the topic. Valid values: Normal Message (messages with the same key are stored in the same partition in send order; broker failure may disrupt order; auto-set when Storage Engine = Cloud Storage) and Partitionally Ordered Message (messages with the same key are stored in the same partition in send order; broker failure does not disrupt order, but some partitions become unavailable until restored; auto-set when Storage Engine = Local Storage). | Normal Message |
| Log Cleanup Policy | The log cleanup policy for the topic. Required only when Storage Engine is set to Local Storage. Valid values: Delete (default; messages are retained based on the maximum retention period; the system deletes the earliest stored messages when storage usage exceeds 85%) and Compact (the log compaction policy from Apache Kafka; retains latest values for messages with the same key; only usable in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos). | Compact |
| Tag | The tags that you want to attach to the topic. | demo |
After a topic is created, you can view the topic on the Topics page.
Step 3: Create the Logstash configuration
Navigate to the Logstash `bin` directory and create a configuration file named `output.conf`:

```
input {
  stdin {}
}

output {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint-1>:9092,<your-kafka-endpoint-2>:9092,<your-kafka-endpoint-3>:9092"
    topic_id => "<your-topic-name>"
  }
}
```

Placeholder reference
Replace the placeholders in the configuration with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| `<your-kafka-endpoint-*>` | Broker addresses from the Endpoint Information section | alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com |
| `<your-topic-name>` | Name of the target Kafka topic | logstash_test |
Commonly used parameters
Beyond bootstrap_servers and topic_id, the following parameters are commonly configured:
| Parameter | Description | Default |
|---|---|---|
| `codec` | Encoding format for events. Set to `json` to send the full event as JSON. The default `plain` codec sends only the message field along with a timestamp and hostname. | plain |
| `acks` | Number of acknowledgments the producer requires. Valid values: 0, 1, all. | 1 |
| `retries` | Number of times to retry a failed send. | 0 |
| `compression_type` | Compression algorithm. Valid values: none, gzip, snappy, lz4. | none |
| `client_id` | Identifier passed to the broker for logging and debugging. | -- |
For the full parameter reference, see Kafka output plugin.
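
As an illustration, these parameters are set inside the same kafka output block as `bootstrap_servers` and `topic_id`. This sketch assumes the `json` codec and `lz4` compression; the `client_id` value is a hypothetical name:

```
output {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint-1>:9092"
    topic_id => "<your-topic-name>"
    codec => json
    compression_type => "lz4"
    client_id => "logstash-demo"   # hypothetical identifier
  }
}
```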
Step 4: Send a test message
From the Logstash `bin` directory, start Logstash with the configuration file:

```shell
./logstash -f output.conf
```

After Logstash finishes starting, type `test` and press Enter. Logstash sends the message to the specified Kafka topic.
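
If you prefer a non-interactive test, you can pipe a message into Logstash instead; the stdin input reads the piped data and stops when it reaches the end of the input. This command fragment assumes you run it from the Logstash `bin` directory:

```shell
echo "test" | ./logstash -f output.conf
```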
Step 5: Verify the message
Confirm that the test message reached the topic through one of the following methods.
Check partition status
1. Log on to the ApsaraMQ for Kafka console.
2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
3. On the Instances page, click the instance name.
4. In the left-side navigation pane, click Topics.
5. On the Topics page, click the target topic name, then click the Partition Status tab.
6. Verify that the Maximum Offset has increased and that Messages shows a non-zero count.

| Column | Description |
|---|---|
| Partition ID | ID of the partition |
| Minimum Offset | Earliest offset in the partition |
| Maximum Offset | Latest offset in the partition |
| Messages | Total number of messages in the partition |
| Last Updated At | Timestamp of the most recent message |
Query by message offset
1. Log on to the ApsaraMQ for Kafka console.
2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
3. On the Instances page, click the instance name.
4. In the left-side navigation pane, click Message Query.
5. On the Message Query page, select Search by offset from the Search Method drop-down list.
6. Select your topic from the Topic drop-down list and a partition from the Partition drop-down list, enter an offset value in the Offset field, and then click Search.
Messages with an offset greater than or equal to the specified value are returned. For example, if you set both the Partition and Offset parameters to 5, the system returns messages whose offsets are equal to or greater than 5 from partition 5.
| Column | Description |
|---|---|
| Partition | Partition from which the message is retrieved |
| Offset | Position of the message within the partition |
| Key | Message key, displayed as a string |
| Value | Message content, displayed as a string |
| Created At | Timestamp when the message was produced. This is either the client-specified timestamp or the broker's system time. If the timestamp field is set to 0 or an invalid value, the time is displayed in the 1970/x/x x:x:x format. You cannot specify the timestamp field on clients of ApsaraMQ for Kafka version 0.9 or earlier. |
| Actions | Click Download Key to download the message key, or click Download Value to download the message content. |
Production recommendations
Deploy Logstash in the same VPC and region as your ApsaraMQ for Kafka instance. Cross-region connections are not supported.
For production workloads, set `acks` to `all` and increase `retries` to improve delivery reliability.
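
Applied to the configuration from Step 3, that recommendation looks like the following sketch. The retry count and backoff are illustrative values, not definitive tuning:

```
output {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint-1>:9092"
    topic_id => "<your-topic-name>"
    acks => "all"
    retries => 3            # illustrative value
    retry_backoff_ms => 500 # illustrative value
  }
}
```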