
ApsaraMQ for Kafka: Connect an ApsaraMQ for Kafka instance to Logstash as an input

Last Updated: Aug 17, 2023

An ApsaraMQ for Kafka instance can be connected to Logstash as an input. This topic describes how to use Logstash to consume messages from ApsaraMQ for Kafka over the Internet.

Prerequisites

Before you begin, make sure that Logstash is installed on a server that can access the Internet.

Step 1: Obtain the public endpoint

Logstash establishes a connection to ApsaraMQ for Kafka by using an ApsaraMQ for Kafka endpoint. The username and password of your ApsaraMQ for Kafka instance are required for identity authentication.

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to connect to Logstash as an input.

  4. On the Instance Details page, obtain an endpoint of the instance in the Endpoint Information section. In the Configuration Information section, obtain the values of the Username parameter and Password parameter.
    Note For information about the differences among endpoints, see Comparison among endpoints.

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
    Important You must create topics in the region where your application is deployed. When you create a topic, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if your message producers and consumers run on ECS instances that are deployed in the China (Beijing) region, create topics in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, configure the parameters and click OK.
    • Name: The topic name. Example: demo.
    • Description: The topic description. Example: demo test.
    • Partitions: The number of partitions in the topic. Example: 12.
    • Storage Engine: The type of the storage engine that is used to store messages in the topic. Example: Cloud Storage.
      Note You can select the type of the storage engine only if you use a Professional Edition instance. If you use a Standard Edition instance, cloud storage is selected by default.
      ApsaraMQ for Kafka supports the following types of storage engines:
      • Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.
      • Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.
    • Message Type: The message type of the topic. Example: Normal Message. Valid values:
      • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, the order of the messages may not be preserved in the partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
      • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, the messages are still stored in the partitions in that order, but messages in the affected partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    • Log Cleanup Policy: The log cleanup policy that is used by the topic. Example: Compact.
      If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.
      ApsaraMQ for Kafka provides the following log cleanup policies:
      • Delete: The default log cleanup policy. If sufficient storage space is available, messages are retained for the maximum retention period. After the storage usage exceeds 85%, the system deletes messages starting from the earliest stored message to ensure service availability.
      • Compact: The Apache Kafka log compaction policy is used. For more information, see Kafka 3.4 Documentation. Log compaction ensures that the latest value is retained for each message key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store system status and configuration information in a log-compacted topic.
        Important You can use log-compacted topics only in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.
    • Tag: The tags that you want to attach to the topic. Example: demo.
    After the topic is created, it is displayed on the Topics page.

Step 3: Send messages

Perform the following operations to send messages to the topic that you created:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, find the topic that you want to manage, and choose More > Send Message in the Actions column.
  6. In the Start to Send and Consume Message panel, configure the required parameters to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
      2. In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
      3. Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
        • If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
        • If you do not want to send the message to a specified partition, click No.
      4. Use ApsaraMQ for Kafka SDKs or run the Docker commands that are displayed in the Start to Send and Consume Message panel to consume the message.
    • Set the Method of Sending parameter to Docker. Run a Docker container to produce a test message, and then consume the message.
      1. Run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
      2. Run the Docker commands that are provided in the How do I consume a message after the message is sent? section to consume the message.
    • Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. ApsaraMQ for Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
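The SDK-based option can be sketched with the third-party kafka-python client. This is only a sketch: the endpoint, username, password, and certificate path below are placeholders that you replace with the values obtained in Step 1, and the client library must be installed separately.

```python
# Sketch: producing a test message over the public (SASL_SSL) endpoint
# with the third-party kafka-python client. All connection values are
# placeholders; replace them with the values from your instance.

def build_producer_config(bootstrap_servers, username, password, ca_pem):
    """Assemble SASL_SSL settings accepted by kafka-python's KafkaProducer."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",   # the public endpoint uses SASL_SSL
        "sasl_mechanism": "PLAIN",         # ApsaraMQ for Kafka uses PLAIN
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_cafile": ca_pem,              # CA certificate in PEM format
    }

config = build_producer_config(
    ["alikafka-pre-cn-XXXX-1.alikafka.aliyuncs.com:9093"],
    "alikafka_pre-cn-XXXX",
    "your-password",
    "/path/to/ca-cert.pem",
)

# Uncomment to actually send (requires `pip install kafka-python`):
# from kafka import KafkaProducer
# producer = KafkaProducer(**config)
# producer.send("logstash_test", key=b"demo", value=b'{"key": "test"}')
# producer.flush()
```

Note that kafka-python expects a PEM-format CA file rather than the JKS truststore that Logstash uses.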

Step 4: Create a consumer group

Perform the following operations to create a consumer group for Logstash.

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, you can view the group on the Groups page.

Step 5: Use Logstash to consume messages

Start Logstash on the server where Logstash is installed, and consume messages from the created topic.

  1. Run the cd command to switch to the bin directory of Logstash.

  2. Run the following command to download the kafka.client.truststore.jks certificate file:

    wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
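A GitHub blob page serves HTML rather than the binary file, so it is worth confirming that the download is a real Java keystore. JKS files begin with the magic bytes 0xFEEDFEED; a quick check (a sketch, assuming the file is in the current directory):

```python
# Sketch: verify that kafka.client.truststore.jks is a real JKS keystore
# and not an accidentally downloaded HTML page. JKS files begin with the
# magic number 0xFEEDFEED.

def is_jks(path):
    with open(path, "rb") as f:
        return f.read(4) == b"\xfe\xed\xfe\xed"

# Example usage:
# is_jks("kafka.client.truststore.jks")  # True for a valid truststore
```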
  3. Create a configuration file named jaas.conf.

    1. Run the vim jaas.conf command to create an empty configuration file.

    2. Press the I key to enter the insert mode.
    3. Enter the following content in the configuration file:

      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };

      • username: The username of your Internet- and VPC-connected ApsaraMQ for Kafka instance. Example: alikafka_pre-cn-v0h1***.
      • password: The password of your Internet- and VPC-connected ApsaraMQ for Kafka instance. Example: GQiSmqbQVe3b9hdKLDcIlkrBK6***.

    4. Press the Esc key to return to the command line mode.
    5. Press the : key to go to the bottom line. Enter wq, press the Enter key to save the file, and then exit.
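Instead of editing the file interactively in vim, the same jaas.conf can be generated from a script. This is a sketch; the credentials below are placeholders for the Username and Password values obtained in Step 1.

```python
# Sketch: write jaas.conf without an interactive editor. Replace the
# placeholder credentials with the Username and Password values from the
# instance's Configuration Information section.

JAAS_TEMPLATE = """KafkaClient {{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="{username}"
  password="{password}";
}};
"""

def write_jaas(path, username, password):
    with open(path, "w") as f:
        f.write(JAAS_TEMPLATE.format(username=username, password=password))

write_jaas("jaas.conf", "alikafka_pre-cn-XXXX", "your-password")
```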
  4. Create a configuration file named input.conf.

    1. Run the vim input.conf command to create an empty configuration file.

    2. Press the I key to enter the insert mode.
    3. Enter the following content in the configuration file:

      input {
          kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
              topics => ["logstash_test"]
      
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
      
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
      
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
      
              ssl_endpoint_identification_algorithm => ""
      
              group_id => "logstash_group"
              consumer_threads => 3
              auto_offset_reset => "earliest"
          }
      }
      
      output {
          stdout {
              codec => rubydebug
          }
      }

      • bootstrap_servers: The public endpoint of your ApsaraMQ for Kafka instance. The public endpoint provided by ApsaraMQ for Kafka is the SSL endpoint. Example: alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093.
      • topics: The name of the topic. Example: logstash_test.
      • security_protocol: The security protocol. Default value: SASL_SSL. You do not need to change this value.
      • sasl_mechanism: The security authentication mechanism. Default value: PLAIN. You do not need to change this value.
      • jaas_path: The path of the jaas.conf configuration file. Example: /home/logstash-7.6.2/bin/jaas.conf.
      • ssl_truststore_password: The password of the kafka.client.truststore.jks certificate. Default value: KafkaOnsClient. You do not need to change this value.
      • ssl_truststore_location: The path of the kafka.client.truststore.jks certificate. Example: /home/logstash-7.6.2/bin/kafka.client.truststore.jks.
      • ssl_endpoint_identification_algorithm: The authentication algorithm that is used for the SSL endpoint. This parameter is required for Logstash 6.x and later. Example: an empty string ("").
      • group_id: The name of the consumer group. Example: logstash_group.
      • consumer_threads: The number of consumer threads. We recommend that you set this parameter to the same value as the number of partitions of the topic. Example: 3.
      • auto_offset_reset: Specifies how the consumer offset is reset. Valid values: earliest (consumption starts from the earliest message) and latest (consumption starts from the latest message). Example: earliest.

    4. Press the Esc key to return to the command line mode.
    5. Press the : key to go to the bottom line. Enter wq, press the Enter key to save the file, and then exit.
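A common startup failure is a wrong jaas_path or ssl_truststore_location. Before starting Logstash, the local file paths referenced in input.conf can be checked with a small sketch (the regular expression below only looks for the two path-valued settings used in this configuration):

```python
# Sketch: confirm that the local files referenced by input.conf exist
# before starting Logstash. In the configuration above, only jaas_path
# and ssl_truststore_location point at local files.
import os
import re

def missing_files(conf_text):
    """Return the referenced paths that do not exist on this machine."""
    pattern = r'(?:jaas_path|ssl_truststore_location)\s*=>\s*"([^"]+)"'
    return [p for p in re.findall(pattern, conf_text) if not os.path.exists(p)]

# Example usage:
# with open("input.conf") as f:
#     print(missing_files(f.read()))   # an empty list means all paths exist
```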
  5. Run the following command to consume messages:

    ./logstash -f input.conf

    If messages are consumed, information about the consumed messages is displayed in the output.

References

For more information about parameter settings, see Kafka input plugin.