Logstash is an open source, server-side data processing pipeline that can collect data from multiple sources at the same time, transform the data, and then store the data in a specified location. AnalyticDB for MySQL is fully compatible with MySQL. You can use a Logstash input plug-in to collect data from a data source and a JDBC output plug-in to import the data into AnalyticDB for MySQL. This topic describes how to use Logstash to import data from Apache Kafka to AnalyticDB for MySQL Data Warehouse Edition (V3.0).

Logstash plug-ins

  • Input plug-ins that are used to collect data of various types and in different sizes from disparate sources

    Typically, data is stored across multiple systems in different formats. Logstash supports a wide range of inputs and can collect data from multiple data sources at the same time, including logs, metrics, web applications, data stores, and AWS services, in a continuous streaming manner.

  • Filter plug-ins that are used to parse and convert data in real time

    Logstash uses filters to parse all types of events, identify defined fields to construct schemas, and convert the schemas into common data types before data is transmitted to destination repositories. This helps you analyze the data and make the most of it in an easy and efficient manner. Filter plug-ins provide the following features, as illustrated by the example after this list:

    • Use Grok to parse unstructured data into structured data.
    • Parse geographic information from IP addresses.
    • Anonymize personally identifiable information (PII) and completely exclude sensitive fields.
    • Simplify overall processing, regardless of the data source, format, or schema.
  • Output plug-ins that are used to export data

    Logstash provides multiple outputs to flexibly adapt to various downstream use cases. In this topic, the JDBC output is used to export data to AnalyticDB for MySQL.
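
To illustrate how the three plug-in types fit together, the following minimal pipeline sketch reads a log file, parses it with Grok, enriches it with geographic information, and prints the structured events. The log file path and the Grok pattern are illustrative assumptions and are not part of the procedure in this topic.

    input {
        file {
            path => "/var/log/nginx/access.log"        # hypothetical log file path
            start_position => "beginning"
        }
    }
    filter {
        grok {
            match => { "message" => "%{COMBINEDAPACHELOG}" }   # parse unstructured log lines into named fields
        }
        geoip {
            source => "clientip"                       # derive geographic information from the client IP address
        }
    }
    output {
        stdout { codec => rubydebug }                  # print structured events for inspection
    }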

Procedure

Apache Kafka is a distributed publish-subscribe messaging service that processes logs with high throughput. It provides high availability, high performance, a distributed architecture, high scalability, and high durability, and it is widely used by major companies. Because Apache Kafka can be integrated with Logstash, you do not need to build a custom integration.

  1. In the Logstash installation directory, run the following commands to install a plug-in and to update installed plug-ins. Specify the name of the plug-in that you want to install, as shown in the example after this step:
    $ bin/plugin install <plugin_name>
    $ bin/plugin update

    Logstash 1.5 and later are integrated with Apache Kafka and use new directories and names for all plug-ins. For more information about Logstash plug-ins, visit GitHub.
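
    For example, the Kafka input plug-in used in this topic can be installed and verified as follows. The JDBC output plug-in used in the next step is a community plug-in, and the plug-in name logstash-output-jdbc is an assumption based on its public repository.
    $ bin/plugin install logstash-input-kafka   # Kafka input plug-in
    $ bin/plugin install logstash-output-jdbc   # community JDBC output plug-in (assumed name)
    $ bin/plugin list                           # verify that the plug-ins are installed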

  2. Configure plug-ins.
    • Input configuration examples

      The following example shows how to use a Kafka consumer.

      input {
          kafka {
              zk_connect => "localhost:2181" # ZooKeeper connection string of the Kafka cluster
              group_id => "Logstash"
              topic_id => "test"
              codec => plain
              reset_beginning => false # boolean (optional), default: false
              consumer_threads => 5  # number (optional), default: 1
              decorate_events => true # boolean (optional), default: false
          }
      }
      Parameters:
      • group_id: the ID of the consumer group. Each consumer group has a unique ID, and consumption is isolated between different consumer groups.
      • topic_id: the topic to consume. A consumer first subscribes to a topic and then consumes the data in the topic.
      • reset_beginning: the position from which Logstash starts to consume data. By default, Logstash resumes consumption from the previously recorded offset. If no data has been consumed before, Logstash starts to consume data from the earliest offset.

        To import the existing data in the topic, you must set reset_beginning to true. This way, Logstash consumes data from the earliest offset, in the same way as the cat command reads a file from the beginning. After Logstash consumes the last row, it continues to monitor the topic for new data in the same way as the tail -F command, instead of terminating.

      • decorate_events: specifies whether to add message metadata to each event when a message is consumed. The metadata includes the size of the consumed message, the source topic, and the consumer group.
      • rebalance_max_retries: the maximum number of retries for registering the owner registry node of a partition. When a new consumer joins a consumer group, a rebalance is performed, after which specific partitions are moved to the new consumer. If the new consumer obtains consumption permissions on a partition, it registers the partition owner registry node in ZooKeeper. The registration is retried until the original consumer releases the node.
      • consumer_timeout_ms: the timeout period for receiving messages. Typically, you do not need to modify this parameter.

      For more information about input parameters, visit GitHub.

      Note If you want multiple consumers to consume messages of the same topic in parallel, you must divide the topic into multiple partitions and set the same group_id and topic_id values for two or more consumers. Messages are consumed in sequence only within a single partition, as illustrated by the sketch that follows this note.
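
      For example, if the topic test is divided into three partitions (an assumed partition count for illustration), the following input sketch allows a single Logstash instance to consume the partitions in parallel within the same consumer group:

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"   # same consumer group as other consumers of the topic
              topic_id => "test"
              consumer_threads => 3    # one consumer thread per partition (assumed partition count)
          }
      }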
    • Output configuration examples
      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"  # JDBC driver class for MySQL-compatible databases
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }
      Parameters:
      • connection_string: the endpoint that is used to connect to the AnalyticDB for MySQL cluster.
      • statement: an array that consists of the INSERT statement followed by the event fields that are bound to the placeholders in the statement.

      For more information about output parameters, visit GitHub.
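
    • Complete configuration example

      The two preceding examples can be combined into a single configuration file, which might look like the following sketch. Replace HOSTNAME, DATABASE, USER, and PASSWORD, as well as the log table and its columns, with your own values.

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"
              topic_id => "test"
              codec => plain
          }
      }
      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }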

  3. Run the bin/logstash -f config/xxxx.conf command in the installation directory of Logstash to start the task and write the Kafka data to AnalyticDB for MySQL.
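
    For example, if the configuration file is saved as config/kafka-adb.conf (a hypothetical file name), you can first validate the configuration and then start the task. The --configtest flag applies to Logstash 1.x and 2.x.
    $ bin/logstash -f config/kafka-adb.conf --configtest   # validate the configuration file only
    $ bin/logstash -f config/kafka-adb.conf                # start the import task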