Configure Logstash to consume messages from an ApsaraMQ for Kafka instance over the internet using SASL_SSL authentication.
Before you begin
Before you begin, make sure that you have:
An ApsaraMQ for Kafka instance with internet access enabled (the examples in this topic use a non-Serverless instance). For more information, see Connect to an instance over the Internet and a VPC
Logstash installed
JDK 8 installed
A topic and a consumer group in the ApsaraMQ for Kafka console. If you have not created them yet, see Step 1: Create a topic and a consumer group
How the connection works
Logstash connects to ApsaraMQ for Kafka through an SSL endpoint (port 9093) and authenticates with SASL_SSL. The setup requires three components:
The SSL certificate (
kafka.client.truststore.jks)JAAS credentials (username and password from your Kafka instance)
A Logstash pipeline configuration that points to your Kafka endpoint
Step 1: Create a topic and a consumer group
If you already have a topic and consumer group, skip to Step 2.
Create a topic
Log in to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance resides.
ImportantCreate the topic in the same region as the Elastic Compute Service (ECS) instance that runs your producers and consumers. Topics cannot be used across regions.
On the Instances page, click the instance name.
In the left-side navigation pane, click Topics.
Click Create Topic.
In the Create Topic panel, configure the following parameters and click OK.
Parameter Description Example Name The topic name. demo Description The topic description. demo test Partitions The number of partitions. 12 Storage Engine The storage engine type. Cloud Storage uses Alibaba Cloud disks with three replicas in distributed mode, offering low latency and high reliability. Local Storage uses the in-sync replicas (ISR) algorithm of open source Apache Kafka. The storage engine is only configurable on non-serverless Professional Edition instances. Other instance types default to Cloud Storage. Standard (High Write) edition instances support only Cloud Storage. Cloud Storage Message Type Normal Message: Messages with the same key are stored in the same partition in send order. Ordering may not be preserved if a broker fails. Automatically set when you select Cloud Storage. Partitionally Ordered Message: Same ordering behavior, but ordering is preserved even during broker failures (some partitions may be temporarily unavailable). Automatically set when you select Local Storage. Normal Message Log Cleanup Policy Only available with Local Storage on Professional Edition instances. Delete (default): Retains messages based on the maximum retention period. Deletes the earliest messages when storage exceeds 85%. Compact: Retains the latest value for each message key. Required by components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos. Compact Tag Optional tags for the topic. demo
Create a consumer group
On the instance details page, click Groups in the left-side navigation pane.
Click Create Group.
In the Create Group panel, enter a name in the Group ID field, add an optional description, and click OK.
Send test messages
To verify the connection later, send a test message to the topic:
On the instance details page, click Topics in the left-side navigation pane.
Click the topic name, then click Send Message in the upper-right corner of the Topic Details page.
In the Start to Send and Consume Message panel, select a Sending Method (Console, Docker, or SDK) and follow the on-screen instructions to send a test message.
Step 2: Get the endpoint and credentials
Log in to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance resides.
On the Instances page, click the instance name.
On the Instance Details page, find the following values:
Internet endpoint in the Endpoint Information section
Username and Password in the Configuration Information section

For information about endpoint types, see Comparison among endpoints.
Step 3: Configure Logstash
Run all commands in this section from the Logstash bin directory.
Download the SSL certificate
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jksCreate the JAAS configuration
Create a file named jaas.conf with the following content:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-username>"
password="<your-password>";
};Replace the placeholders with your instance credentials:
| Placeholder | Description | Example |
|---|---|---|
<your-username> | Username from the Configuration Information section | alikafka_pre-cn-v0h1\*\*\* |
<your-password> | Password from the Configuration Information section | GQiSmqbQVe3b9hdKLDcIlkrBK6\*\*\* |
Create the Logstash pipeline configuration
Create a file named input.conf with the following content:
input {
kafka {
bootstrap_servers => "<your-internet-endpoint>"
topics => ["<your-topic>"]
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "<path-to-jaas-conf>"
ssl_truststore_password => "KafkaOnsClient"
ssl_truststore_location => "<path-to-truststore-jks>"
ssl_endpoint_identification_algorithm => ""
group_id => "<your-group-id>"
consumer_threads => 3
auto_offset_reset => "earliest"
}
}
output {
stdout {
codec => rubydebug
}
}Replace the placeholders with your values:
| Placeholder | Description | Example |
|---|---|---|
<your-internet-endpoint> | Internet endpoint (SSL, port 9093) from the Endpoint Information section. Separate multiple brokers with commas. | alikafka-pre-cn-zv\*\*\*-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*-3.alikafka.aliyuncs.com:9093 |
<your-topic> | The topic name to consume from. | logstash_test |
<path-to-jaas-conf> | Absolute path to the jaas.conf file. | /home/logstash-7.6.2/bin/jaas.conf |
<path-to-truststore-jks> | Absolute path to the kafka.client.truststore.jks file. | /home/logstash-7.6.2/bin/kafka.client.truststore.jks |
<your-group-id> | The consumer group name created in Step 1. | logstash_group |
The following parameters have fixed values specific to ApsaraMQ for Kafka internet connections:
| Parameter | Fixed value | Description |
|---|---|---|
security_protocol | SASL_SSL | The protocol for internet connections to ApsaraMQ for Kafka. |
sasl_mechanism | PLAIN | The authentication mechanism. |
ssl_truststore_password | KafkaOnsClient | The password for the SSL certificate. |
ssl_endpoint_identification_algorithm | "" (empty string) | Required for Logstash 6.x and later. |
Optional parameters:
| Parameter | Description |
|---|---|
consumer_threads | Number of consumer threads. We recommend that you set this to the number of partitions in the topic for optimal throughput. |
auto_offset_reset | Where to start consuming when no committed offset exists. earliest reads from the beginning. latest reads only new messages. |
Step 4: Start consuming messages
Run the following command from the Logstash bin directory:
./logstash -f input.confIf the configuration is correct, consumed messages appear in the terminal:

What's next
To send Logstash output to Elasticsearch or other destinations, replace the
outputblock ininput.confwith the appropriate Logstash output plugin.For the full list of Kafka input plugin parameters, see the Elastic Kafka input plugin documentation.