ApsaraMQ for Kafka: Connect an ApsaraMQ for Kafka instance to Filebeat as an input

Last Updated: Oct 13, 2023

An ApsaraMQ for Kafka instance can be connected to Filebeat as an input. This topic describes how to use Filebeat to consume messages from ApsaraMQ for Kafka over the Internet.

Background information

Before you start this tutorial, make sure that the following operations are complete:

Step 1: Obtain an endpoint

Filebeat establishes a connection to ApsaraMQ for Kafka by using an ApsaraMQ for Kafka endpoint.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to connect to Filebeat as an input.

  4. In the Endpoint Information section of the Instance Details page, view the endpoints of the instance. In the Configuration Information section, obtain the values of the Username and Password parameters.

    Note

    For information about the differences between different types of endpoints, see Comparison among endpoints.
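Before you configure Filebeat, it can help to confirm that the server on which Filebeat runs can reach the endpoint. The following is a minimal sketch, assuming Bash and the coreutils timeout command are available. The function names endpoint_host, endpoint_port, and check_endpoint are hypothetical helpers and are not part of ApsaraMQ for Kafka or Filebeat. The check only opens a TCP connection; it does not verify credentials or certificates.

```shell
#!/usr/bin/env bash
# Sketch: check that an endpoint in host:port form accepts TCP connections.

# Print the host part of "host:port".
endpoint_host() { printf '%s\n' "${1%:*}"; }

# Print the port part of "host:port".
endpoint_port() { printf '%s\n' "${1##*:}"; }

# Return 0 if a TCP connection can be opened within 5 seconds.
# Uses Bash's built-in /dev/tcp redirection, so no extra network tool is needed.
check_endpoint() {
  local host port
  host="$(endpoint_host "$1")"
  port="$(endpoint_port "$1")"
  timeout 5 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Example call (replace the argument with an endpoint copied from the console):
# check_endpoint "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093" \
#   && echo "reachable" || echo "not reachable"
```

If the check fails for every endpoint, review the network path between the server and the instance before you continue with the remaining steps.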

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

    Important

    You must create topics in the region where your application is deployed. To do this, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance that is deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, specify the properties of the topic and click OK.

    Parameter: Name
    Description: The topic name.
    Example: demo

    Parameter: Description
    Description: The topic description.
    Example: demo test

    Parameter: Partitions
    Description: The number of partitions in the topic.
    Example: 12

    Parameter: Storage Engine
    Description: The type of the storage engine that is used to store messages in the topic. ApsaraMQ for Kafka supports the following types of storage engines:

    • Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, long durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.

    • Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.

    Note

    You can specify the storage engine type only if you use a Professional Edition instance. If you use a Standard Edition instance, cloud storage is selected by default.

    Example: Cloud Storage

    Parameter: Message Type
    Description: The message type of the topic. Valid values:

    • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, the order of the messages may not be preserved in the partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.

    • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, the messages are still stored in the partitions in the order in which the messages are sent. Messages in some partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.

    Example: Normal Message

    Parameter: Log Cleanup Policy
    Description: The log cleanup policy that is used by the topic.

    If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.

    ApsaraMQ for Kafka provides the following log cleanup policies:

    • Delete: the default log cleanup policy. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes the earliest stored messages to ensure service availability.

    • Compact: the log compaction policy that is used in Apache Kafka. Log compaction ensures that the latest values are retained for messages that have the same key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restarts. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.

      Important

      You can use log-compacted topics only in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.

    Example: Compact

    Parameter: Tag
    Description: The tags that you want to attach to the topic.
    Example: demo

    After a topic is created, you can view the topic on the Topics page.

Step 3: Send messages

Perform the following operations to send messages to the topic that you created:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, find the topic that you want to manage and choose More > Send Message in the Actions column.

  6. In the Start to Send and Consume Message panel, configure the parameters to send a message for testing.

    • If you set the Method of Sending parameter to Console, perform the following steps:

      1. In the Message Key field, enter the message key. Example: demo.

      2. In the Message Content field, enter the message content. Example: {"key": "test"}.

      3. Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

        • If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

        • If you do not want to send the test message to a specific partition, click No.

      4. Use ApsaraMQ for Kafka SDKs or run the Docker commands that are displayed in the Start to Send and Consume Message panel to subscribe to the test message.

    • If you set the Method of Sending parameter to Docker, perform the following steps to run a Docker container.

      1. Run the Docker commands that are displayed in the Run the Docker container to produce a sample message section to send the test message.

      2. Run the Docker commands that are displayed in the How do I consume a message after the message is sent? section to subscribe to the test message.

    • If you set the Method of Sending parameter to SDK, select an SDK for the required programming language or framework and an access method to send and subscribe to the test message.

Step 4: Create a group

Perform the following operations to create a group for Filebeat:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Groups.

  5. On the Groups page, click Create Group.

  6. In the Create Group panel, enter a group name in the Group ID field and a group description in the Description field, attach tags to the group, and then click OK.

    After a consumer group is created, you can view the consumer group on the Groups page.

Step 5: Use Filebeat to consume messages

Start Filebeat on the server where Filebeat is installed to consume messages from the created topic.

  1. Run the cd command to switch to the installation directory of Filebeat.

  2. Run the following command to download the certificate authority (CA) certificate file:

    wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20220826/ytsw/only-4096-ca-cert

  3. Create a configuration file named input.yml.

    1. Run the vim input.yml command to create an empty configuration file.

    2. Press the i key to enter the insert mode.

    3. Enter the following content:

      filebeat.inputs:
      - type: kafka
        hosts:
          - alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093
          - alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093
          - alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
        username: "alikafka_pre-cn-v641e1dt***"
        password: "aeN3WLRoMPRXmAP2jvJuGk84Kuuo***"
        topics: ["filebeat_test"]
        group_id: "filebeat_group"
        ssl.certificate_authorities: ["/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert"]
        ssl.verification_mode: none
      
      output.console:
        pretty: true

      Parameter: hosts
      Description: The public endpoint of your ApsaraMQ for Kafka instance. The public endpoint provided by ApsaraMQ for Kafka is the Secure Sockets Layer (SSL) endpoint.
      Example:
      - alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093
      - alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093
      - alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

      Parameter: username
      Description: The username of your ApsaraMQ for Kafka instance of the Internet and VPC type.
      Example: alikafka_pre-cn-v641e1d***

      Parameter: password
      Description: The password of your ApsaraMQ for Kafka instance of the Internet and VPC type.
      Example: aeN3WLRoMPRXmAP2jvJuGk84Kuuo***

      Parameter: topics
      Description: The name of the topic from which Filebeat consumes messages.
      Example: filebeat_test

      Parameter: group_id
      Description: The name of the group that Filebeat uses to consume messages.
      Example: filebeat_group

      Parameter: ssl.certificate_authorities
      Description: The path of the CA certificate file. Make sure that the path points to the location where the downloaded certificate file is stored.
      Example: /home/admin/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert

      Parameter: ssl.verification_mode
      Description: The SSL verification mode. The value none disables verification of the server certificate.
      Example: none

      For more information about parameter settings, see Kafka input plugin.

    4. Press the Esc key to return to command mode.

    5. Press the : key to enter last line mode. Enter wq and press the Enter key to save the file and exit.

  4. Run the following command to consume messages:

    ./filebeat -c ./input.yml
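As an alternative to typing the configuration in vim, the file can be written non-interactively. The following is a minimal sketch, assuming Bash. It writes the same settings as the sample configuration in this topic, but replaces the literal credentials with ${KAFKA_USERNAME} and ${KAFKA_PASSWORD} references, which Filebeat resolves from environment variables when it loads the file. The two variable names are hypothetical, and the endpoints and certificate path are the masked example values from this topic, so replace them with your own.

```shell
#!/usr/bin/env bash
# Sketch: write input.yml without an interactive editor.
set -euo pipefail

# The quoted 'EOF' delimiter stops the shell from expanding ${...}, so the
# references are written literally and resolved later by Filebeat.
# KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical variable names.
cat > input.yml <<'EOF'
filebeat.inputs:
- type: kafka
  hosts:
    - alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093
    - alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093
    - alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
  username: "${KAFKA_USERNAME}"
  password: "${KAFKA_PASSWORD}"
  topics: ["filebeat_test"]
  group_id: "filebeat_group"
  ssl.certificate_authorities: ["/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert"]
  ssl.verification_mode: none

output.console:
  pretty: true
EOF

# Allow only the file owner to read or modify the configuration.
chmod 600 input.yml
```

Export both variables in the shell from which you start Filebeat. To check the file for configuration errors before you consume messages, you can run ./filebeat test config -c ./input.yml.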