A Message Queue for Apache Kafka instance can be connected to Logstash as an output. This topic describes how to use Logstash to send messages to Message Queue for Apache Kafka over the Internet.

Prerequisites

Before you start this tutorial, make sure that Logstash is installed on your server and that a Message Queue for Apache Kafka instance is purchased and deployed.

Step 1: Obtain an endpoint

Logstash establishes a connection to Message Queue for Apache Kafka by using a Message Queue for Apache Kafka endpoint.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to connect to Logstash as an output.
  4. On the Instance Details page, obtain an endpoint of the instance in the Endpoint Information section. In the Configuration Information section, obtain the values of the Username and Password parameters.
    Note For more information about the differences among endpoints, see Comparison among endpoints.
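As an optional sanity check that is not part of the official procedure, you can confirm from your server that the public SSL endpoint obtained above is reachable. The address below is a placeholder; substitute a broker address from your own endpoint.

```shell
# Placeholder address: replace 121.XX.XX.XX:9093 with a broker address from
# the public endpoint of your instance. Port 9093 is the SASL_SSL port that
# is used later in output.conf.
openssl s_client -connect 121.XX.XX.XX:9093 </dev/null
# A successful TLS handshake (certificate chain printed) indicates that the
# endpoint is reachable from this server.
```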

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. This means that you must select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    The following parameters are available:
    • Name: The name of the topic. Example: demo.
    • Description: The description of the topic. Example: demo test.
    • Partitions: The number of partitions in the topic. Example: 12.
    • Storage Engine: The storage engine of the topic. Example: Cloud Storage.

      Message Queue for Apache Kafka supports the following storage engines:

      • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, persistence, and high durability. If the disk type of your instance is Standard (High Write), you can select only Cloud Storage.
      • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    • Message Type: The message type of the topic. Example: Normal Message.
      • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages may be out of order. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
      • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages are still stored in the same partition in that order, but specific messages in the partition cannot be sent until the partition is restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    • Log Cleanup Policy: The log cleanup policy for the topic. Example: Compact.

      If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

      Message Queue for Apache Kafka supports the following log cleanup policies:

      • Delete: The default log cleanup policy is used. If the remaining disk space is sufficient, messages are retained for the maximum retention period. If the disk usage exceeds 85%, the disk space is considered insufficient, and the earliest messages are deleted to ensure service availability.
      • Compact: The Apache Kafka log compaction policy is used. If different messages have the same key, only the message that has the latest value of the key is retained. This policy applies to scenarios in which the system recovers from a failure, or the cache is reloaded after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the system status information or configuration information in a log-compacted topic.
        Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    • Tag: The tags to be attached to the topic. Example: demo.
    After the topic is created, it is displayed on the Topics page.

Step 3: Use Logstash to send a message

Start Logstash on the server where Logstash is installed, and send a message to the topic that you created.

  1. Run the cd command to switch to the bin directory of Logstash.
  2. Run the following command to download the kafka.client.truststore.jks certificate file:
    wget https://code.aliyun.com/alikafka/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/kafka.client.truststore.jks
  3. Create a configuration file named jaas.conf.
    1. Run the vim jaas.conf command to create an empty configuration file.
    2. Press the i key to go to the insert mode.
    3. Enter the following content:
      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };
      The following parameters are configured in the file:
      • username: The username of your Message Queue for Apache Kafka instance of the Internet and VPC type. Example: alikafka_pre-cn-v0h1***.
      • password: The password of your Message Queue for Apache Kafka instance of the Internet and VPC type. Example: GQiSmqbQVe3b9hdKLDcIlkrBK6***.
    4. Press the Esc key to return to the command line mode.
    5. Press the : key to enter the bottom line mode. Type wq and press the Enter key to save the file and exit.
  4. Create a configuration file named output.conf.
    1. Run the vim output.conf command to create an empty configuration file.
    2. Press the i key to go to the insert mode.
    3. Enter the following content:
      input {
          stdin{}
      }
      
      output {
         kafka {
              bootstrap_servers => "121.40.XXX.XXX:9093,120.26.XXX.XXX:9093,120.26.XXX.XXX:9093"
              topic_id => "logstash_test"
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
              ssl_endpoint_identification_algorithm => ""
          }
      }
      The following parameters are configured in the file:
      • bootstrap_servers: The public endpoint of your Message Queue for Apache Kafka instance. The public endpoint provided by Message Queue for Apache Kafka is a Secure Sockets Layer (SSL) endpoint. Example: 121.XX.XX.XX:9093,120.XX.XX.XX:9093,120.XX.XX.XX:9093.
      • topic_id: The name of the topic. Example: logstash_test.
      • security_protocol: The security protocol. Default value: SASL_SSL. You do not need to change this value.
      • sasl_mechanism: The security authentication mechanism. Default value: PLAIN. You do not need to change this value.
      • jaas_path: The path of the jaas.conf configuration file. Example: /home/logstash-7.6.2/bin/jaas.conf.
      • ssl_truststore_password: The password of the kafka.client.truststore.jks certificate. Default value: KafkaOnsClient. You do not need to change this value.
      • ssl_truststore_location: The path of the kafka.client.truststore.jks certificate. Example: /home/logstash-7.6.2/bin/kafka.client.truststore.jks.
      • ssl_endpoint_identification_algorithm: The algorithm that is used to verify the SSL endpoint. Set this parameter to an empty string to disable endpoint verification. This parameter is required for Logstash V6.x and later.
    4. Press the Esc key to return to the command line mode.
    5. Press the : key to enter the bottom line mode. Type wq and press the Enter key to save the file and exit.
  5. Send a message to the topic that you created.
    1. Run the ./logstash -f output.conf command.
    2. Enter test and press Enter.
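As an alternative to typing the message interactively, the step above can also be driven through a pipe. This is a convenience sketch that assumes your current directory is the Logstash bin directory from the previous steps.

```shell
# Pipe a single test message to Logstash instead of typing it interactively.
# Assumes the current directory is the Logstash bin directory that contains
# output.conf from the previous step.
echo "test" | ./logstash -f output.conf
```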

Step 4: View the partitions of the topic

Perform the following operations to view the message that was sent to the topic:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, find the topic whose partition status you want to view, and choose More > Partition Status in the Actions column.
    Table 1. Partition status information
    • Partition ID: The ID of the partition in the topic.
    • Minimum Offset: The minimum offset of messages that are consumed in the partition.
    • Maximum Offset: The maximum offset of messages that are consumed in the partition.
    • Last Updated At: The time when the last message in the partition was stored.

Step 5: Query the message by offset

You can query the sent message based on its partition ID and offset information.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Message Query.
  5. On the Message Query page, select Search by offset from the Search Method drop-down list.
  6. Select a topic name from the Topic drop-down list and a partition from the Partition drop-down list, enter an offset in the Offset field, and then click Search.

    Messages that start from the specified offset are displayed. For example, if the specified partition and offset are both 5, the query results are messages that start from Offset 5 in Partition 5.

    Table 2. Parameters in the query results and supported operations
    • Partition: The ID of the partition in the topic to which the message is sent.
    • Offset: The offset of the message.
    • Key: The key of the message. The key is converted to a string.
    • Value: The value of the message. The value is converted to a string, which indicates the message content.
    • Created At: The timestamp recorded by the producer when the message was sent, or the value of the timestamp field that you specified for the ProducerRecord object.
      Note
      • If a value is specified for the timestamp field, the value is displayed.
      • If no value is specified for the timestamp field, the system time when the message was sent is displayed.
      • A value in the format of 1970/x/x x:x:x indicates that the timestamp field is set to 0 or an invalid value.
      • You cannot specify a value for the timestamp field on clients of Message Queue for Apache Kafka version 0.9 or earlier.
    • Actions: The operations that you can perform on the message.
      • Download Key: Download the key of the message.
      • Download Value: Download the content of the message.
      Notice
      • The Message Queue for Apache Kafka console displays a maximum of 1 KB of content for each queried message. The remaining content of the message is omitted. To view the complete message, download the message.
      • You can download messages only if your instance is of the Professional Edition.
      • You can download up to 10 MB of messages at a time. If the total size of the queried messages exceeds 10 MB, only the first 10 MB of message content is downloaded.
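Besides querying in the console, you can verify delivery with the console consumer that ships with open source Apache Kafka, reusing the jaas.conf file and truststore from Step 3. This is a sketch under the assumption that an open source Kafka distribution is available on your server; the endpoint and file paths below are placeholders taken from the earlier examples and must be adjusted to your environment.

```shell
# client.properties mirrors the SASL_SSL settings used in output.conf.
# The endpoint and paths are placeholders; adjust them to your environment.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/home/logstash-7.6.2/bin/kafka.client.truststore.jks
ssl.truststore.password=KafkaOnsClient
ssl.endpoint.identification.algorithm=
EOF

# Point the JVM at the jaas.conf file created in Step 3, then read the topic
# from the beginning to see the message that Logstash sent.
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/logstash-7.6.2/bin/jaas.conf"
bin/kafka-console-consumer.sh \
  --bootstrap-server 121.XX.XX.XX:9093 \
  --topic logstash_test \
  --from-beginning \
  --consumer.config client.properties
```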

References

For more information about parameter settings, see Kafka output plugin.