Message Queue for Apache Kafka can be connected to Logstash as an input. This topic describes how to use Logstash
to consume messages in Message Queue for Apache Kafka in a virtual private cloud (VPC) environment.
Prerequisites
Ensure that the following actions are completed:
- A Message Queue for Apache Kafka instance is purchased and deployed. For more information, see Access from a VPC.
- Logstash is downloaded and installed. For more information, see Installing Logstash.
- Java Development Kit (JDK) 8 is downloaded and installed. For more information, see Download JDK 8.
Step 1: Obtain an endpoint
Logstash establishes a connection to Message Queue for Apache Kafka by using a Message Queue for Apache Kafka endpoint.
- Log on to the Message Queue for Apache Kafka console.
- In the left-side navigation pane, click Instances.
- On the Instance Details page, select the instance that is to be connected to Logstash as an input.
- In the Basic Information section, obtain the endpoint of the instance.
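Before configuring Logstash, it can help to confirm that the server can reach every broker listed in the endpoint. The following is a minimal sketch, not part of the product tooling: split_endpoints and check_endpoints are hypothetical helper names, and the check assumes bash (for the /dev/tcp pseudo-device) and the coreutils timeout command are available on the server.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print each "host:port" entry of a comma-separated
# endpoint string on its own line.
split_endpoints() {
  local endpoints="$1"
  local entry
  local IFS=','
  for entry in $endpoints; do
    printf '%s\n' "$entry"
  done
}

# Hypothetical helper: attempt a TCP connection to every broker in the endpoint.
# Assumes bash's /dev/tcp pseudo-device and the coreutils `timeout` command.
check_endpoints() {
  local entry host port
  while IFS= read -r entry; do
    host="${entry%:*}"    # text before the last colon
    port="${entry##*:}"   # text after the last colon
    if timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" </dev/null 2>/dev/null; then
      echo "reachable: ${entry}"
    else
      echo "unreachable: ${entry}"
    fi
  done < <(split_endpoints "$1")
}
```

To use it, pass the endpoint string copied from the Basic Information section, for example check_endpoints "192.168.XXX.XXX:9092,192.168.XXX.XXX:9092,192.168.XXX.XXX:9092" with the placeholders replaced by the real addresses.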
Step 2: Create a topic
Perform the following operations to create a topic for storing messages.
- In the left-side navigation pane of the Message Queue for Apache Kafka console, click Topics.
- On the Topics page, click Create Topic.
- In the Create Topic dialog box, enter the topic information and click Create.
Step 3: Send messages
Perform the following operations to send messages to the topic you created.
- On the Topics page of the Message Queue for Apache Kafka console, find the topic you created and click Send Message in the Actions column.
- In the Send Message dialog box, enter the message information and click Send.
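As an alternative to the console, test messages can also be sent from a server in the same VPC. The sketch below assumes that the command-line tools from an Apache Kafka distribution are installed and on PATH, and that the default (non-SASL) endpoint is used. The function name send_test_messages is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: send each line read from stdin as one message.
# Assumes kafka-console-producer.sh (Apache Kafka distribution) is on PATH
# and that the default, non-SASL endpoint is passed as the first argument.
send_test_messages() {
  local bootstrap_servers="$1"
  local topic="$2"
  # Kafka releases earlier than 2.5 expect --broker-list
  # instead of --bootstrap-server.
  kafka-console-producer.sh \
    --bootstrap-server "$bootstrap_servers" \
    --topic "$topic"
}
```

For example, printf 'hello\nworld\n' | send_test_messages "192.168.XXX.XXX:9092" "logstash_test" would send two messages, with the placeholder replaced by the endpoint obtained in Step 1 and the topic name replaced by the topic created in Step 2.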
Step 4: Create a consumer group
Perform the following operations to create a consumer group for Logstash.
- In the left-side navigation pane of the Message Queue for Apache Kafka console, click Consumer Groups.
- On the Consumer Groups page, click Create Consumer Group.
- In the Create Consumer Group dialog box, enter the consumer group information and click Create.
Step 5: Use Logstash to consume messages
Start Logstash on the server where it is installed and consume messages from the topic you created.
- Run the cd command to switch to the bin directory of Logstash.
- Create the input.conf configuration file.
- Run the vim input.conf command to create an empty configuration file.
- Press the i key to go to the insert mode.
- Enter the following content:
input {
  kafka {
    bootstrap_servers => "192.168.XXX.XXX:9092,192.168.XXX.XXX:9092,192.168.XXX.XXX:9092"
    group_id => "logstash_group"
    topics => ["logstash_test"]
    consumer_threads => 12
    auto_offset_reset => "earliest"
  }
}
output {
  stdout { codec => rubydebug }
}
| Parameter | Description | Example |
| --- | --- | --- |
| bootstrap_servers | Message Queue for Apache Kafka supports the following VPC endpoints: the default endpoint and the Simple Authentication and Security Layer (SASL) endpoint. | 192.168.XXX.XXX:9092,192.168.XXX.XXX:9092,192.168.XXX.XXX:9092 |
| group_id | The name of the consumer group. | logstash_group |
| topics | The name of the topic. | logstash_test |
| consumer_threads | The number of consumer threads. We recommend that you set it to the number of partitions of the topic. | 12 |
| auto_offset_reset | The reset offset. Valid values: earliest (the earliest message is read) and latest (the latest message is read). | earliest |
- Press the Esc key to return to the command line mode.
- Press the : key to enter the bottom line mode. Type wq, and then press Enter to save the file and exit.
- Run the following command to consume messages:
The following output is obtained.
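The command and its output for this last step do not appear on this page. As a hedged sketch, the configuration can be written without an editor and then passed to Logstash with the -f option. The helper names write_input_conf and run_logstash are hypothetical, and run_logstash assumes it is called from the Logstash bin directory. The --config.test_and_exit flag only validates the configuration syntax before the real run.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write the configuration from this step to a file
# without opening an editor. Argument: target path (defaults to input.conf).
write_input_conf() {
  local target="${1:-input.conf}"
  cat > "$target" <<'EOF'
input {
  kafka {
    bootstrap_servers => "192.168.XXX.XXX:9092,192.168.XXX.XXX:9092,192.168.XXX.XXX:9092"
    group_id => "logstash_group"
    topics => ["logstash_test"]
    consumer_threads => 12
    auto_offset_reset => "earliest"
  }
}
output {
  stdout { codec => rubydebug }
}
EOF
}

# Hypothetical helper: validate the configuration, then start consuming.
# Assumes the current directory is the Logstash bin directory.
run_logstash() {
  local conf="${1:-input.conf}"
  ./logstash -f "$conf" --config.test_and_exit && ./logstash -f "$conf"
}
```

Replace the placeholder addresses in the written file with the endpoint obtained in Step 1 before calling run_logstash. With the stdout output and the rubydebug codec configured above, each consumed message is printed to the terminal as a structured event.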
