This topic describes how to use Logstash to write Apache Kafka data to AnalyticDB for MySQL.

Procedure

  1. Run the following commands in the installation directory of Logstash to install and update plug-ins. Specify the name of the plug-in that you want to install or update. For example, to install and update the Kafka input plug-in and the JDBC output plug-in that are used in this topic:
    $ bin/plugin install logstash-input-kafka
    $ bin/plugin install logstash-output-jdbc
    $ bin/plugin update logstash-input-kafka

    Logstash 1.5 and later are integrated with Apache Kafka, and the directories and names of all plug-ins have changed in these versions. Plug-ins are published at logstash-plugins.
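
    To verify the installation before you continue, you can list the installed plug-ins. This is a minimal check that assumes the bin/plugin CLI of Logstash 1.5:

    $ bin/plugin list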

  2. Configure plug-ins.
    • Input configuration examples

      The following example shows how to use a Kafka consumer.

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "logstash"
              topic_id => "test"
              codec => plain
              reset_beginning => false # boolean (optional), default: false
              consumer_threads => 5  # number (optional), default: 1
              decorate_events => true # boolean (optional), default: false
          }
      }          

      Parameters:

      • group_id: specifies the unique ID of the consumer group. Consumption is isolated between different consumer groups.
      • topic_id: specifies the topic. A consumer first subscribes to a topic and then consumes data in the topic.
      • reset_beginning: specifies the position from which Logstash starts to consume data. By default, Logstash resumes from the previously committed offset. If no offset has been committed, Logstash starts to consume data from the start offset.

        To import all of the original data, set reset_beginning to true. Logstash then works in the same way as the cat command and consumes data from the start offset. After Logstash consumes the last row, it behaves like the tail -F command and continues to monitor the topic for new data instead of terminating. For an example, see the sketch after this parameter list.

      • decorate_events: specifies whether to add message metadata to each event when the message is consumed. This metadata includes the size of the consumed message, the source topic, and consumer group information.
      • rebalance_max_retries: specifies the maximum number of attempts to register the owner registry node of a partition. When a new consumer joins a consumer group, a rebalance is performed and some partitions are moved to the new consumer. If the new consumer obtains consumption permissions on a partition, it registers the partition owner registry node in ZooKeeper and retries until the original consumer releases the node.
      • consumer_timeout_ms: specifies the timeout period. An exception is thrown if no message arrives within this period. In most cases, you do not need to modify this parameter.

      For more information about Input parameters, visit logstash-kafka.
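
      For example, the following sketch shows a one-time full import that starts from the start offset. The group ID logstash_full_import is a hypothetical fresh consumer group that has no committed offset:

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "logstash_full_import" # hypothetical group without a committed offset
              topic_id => "test"
              codec => plain
              reset_beginning => true # consume from the start offset, like the cat command
              # After the last row is consumed, Logstash continues to monitor
              # the topic for new data, like the tail -F command.
          }
      }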

      Note If multiple consumers need to consume messages of the same topic in parallel, divide the topic into multiple partitions and set the same group_id and topic_id values for two or more consumers. Messages within each partition are still consumed in sequence. For an example, see the sketch below.
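
      For example, assume that the test topic is divided into four partitions. The following sketch, with illustrative partition and thread counts, can be run by two Logstash instances at the same time so that each instance consumes two partitions:

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "logstash" # the same group ID on both instances
              topic_id => "test"     # the same topic on both instances
              codec => plain
              consumer_threads => 2  # 2 threads x 2 instances = 4 partitions
          }
      }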
    • Output configuration examples
      output {
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
          }
      }         

      Parameters:

      • connection_string: specifies the JDBC connection string of AnalyticDB for MySQL. Replace HOSTNAME, DATABASE, USER, and PASSWORD with the actual values.
      • statement: specifies an array that consists of the INSERT statement and the event fields that are bound to its placeholders in order.

      For more information about Output parameters, see the documentation of the logstash-output-jdbc plug-in.
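
      The destination table must exist in AnalyticDB for MySQL, and its columns must match the fields in the statement array. The following DDL is a minimal sketch; the column types are assumptions that you must adapt to your data:

      CREATE TABLE log (
          host varchar(255),
          timestamp datetime,
          message varchar(8192)
      );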

  3. Run the bin/logstash -f config/xxxx.conf command in the installation directory of Logstash to start the task and write Kafka data to AnalyticDB for MySQL.
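
    For example, assume that the configuration is saved as config/kafka2adb.conf (a hypothetical file name). You can first validate the file with the --configtest flag that is available in Logstash 1.x and then start the task:

    $ bin/logstash -f config/kafka2adb.conf --configtest
    $ bin/logstash -f config/kafka2adb.conf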