
ApsaraMQ for Kafka:Consume messages from ApsaraMQ for Kafka with Logstash over the internet

Last Updated: Mar 11, 2026

Configure Logstash to consume messages from an ApsaraMQ for Kafka instance over the internet using SASL_SSL authentication.

Before you begin

Make sure that you have:

  • An ApsaraMQ for Kafka instance with internet access enabled (the examples in this topic use a non-Serverless instance). For more information, see Connect to an instance over the Internet and a VPC

  • Logstash installed

  • JDK 8 installed

  • A topic and a consumer group in the ApsaraMQ for Kafka console. If you have not created them yet, see Step 1: Create a topic and a consumer group

How the connection works

Logstash connects to ApsaraMQ for Kafka through an SSL endpoint (port 9093) and authenticates with SASL_SSL. The setup requires three components:

  1. The SSL certificate (kafka.client.truststore.jks)

  2. JAAS credentials (username and password from your Kafka instance)

  3. A Logstash pipeline configuration that points to your Kafka endpoint

Step 1: Create a topic and a consumer group

If you already have a topic and consumer group, skip to Step 2.

Create a topic

  1. Log in to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

    Important

    Create the topic in the same region as the Elastic Compute Service (ECS) instance that runs your producers and consumers. Topics cannot be used across regions.

  3. On the Instances page, click the instance name.

  4. In the left-side navigation pane, click Topics.

  5. Click Create Topic.

  6. In the Create Topic panel, configure the following parameters and click OK.

    • Name: The topic name. Example: demo.
    • Description: The topic description. Example: demo test.
    • Partitions: The number of partitions. Example: 12.
    • Storage Engine: The storage engine type. Cloud Storage uses Alibaba Cloud disks with three replicas in distributed mode, offering low latency and high reliability. Local Storage uses the in-sync replicas (ISR) algorithm of open source Apache Kafka. The storage engine is configurable only on non-serverless Professional Edition instances. Other instance types default to Cloud Storage. Standard (High Write) edition instances support only Cloud Storage. Example: Cloud Storage.
    • Message Type: The message type, set automatically based on the storage engine. Normal Message (set when you select Cloud Storage): messages with the same key are stored in the same partition in send order, but ordering may not be preserved if a broker fails. Partitionally Ordered Message (set when you select Local Storage): same ordering behavior, but ordering is preserved even during broker failures, though some partitions may be temporarily unavailable. Example: Normal Message.
    • Log Cleanup Policy: Available only with Local Storage on Professional Edition instances. Delete (default): retains messages based on the maximum retention period and deletes the earliest messages when storage usage exceeds 85%. Compact: retains the latest value for each message key; required by components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos. Example: Compact.
    • Tag: Optional tags for the topic. Example: demo.

Create a consumer group

  1. On the instance details page, click Groups in the left-side navigation pane.

  2. Click Create Group.

  3. In the Create Group panel, enter a name in the Group ID field, add an optional description, and click OK.

Send test messages

To verify the connection later, send a test message to the topic:

  1. On the instance details page, click Topics in the left-side navigation pane.

  2. Click the topic name, then click Send Message in the upper-right corner of the Topic Details page.

  3. In the Start to Send and Consume Message panel, select a Sending Method (Console, Docker, or SDK) and follow the on-screen instructions to send a test message.
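If you choose the SDK or CLI sending method, the open source Kafka command-line tools need the same SASL_SSL settings that Logstash will use. A hypothetical `producer.properties` might look like the following; the path and credentials are placeholders you must replace:

```properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<your-username>" password="<your-password>";
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=KafkaOnsClient
ssl.endpoint.identification.algorithm=
```

You could then send a message with `kafka-console-producer.sh --producer.config producer.properties --topic <your-topic>` plus `--broker-list` or `--bootstrap-server` (depending on your Kafka version) pointing at the internet endpoint.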

Step 2: Get the endpoint and credentials

  1. Log in to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  3. On the Instances page, click the instance name.

  4. On the Instance Details page, find the following values:

    • Internet endpoint in the Endpoint Information section

    • Username and Password in the Configuration Information section

Endpoint and credential information on the Instance Details page
Note

For information about endpoint types, see Comparison among endpoints.

Step 3: Configure Logstash

Run all commands in this section from the Logstash bin directory.

Download the SSL certificate

wget -O kafka.client.truststore.jks https://raw.githubusercontent.com/AliwareMQ/aliware-kafka-demos/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks

Use the raw.githubusercontent.com URL. The github.com/blob page returns HTML, not the certificate file.

Create the JAAS configuration

Create a file named jaas.conf with the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};

Replace the placeholders with your instance credentials:

  • <your-username>: The username from the Configuration Information section. Example: alikafka_pre-cn-v0h1***.
  • <your-password>: The password from the Configuration Information section. Example: GQiSmqbQVe3b9hdKLDcIlkrBK6***.
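The JAAS file format is easy to get wrong (note the semicolon after the password and after the closing brace), so a small helper that renders jaas.conf from your credentials can save a debugging round trip. This is a sketch; the function name is made up for illustration:

```python
# Template for the JAAS snippet expected by the Kafka client.
JAAS_TEMPLATE = """KafkaClient {{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="{username}"
  password="{password}";
}};
"""

def render_jaas(username, password):
    """Fill the instance credentials into the JAAS template."""
    return JAAS_TEMPLATE.format(username=username, password=password)

if __name__ == "__main__":
    # Write jaas.conf next to this script; replace with real credentials.
    with open("jaas.conf", "w") as f:
        f.write(render_jaas("<your-username>", "<your-password>"))
```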

Create the Logstash pipeline configuration

Create a file named input.conf with the following content:

input {
    kafka {
        bootstrap_servers => "<your-internet-endpoint>"
        topics => ["<your-topic>"]

        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"

        jaas_path => "<path-to-jaas-conf>"

        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "<path-to-truststore-jks>"

        ssl_endpoint_identification_algorithm => ""

        group_id => "<your-group-id>"
        consumer_threads => 3
        auto_offset_reset => "earliest"
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

Replace the placeholders with your values:

  • <your-internet-endpoint>: The internet endpoint (SSL, port 9093) from the Endpoint Information section. Separate multiple brokers with commas. Example: alikafka-pre-cn-zv***-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv***-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv***-3.alikafka.aliyuncs.com:9093.
  • <your-topic>: The topic name to consume from. Example: logstash_test.
  • <path-to-jaas-conf>: The absolute path to the jaas.conf file. Example: /home/logstash-7.6.2/bin/jaas.conf.
  • <path-to-truststore-jks>: The absolute path to the kafka.client.truststore.jks file. Example: /home/logstash-7.6.2/bin/kafka.client.truststore.jks.
  • <your-group-id>: The name of the consumer group created in Step 1. Example: logstash_group.

The following parameters have fixed values specific to ApsaraMQ for Kafka internet connections:

  • security_protocol: SASL_SSL. The protocol for internet connections to ApsaraMQ for Kafka.
  • sasl_mechanism: PLAIN. The authentication mechanism.
  • ssl_truststore_password: KafkaOnsClient. The password for the SSL certificate.
  • ssl_endpoint_identification_algorithm: "" (empty string). Required for Logstash 6.x and later.

Optional parameters:

  • consumer_threads: The number of consumer threads. We recommend that you set this to the number of partitions in the topic for optimal throughput.
  • auto_offset_reset: Where to start consuming when no committed offset exists. earliest reads from the beginning; latest reads only new messages.
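Putting the variable and fixed parameters together, you can render the pipeline file from a template instead of hand-editing it. A minimal sketch, assuming the same input.conf layout shown above; all names are placeholders:

```python
# Mirrors the input.conf from this guide; literal braces are doubled
# because str.format() is used for substitution.
PIPELINE_TEMPLATE = """input {{
    kafka {{
        bootstrap_servers => "{endpoint}"
        topics => ["{topic}"]
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "{jaas_path}"
        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "{truststore}"
        ssl_endpoint_identification_algorithm => ""
        group_id => "{group_id}"
        consumer_threads => {threads}
        auto_offset_reset => "earliest"
    }}
}}
output {{ stdout {{ codec => rubydebug }} }}
"""

def render_pipeline(endpoint, topic, jaas_path, truststore, group_id, threads=3):
    """Fill in the per-instance settings; fixed values stay hard-coded."""
    return PIPELINE_TEMPLATE.format(
        endpoint=endpoint, topic=topic, jaas_path=jaas_path,
        truststore=truststore, group_id=group_id, threads=threads,
    )
```

For example, passing `threads=12` for a 12-partition topic keeps consumer_threads aligned with the partition count, as recommended above.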

Step 4: Start consuming messages

Run the following command from the Logstash bin directory:

./logstash -f input.conf

If the configuration is correct, consumed messages appear in the terminal:

Logstash consuming messages from ApsaraMQ for Kafka

What's next