
ApsaraMQ for Kafka: Send messages from Logstash to ApsaraMQ for Kafka over the Internet

Last Updated: Mar 11, 2026

Logstash can forward log and event data to an ApsaraMQ for Kafka instance over the Internet using SASL_SSL authentication. This guide walks you through endpoint retrieval, topic creation, Logstash configuration, and message verification.

Prerequisites

Before you begin, make sure you have:

  • An ApsaraMQ for Kafka instance that is purchased, deployed, and in the Running state, with Internet access enabled.
  • A server with Logstash installed.

Step 1: Get the endpoint and credentials

Logstash connects to ApsaraMQ for Kafka through an SSL endpoint. Retrieve the endpoint and SASL credentials from the console.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  3. On the Instances page, click the name of the target instance.

  4. On the Instance Details page, collect the following:

    • Endpoint Information section: Copy the SSL endpoint (port 9093).

    • Configuration Information section: Note the Username and Password.

Note

For details on endpoint types, see Comparison among endpoints.

Step 2: Create a topic

Create a topic to receive the messages that Logstash sends.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

    Important

    Create topics in the same region as your Elastic Compute Service (ECS) instance. Topics cannot be used across regions. For example, if your producers and consumers run on an ECS instance in the China (Beijing) region, create the topic in China (Beijing) as well.

  3. On the Instances page, click the name of the target instance.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, configure the topic properties and click OK. After the topic is created, it appears on the Topics page.

    • Name: The topic name. Example: demo.
    • Description: A brief description of the topic. Example: demo test.
    • Partitions: The number of partitions. Example: 12.
    • Storage Engine: The storage engine type. Configurable only on Professional Edition instances; Standard Edition defaults to Cloud Storage. Example: Cloud Storage.
      - Cloud Storage: Uses Alibaba Cloud disks with three distributed replicas. Low latency, high performance, long durability, and high reliability. Required for Standard (High Write) instances.
      - Local Storage: Uses the in-sync replicas (ISR) algorithm from open-source Apache Kafka with three distributed replicas.
    • Message Type: The message ordering behavior. Example: Normal Message.
      - Normal Message: Messages with the same key go to the same partition in send order. Order may not be preserved during broker failures. Auto-selected when Storage Engine is Cloud Storage.
      - Partitionally Ordered Message: Messages with the same key go to the same partition in send order. Order is preserved during broker failures, but affected partitions are unavailable until restored. Auto-selected when Storage Engine is Local Storage.
    • Log Cleanup Policy: Configurable only when Storage Engine is Local Storage (Professional Edition). Example: Compact.
      - Delete: The default policy. Retains messages up to the maximum retention period and deletes the oldest messages when storage usage exceeds 85%.
      - Compact: Retains only the latest value for each key. Log-compacted topics can be used only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.
    • Tag: Optional tags to attach to the topic. Example: demo.

Step 3: Configure and run Logstash

Set up the SSL certificate, SASL credentials, and Logstash output configuration on your server, then send a test message.

Download the SSL certificate

Switch to the Logstash bin directory and download the truststore certificate:

cd <logstash-install-dir>/bin
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks

Create the JAAS configuration file

Create a file named jaas.conf in the Logstash bin directory with the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};

Replace the placeholders with your actual values:

  • <your-username>: The Username from the Configuration Information section. Example: alikafka_pre-cn-v0h1***
  • <your-password>: The Password from the Configuration Information section. Example: GQiSmqbQVe3b9hdKLDcIlkrBK6***

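If you script the setup, jaas.conf can be generated from environment variables so credentials stay out of shell history and version control. This is an optional sketch: the variable names KAFKA_USERNAME and KAFKA_PASSWORD and their defaults are placeholders, not part of the product.

```shell
# Optional sketch: generate jaas.conf from environment variables.
# Assumes you run this from the Logstash bin directory.
# KAFKA_USERNAME / KAFKA_PASSWORD are assumed to be exported beforehand;
# the defaults below are placeholders for illustration only.
KAFKA_USERNAME="${KAFKA_USERNAME:-alikafka_demo_user}"
KAFKA_PASSWORD="${KAFKA_PASSWORD:-demo_password}"
cat > jaas.conf <<EOF
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="${KAFKA_USERNAME}"
  password="${KAFKA_PASSWORD}";
};
EOF
```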
Note

The jaas_path setting is JVM-wide. If you run multiple Kafka outputs in a single Logstash instance and need different credentials for each, use the inline sasl_jaas_config parameter instead. See the Kafka output plugin reference for details.
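With the inline form, the credentials move from jaas.conf into each kafka output block. A sketch of what that looks like, with placeholder credentials; check the Kafka output plugin reference for your Logstash version, since sasl_jaas_config requires a reasonably recent plugin:

```
kafka {
    ...
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='<your-username>' password='<your-password>';"
    ...
}
```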

Create the Logstash output configuration file

Create a file named output.conf in the Logstash bin directory with the following content:

input {
    stdin {}
}

output {
    stdout { codec => json }
    kafka {
        bootstrap_servers => "<your-endpoint>"
        topic_id => "<your-topic>"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "<logstash-install-dir>/bin/jaas.conf"
        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "<logstash-install-dir>/bin/kafka.client.truststore.jks"
        ssl_endpoint_identification_algorithm => ""
    }
}

Replace these placeholders with your actual values:

  • <your-endpoint>: The SSL endpoint (port 9093) from the Endpoint Information section. Example: alikafka-pre-cn-zv*****-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv*****-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv*****-3.alikafka.aliyuncs.com:9093
  • <your-topic>: The name of the topic created in Step 2. Example: logstash_test
  • <logstash-install-dir>: The absolute path to your Logstash installation directory. Example: /home/logstash-7.6.2

The remaining parameters use fixed values. Do not change them:

  • security_protocol: SASL_SSL. The security protocol for Internet connections.
  • sasl_mechanism: PLAIN. The SASL authentication mechanism.
  • ssl_truststore_password: KafkaOnsClient. The password for the truststore certificate.
  • ssl_endpoint_identification_algorithm: "" (empty string). Required for Logstash 6.x and later; disables hostname verification.
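These fixed values map one-to-one onto ordinary Kafka client settings, so the same instance can be reached from other open-source Kafka clients. A minimal sketch using the kafka-python naming convention; the endpoint and credentials below are placeholders, and a non-JVM client would additionally need the CA from the truststore (a JKS can be exported to PEM with keytool):

```python
# Sketch: the SASL_SSL settings from output.conf expressed as a plain
# Kafka-client config dict (kafka-python naming). All values below are
# placeholders for illustration.
def sasl_ssl_config(bootstrap_servers, username, password):
    """Mirror the fixed output.conf values for a generic Kafka client."""
    return {
        "bootstrap_servers": bootstrap_servers.split(","),  # SSL endpoint, port 9093
        "security_protocol": "SASL_SSL",   # fixed: Internet access uses SASL_SSL
        "sasl_mechanism": "PLAIN",         # fixed: PLAIN authentication
        "sasl_plain_username": username,   # Username from Configuration Information
        "sasl_plain_password": password,   # Password from Configuration Information
    }

cfg = sasl_ssl_config(
    "alikafka-pre-xx-1.alikafka.aliyuncs.com:9093",  # placeholder endpoint
    "alikafka_demo_user",                            # placeholder username
    "demo_password",                                 # placeholder password
)
print(cfg["security_protocol"])  # → SASL_SSL
```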

Send a test message

  1. Start Logstash with the output configuration:

       ./logstash -f output.conf
  2. After Logstash starts, type test and press Enter. The stdout output displays the message locally as JSON, while the kafka output sends it to your ApsaraMQ for Kafka instance.


Verify the result

Confirm that messages reached your ApsaraMQ for Kafka instance by checking partition status and querying messages in the console.

Check partition status

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  3. On the Instances page, click the name of the target instance.

  4. In the left-side navigation pane, click Topics.

  5. Click the topic name, then click the Partition Status tab on the Topic Details page.

    • Partition ID: The partition ID.
    • Minimum Offset: The earliest offset in the partition.
    • Maximum Offset: The latest offset in the partition.
    • Messages: The total number of messages in the partition.
    • Last Updated At: The time when the most recent message was stored.


Query a message by offset

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  3. On the Instances page, click the name of the target instance.

  4. In the left-side navigation pane, click Message Query.

  5. From the Search Method drop-down list, select Search by offset.

  6. Select a topic from the Topic drop-down list, select a partition from the Partition drop-down list, enter an offset value in the Offset field, and click Search. The console returns all messages with offsets greater than or equal to the specified value. For example, setting Partition to 5 and Offset to 5 returns all messages from partition 5 with offsets 5 and above.

    • Partition: The partition where the message is stored.
    • Offset: The message offset within the partition.
    • Key: The message key, displayed as a string.
    • Value: The message body, displayed as a string.
    • Created At: The time when the message was sent. If you specified a value for the ProducerRecord timestamp field, that value is displayed; otherwise, the local system time at which the message was sent is displayed. If the timestamp field is set to 0 or an invalid value, the time is displayed in 1970/x/x x:x:x format. Clients on ApsaraMQ for Kafka version 0.9 or earlier cannot set this field.
    • Actions: Download Key downloads the message key; Download Value downloads the message body. The console displays up to 1 KB per message; download the message to view content beyond 1 KB. A maximum of 10 MB of messages can be downloaded at a time.
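The same query can also be reproduced from the command line with the open-source Kafka console consumer, if you have a Kafka CLI distribution available. A hedged sketch: the endpoint, topic, and credentials are placeholders, and the properties mirror the SASL_SSL settings used by Logstash in Step 3.

```shell
# Sketch: client properties for the open-source Kafka CLI, mirroring the
# SASL_SSL settings from output.conf. All <...> values are placeholders.
cat > ssl.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<your-username>" password="<your-password>";
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=KafkaOnsClient
ssl.endpoint.identification.algorithm=
EOF
# With real values filled in, this mirrors the console example
# (partition 5, offset 5):
#   bin/kafka-console-consumer.sh --bootstrap-server <your-endpoint> \
#     --topic <your-topic> --partition 5 --offset 5 \
#     --consumer.config ssl.properties
```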

References