A Message Queue for Apache Kafka instance can be connected to Filebeat as an input. This topic describes how to use Filebeat to consume messages from Message Queue for Apache Kafka over the Internet.

Background information

Before you start this tutorial, make sure that the following operations are complete:

Step 1: Obtain an endpoint

Filebeat establishes a connection to Message Queue for Apache Kafka by using a Message Queue for Apache Kafka endpoint.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to connect to Filebeat as an input.
  4. On the Instance Details page, obtain an endpoint of the instance in the Endpoint Information section. In the Configuration Information section, obtain the values of the Username and Password parameters.
    Note For more information about the differences among endpoints, see Comparison among endpoints.
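Before you configure Filebeat, you can confirm that the endpoint you obtained is reachable from your server. The following sketch performs a plain TCP connectivity check; the host and port shown are placeholders, not an actual endpoint:

```python
import socket

def endpoint_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False

# Example (replace with the endpoint from the console):
# endpoint_reachable("121.XX.XX.XX", 9093)
```

A `False` result usually points at a whitelist, security group, or firewall issue rather than a Filebeat misconfiguration.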

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. This means that you must select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    Parameter Description Example
    Name The name of the topic. demo
    Description The description of the topic. demo test
    Partitions The number of partitions in the topic. 12
    Storage Engine The storage engine of the topic.

    Message Queue for Apache Kafka supports the following storage engines:

    • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, persistence, and high durability. If the disk type of your instance is Standard (High Write), you can select only Cloud Storage.
    • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    Cloud Storage
    Message Type The message type of the topic.
    • Normal Message: By default, messages of the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages may be out of order. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages of the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages are still stored in the same partition in the order in which they are sent. However, specific messages in the partition cannot be sent until the partition is restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Normal Message
    Log Cleanup Policy The log cleanup policy for the topic.

    If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

    Message Queue for Apache Kafka supports the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If the remaining disk space is sufficient, messages are retained for the maximum retention period. If the disk usage exceeds 85%, the disk space is considered insufficient, and the earliest messages are deleted to ensure service availability.
    • Compact: The Apache Kafka log compaction policy is used. If multiple messages have the same key, only the message with the latest value for that key is retained. This policy applies to scenarios in which the system is recovered from a failure, or the cache is reloaded after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the system status information or configuration information in a log-compacted topic.
      Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    Compact
    Tag The tags to be attached to the topic. demo
    After the topic is created, it is displayed on the Topics page.
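The difference between the Delete and Compact policies can be illustrated with a short sketch. This is a simplified model of log compaction, not the broker's actual implementation: for each key, only the latest record survives.

```python
def compact(log):
    """Simplified model of Kafka log compaction:
    keep only the latest (key, value) record for each key,
    in the order in which the surviving records were appended."""
    latest = {}
    for key, value in log:
        latest.pop(key, None)   # discard the earlier record for this key
        latest[key] = value     # re-insert so the key moves to the tail
    return list(latest.items())

log = [("user1", "v1"), ("user2", "v1"), ("user1", "v2"), ("user1", "v3")]
print(compact(log))  # [('user2', 'v1'), ('user1', 'v3')]
```

This is why compacted topics suit state and configuration storage: a consumer that reads the topic from the beginning always ends up with the latest value per key.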

Step 3: Send messages

Perform the following operations to send messages to the topic that you created:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, find the topic based on which you want to send and consume messages, and choose More > Send Message in the Actions column.
  6. In the Start to Send and Consume Message panel, set the parameters or use the method as prompted to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
        • If you do not want to send the test message to a specific partition, click No.
      4. Use Message Queue for Apache Kafka SDKs or run Docker commands to consume the test message as prompted.
    • Set the Method of Sending parameter to Docker and run a Docker container.
      1. Run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
      2. Run the Docker commands provided in the How do I consume a message after the message is sent? section to consume the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send and consume messages.
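As noted in Step 2, messages that share a key are routed to the same partition. The routing can be sketched as a hash of the key modulo the partition count. Kafka's default partitioner uses a murmur2 hash; CRC-32 stands in here only to keep the sketch dependency-free:

```python
import zlib

def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Pick a partition from the message key.
    Kafka's default partitioner uses murmur2; crc32 is used here
    purely for illustration, so the exact partition numbers differ."""
    return zlib.crc32(key) % num_partitions

# Messages with the same key always map to the same partition,
# which is what preserves per-key ordering.
assert partition_for_key(b"demo", 12) == partition_for_key(b"demo", 12)
```

This also explains why sending to a specified partition is optional: when a key is set, the partition is already determined by the key.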

Step 4: Create a group

Perform the following operations to create a group for Filebeat:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, it is displayed on the Groups page.
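Within a group such as the one created above, the topic's partitions are divided among the group's consumers, so each message is processed by only one member. A simplified round-robin assignment sketch (Kafka itself offers range, round-robin, and sticky assignors; this is only a model):

```python
def assign_partitions(partitions, consumers):
    """Distribute partitions across the consumers of one group,
    round-robin style (a simplified model of a group assignor)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 12 partitions shared by 3 consumers in the same group:
print(assign_partitions(list(range(12)), ["c0", "c1", "c2"]))
# {'c0': [0, 3, 6, 9], 'c1': [1, 4, 7, 10], 'c2': [2, 5, 8, 11]}
```

Running more Filebeat instances with the same group_id than there are partitions leaves the extra instances idle, since a partition is consumed by at most one member of a group.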

Step 5: Use Filebeat to consume messages

Start Filebeat on the server where Filebeat is installed to consume messages from the created topic.

  1. Run the cd command to switch to the installation directory of Filebeat.
  2. Run the following command to download the certificate authority (CA) certificate file:
    wget https://code.aliyun.com/alikafka/aliware-kafka-demos/raw/master/kafka-filebeat-demo/vpc-ssl/ca-cert
  3. Create a configuration file named input.yml.
    1. Run the vim input.yml command to create an empty configuration file.
    2. Press the i key to go to the insert mode.
    3. Enter the following content:
      filebeat.inputs:
      - type: kafka
        hosts:
          - 121.XX.XX.XX:9093
          - 120.XX.XX.XX:9093
          - 120.XX.XX.XX:9093
        username: "alikafka_pre-cn-v641e1dt***"
        password: "aeN3WLRoMPRXmAP2jvJuGk84Kuuo***"
        topics: ["filebeat_test"]
        group_id: "filebeat_group"
        ssl.certificate_authorities: ["/root/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert"]
        ssl.verification_mode: none
      
      output.console:
        pretty: true
      Parameter Description Example
      hosts The public endpoint of your Message Queue for Apache Kafka instance. The public endpoint provided by Message Queue for Apache Kafka is the Secure Sockets Layer (SSL) endpoint.
      - 121.XX.XX.XX:9093
      - 120.XX.XX.XX:9093
      - 120.XX.XX.XX:9093
      username The username of your Message Queue for Apache Kafka instance of the Internet and VPC type. alikafka_pre-cn-v641e1d***
      password The password of your Message Queue for Apache Kafka instance of the Internet and VPC type. aeN3WLRoMPRXmAP2jvJuGk84Kuuo***
      topics The name of the topic. filebeat_test
      group_id The name of the group. filebeat_group
      ssl.certificate_authorities The path of the CA certificate file. /root/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert
      ssl.verification_mode The verification mode. none

      For more information about parameter settings, see Kafka input plugin.

    4. Press the Esc key to return to the command line mode.
    5. Press the : key to enter the bottom line mode. Type wq and press the Enter key to save the file and exit.
  4. Run the following command to consume messages:
    ./filebeat -c ./input.yml
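Before starting Filebeat, it can help to sanity-check that the Kafka input settings in input.yml are complete. A minimal sketch; the required-key list is an assumption for illustration, not a Filebeat API, and the config dict mirrors the YAML above:

```python
# Settings the kafka input above cannot work without (illustrative list).
REQUIRED_KEYS = {"hosts", "username", "password", "topics", "group_id"}

def missing_keys(kafka_input: dict) -> set:
    """Return the required Kafka-input settings absent from the config dict."""
    return REQUIRED_KEYS - kafka_input.keys()

# Mirrors the kafka input section of input.yml, with password omitted:
config = {
    "hosts": ["121.XX.XX.XX:9093"],
    "username": "alikafka_pre-cn-v641e1dt***",
    "topics": ["filebeat_test"],
    "group_id": "filebeat_group",
}
print(missing_keys(config))  # {'password'}
```

If Filebeat starts but consumes nothing, recheck these values first, in particular that group_id matches the group created in Step 4 and topics matches the topic created in Step 2.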