A Message Queue for Apache Kafka instance can be connected to Logstash as an output. This topic describes how to use Logstash to send messages to Message Queue for Apache Kafka over the Internet.

Prerequisites

Before you start this tutorial, make sure that the following operations are complete:

Step 1: Obtain an endpoint

Logstash establishes a connection to Message Queue for Apache Kafka by using a Message Queue for Apache Kafka endpoint.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to connect to Logstash as an output.
  4. On the Instance Details page, obtain an endpoint of the instance in the Endpoint Information section. In the Configuration Information section, obtain the values of the Username parameter and Password parameter.
    Note For more information about the differences among endpoints, see Comparison among endpoints.

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
    Important You must create topics in the region where your application is deployed. When you create a topic, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if your message producers and consumers run on ECS instances that are deployed in the China (Beijing) region, create topics in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, configure the parameters and click OK.
    Parameter Description Example
    Name The name of the topic. demo
    Description The description of the topic. demo test
    Partitions The number of partitions of the topic. 12
    Storage Engine The storage engine that you want to use.

    Message Queue for Apache Kafka supports the following storage engines:

    • Cloud Storage: If you specify this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine provides the following benefits: low latency, high performance, durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can specify only Cloud Storage as the value of the Storage Engine parameter.
    • Local Storage: If you specify this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.
    Cloud Storage
    Message Type The message type of the topic.
    • Normal Message: By default, messages of the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster fails, the order of the messages may not be preserved in affected partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages of the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster breaks down, the messages are stored in the same order in the partitions that are affected by the failed broker. The affected partitions cannot store new messages until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Normal Message
    Log Cleanup Policy The log cleanup policy for the topic.

    If you set the Storage Engine parameter to Local Storage, configure the Log Cleanup Policy parameter.

    Message Queue for Apache Kafka provides the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes messages in the order in which the messages are stored. The earliest message that is stored is the first message that is deleted. This helps ensure that the performance of the service is not degraded.
    • Compact: The Apache Kafka log compaction policy is used. If the log compaction policy is used and the keys of different messages are the same, the system retains the most recent message that is stored. You can use this policy in scenarios where you want to restore status data after a system failure occurs and reload cached data after the system restarts. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.
      Important You can use log-compacted topics only in specific ecosystem components such as Kafka Connect and Confluent Schema Registry. You cannot use the log compaction policy for a topic that is used to send and receive messages in other components. For more information, see Message Queue for Apache Kafka demos.
    Compact
    Tag The tags that you want to attach to the topic. demo
    After the topic is created, the topic is displayed on the Topics page.
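The Compact policy described above retains only the most recent value for each key. The following Python sketch illustrates that semantics (the function name and sample data are illustrative, not part of Message Queue for Apache Kafka):

```python
# A minimal sketch of log-compaction semantics: for each key, only the
# most recently written value survives compaction.

def compact(records):
    """Return the compacted view of a list of (key, value) records."""
    latest = {}
    for key, value in records:
        latest[key] = value  # a later record overwrites earlier records with the same key
    return latest

log = [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]
print(compact(log))  # {'user-1': 'c', 'user-2': 'b'}
```

This is why log-compacted topics suit state restoration: replaying the compacted log yields the latest value per key without replaying every intermediate update.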

Step 3: Use Logstash to send a message

Start Logstash on the server where Logstash is installed, and send a message to the topic that you created.

  1. Run the cd command to switch to the bin directory of Logstash.
  2. Run the following command to download the kafka.client.truststore.jks certificate file:
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
  3. Create a configuration file named jaas.conf.
    1. Run the vim jaas.conf command to create an empty configuration file.
    2. Press the i key to enter the insert mode.
    3. Enter the following content:
      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };
      Parameter Description Example
      username The username of your Message Queue for Apache Kafka instance of the Internet and VPC type. alikafka_pre-cn-v0h1***
      password The password of your Message Queue for Apache Kafka instance of the Internet and VPC type. GQiSmqbQVe3b9hdKLDcIlkrBK6***
    4. Press the Esc key to return to the command line mode.
    5. Press the : key, enter wq, and then press the Enter key to save the file and exit.
  4. Create a configuration file named output.conf.
    1. Run the vim output.conf command to create an empty configuration file.
    2. Press the i key to enter the insert mode.
    3. Enter the following content:
      input {
          stdin{}
      }
      
      output {
         kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
              topic_id => "logstash_test"
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
              ssl_endpoint_identification_algorithm => ""
          }
      }
      Parameter Description Example
      bootstrap_servers The public endpoint of your Message Queue for Apache Kafka instance. The public endpoint provided by Message Queue for Apache Kafka is the Secure Sockets Layer (SSL) endpoint. alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      topic_id The name of the topic. logstash_test
      security_protocol The security protocol. Default value: SASL_SSL. You do not need to change this value. SASL_SSL
      sasl_mechanism The security authentication mechanism. Default value: PLAIN. You do not need to change this value. PLAIN
      jaas_path The path of the jaas.conf configuration file. /home/logstash-7.6.2/bin/jaas.conf
      ssl_truststore_password The password of the kafka.client.truststore.jks certificate. Default value: KafkaOnsClient. You do not need to change this value. KafkaOnsClient
      ssl_truststore_location The path of the kafka.client.truststore.jks certificate. /home/logstash-7.6.2/bin/kafka.client.truststore.jks
      ssl_endpoint_identification_algorithm The algorithm that is used to verify the server endpoint against the SSL certificate. In Logstash V6.x and later, this parameter must be specified. Set it to an empty string, as shown in the preceding configuration. Null
    4. Press the Esc key to return to the command line mode.
    5. Press the : key, enter wq, and then press the Enter key to save the file and exit.
  5. Send a message to the topic that you created.
    1. Run the ./logstash -f output.conf command.
    2. Enter test and press Enter.
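For reference, the connection settings in output.conf map onto a Kafka client configuration in other languages as well. The following Python sketch assembles the equivalent settings for the kafka-python package; note that kafka-python cannot read a JKS truststore directly, so the certificate would first need to be converted to a PEM file (for example, with keytool). All endpoints, credentials, and paths below are placeholders:

```python
# Sketch: assemble kafka-python producer settings that mirror output.conf.
# Placeholders (endpoint, credentials, paths) are illustrative only.

def producer_config(bootstrap_servers, username, password, ca_pem_path):
    """Build keyword arguments for kafka.KafkaProducer with SASL_SSL and PLAIN."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",   # matches security_protocol in output.conf
        "sasl_mechanism": "PLAIN",         # matches sasl_mechanism in output.conf
        "sasl_plain_username": username,   # same value as in jaas.conf
        "sasl_plain_password": password,   # same value as in jaas.conf
        "ssl_cafile": ca_pem_path,         # PEM converted from kafka.client.truststore.jks
        "ssl_check_hostname": False,       # same effect as the empty identification algorithm
    }

cfg = producer_config(
    "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093",
    "alikafka_pre-cn-v0h1***",
    "GQiSmqbQVe3b9hdKLDcIlkrBK6***",
    "/home/logstash-7.6.2/bin/ca-cert.pem",
)
print(cfg["security_protocol"])  # SASL_SSL
```

The sketch only builds the configuration dictionary; passing it to kafka.KafkaProducer would additionally require network access to the public endpoint.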

Step 4: View the partitions of the topic

Perform the following operations to view the message that was sent to the topic:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, find the topic whose partition status you want to view, and choose More > Partition Status in the Actions column.
    Table 1. Information about the status of a partition
    Parameter Description
    Partition ID The ID of the partition.
    Minimum Offset The smallest offset among the messages that are stored in the partition.
    Maximum Offset The largest offset among the messages that are stored in the partition.
    Last Updated At The most recent point in time when a message is stored in the partition.
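The difference between Maximum Offset and Minimum Offset gives a rough estimate of how many messages are still stored in the partition (ignoring compaction and other special records). For the single test message sent in Step 3:

```python
# Rough estimate of messages retained in a partition, assuming offsets are
# contiguous (no compaction or control records).

def retained_messages(min_offset, max_offset):
    """Approximate number of messages stored between the two offsets."""
    return max_offset - min_offset

print(retained_messages(0, 1))  # 1, the single test message from Step 3
```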

Step 5: Query the message by offset

You can query the sent message based on its partition ID and offset information.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Message Query.
  5. On the Message Query page, select Search by offset from the Search Method drop-down list.
  6. Select a topic name from the Topic drop-down list and a partition from the Partition drop-down list, enter an offset value in the Offset field, and then click Search.

    Messages whose offset values are greater than or equal to the specified offset value are displayed. For example, if you specify 5 as the value of both the Partition parameter and Offset parameter, the system queries messages whose offset values are greater than or equal to 5 from Partition 5.
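The query semantics described above can be sketched as a simple filter: within the chosen partition, return the messages whose offsets are greater than or equal to the specified offset. The sample data below is made up for illustration:

```python
# Sketch of "Search by offset": from the chosen partition, return messages
# whose offsets are >= the specified offset. Sample data is illustrative.

def query_by_offset(messages, partition, offset):
    """Filter messages from one partition starting at the given offset."""
    return [m for m in messages
            if m["partition"] == partition and m["offset"] >= offset]

messages = [
    {"partition": 5, "offset": 4, "value": "a"},
    {"partition": 5, "offset": 5, "value": "b"},
    {"partition": 5, "offset": 6, "value": "c"},
    {"partition": 0, "offset": 9, "value": "d"},
]
print([m["offset"] for m in query_by_offset(messages, 5, 5)])  # [5, 6]
```

As in the example in the text, querying Partition 5 with Offset 5 returns the messages at offsets 5 and 6, but not the message at offset 4 or any message from another partition.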

    Table 2. Parameters that are included in message query results
    Parameter Description
    Partition The partition from which the message is obtained.
    Offset The offset of the message.
    Key The key of the message. The key is converted to a string.
    Value The content of the message. The message content is converted to a string.
    Created At The point in time when the message was produced. The value is either the timestamp that the producer recorded when it sent the message, or the value of the timestamp field that you specified in ProducerRecord.
    Note
    • If you specified a value for the timestamp field, the specified value is displayed.
    • If you did not specify a value for the timestamp field, the local system time when the message is sent is displayed.
    • A value in the 1970/x/x x:x:x format indicates that the timestamp field is set to 0 or an invalid value.
    • You cannot specify a value for the timestamp field on clients of Message Queue for Apache Kafka V0.9 and earlier.
    Actions
    • Click Download Key to download the key of the message.
    • Click Download Value to download the content of the message.
    Important
    • The Message Queue for Apache Kafka console can display up to 1 KB of content for each message. If the size of a message exceeds 1 KB, the additional content of the message is omitted. If you want to view the complete message, download the message.
    • You can download messages only if the instance is of the Professional Edition.
    • You can download up to 10 MB of message content. If the size of a message exceeds 10 MB, only the first 10 MB of message content can be downloaded.
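The 1970/x/x values mentioned in the note above follow from Kafka message timestamps being expressed in milliseconds since the Unix epoch (UTC): a timestamp of 0 renders as the very start of 1970.

```python
from datetime import datetime, timezone

# Kafka message timestamps are milliseconds since the Unix epoch (UTC).
ts_ms = 0  # an unset or invalid timestamp
created_at = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
print(created_at.isoformat())  # 1970-01-01T00:00:00+00:00
```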

References

For more information about parameter settings, see Kafka output plugin.