An ApsaraMQ for Kafka instance can be connected to Logstash as an output. This topic describes how to use Logstash to send messages to ApsaraMQ for Kafka over the Internet.
Prerequisites
Before you start this tutorial, make sure that the following operations are complete:
An ApsaraMQ for Kafka instance is purchased and deployed. For more information, see Purchase and deploy an Internet- and VPC-connected instance.
Logstash is downloaded and installed. For more information, see Download Logstash.
Java Development Kit (JDK) 8 is downloaded and installed. For more information, see Download JDK 8.
Step 1: Obtain an endpoint
Logstash establishes a connection to ApsaraMQ for Kafka by using an ApsaraMQ for Kafka endpoint.
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
On the Instances page, click the name of the instance that you want to connect to Logstash as an output.
On the Instance Details page, obtain an endpoint of the instance in the Endpoint Information section. In the Configuration Information section, obtain the values of the Username parameter and Password parameter.
Note: For information about the differences among endpoints, see Comparison among endpoints.
Step 2: Create a topic
Perform the following operations to create a topic for storing messages:
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
Important: You must create topics in the region where your application is deployed. When you create a topic, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if your message producers and consumers run on ECS instances that are deployed in the China (Beijing) region, create topics in the China (Beijing) region.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Topics.
On the Topics page, click Create Topic.
In the Create Topic panel, configure the parameters and click OK.
Parameter
Description
Example
Name
The topic name.
demo
Description
The topic description.
demo test
Partitions
The number of partitions in the topic.
12
Storage Engine
The type of the storage engine that is used to store messages in the topic.
Note: You can select the type of the storage engine only if you use a Professional Edition instance. If you use a Standard Edition instance, Cloud Storage is selected by default.
ApsaraMQ for Kafka supports the following types of storage engines:
Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.
Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.
Cloud Storage
Message Type
The message type of the topic. Valid values:
Normal Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster fails, the order of the messages may not be preserved in the partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster fails, the messages are still stored in the partitions in the order in which the messages are sent. Messages in some partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
Normal Message
Log Cleanup Policy
The log cleanup policy that is used by the topic.
If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.
ApsaraMQ for Kafka provides the following log cleanup policies:
Delete: The default log cleanup policy. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes messages from the earliest stored message to ensure service availability.
Compact: The Apache Kafka log compaction policy is used. For more information, see Kafka 3.4 Documentation. Log compaction ensures that the latest values are retained for messages that have the same key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restarts. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.
Important: You can use log-compacted topics only in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.
Compact
Tag
The tags that you want to attach to the topic.
demo
After the topic is created, it is displayed on the Topics page.
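The Compact policy described above retains only the latest value for each message key. The following Python sketch illustrates that semantics with toy data; it is an illustration of the compaction guarantee, not the actual Kafka implementation.

```python
# Toy illustration of Kafka log compaction semantics: for each key,
# only the most recently written value survives compaction.
log = [
    ("user-1", "created"),
    ("user-2", "created"),
    ("user-1", "updated"),   # supersedes the earlier "created" record
]

compacted = {}
for key, value in log:      # replay the log in order
    compacted[key] = value  # later writes for a key overwrite earlier ones

print(compacted)  # {'user-1': 'updated', 'user-2': 'created'}
```

This is why log-compacted topics suit state restoration: replaying the compacted log yields the latest known value per key.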
Step 3: Use Logstash to send a message
Start Logstash on the server where Logstash is installed, and send a message to the topic that you created.
Run the cd command to switch to the bin directory of Logstash.
Run the following command to download the kafka.client.truststore.jks certificate file:
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
Create a configuration file named jaas.conf.
Run the vim jaas.conf command to create an empty configuration file. Press the I key to enter the insert mode.
Enter the following content:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};
Parameter
Description
Example
username
The username of your ApsaraMQ for Kafka Internet- and VPC-connected instance.
alikafka_pre-cn-v0h1***
password
The password of your ApsaraMQ for Kafka Internet- and VPC-connected instance.
GQiSmqbQVe3b9hdKLDcIlkrBK6***
Press the Esc key to return to the command line mode.
Press the : key to go to the bottom line. Enter wq, press the Enter key to save the file, and then exit.
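If you prefer to generate jaas.conf instead of editing it by hand, the file content can be rendered from the username and password obtained in Step 1, which avoids quoting mistakes. The following Python sketch is a convenience helper and assumption of this topic, not part of any SDK; replace the XXX placeholders with your instance credentials.

```python
def render_jaas(username: str, password: str) -> str:
    """Return jaas.conf content for SASL/PLAIN authentication."""
    return (
        "KafkaClient {\n"
        "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        f'  username="{username}"\n'
        f'  password="{password}";\n'
        "};\n"
    )

# Replace the placeholder values with your instance credentials.
with open("jaas.conf", "w") as f:
    f.write(render_jaas("XXX", "XXX"))
```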
Create a configuration file named output.conf.
Run the vim output.conf command to create an empty configuration file. Press the I key to enter the insert mode.
Enter the following content:
input {
    stdin {}
}
output {
    kafka {
        bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
        topic_id => "logstash_test"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
        ssl_endpoint_identification_algorithm => ""
    }
}
Parameter
Description
Example
bootstrap_servers
The public endpoint of your ApsaraMQ for Kafka instance. The public endpoint provided by ApsaraMQ for Kafka is the Secure Sockets Layer (SSL) endpoint.
alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
topic_id
The name of the topic.
logstash_test
security_protocol
The security protocol. Default value: SASL_SSL. You do not need to change this value.
SASL_SSL
sasl_mechanism
The security authentication mechanism. Default value: PLAIN. You do not need to change this value.
PLAIN
jaas_path
The path of the jaas.conf configuration file.
/home/logstash-7.6.2/bin/jaas.conf
ssl_truststore_password
The password of the kafka.client.truststore.jks certificate. Default value: KafkaOnsClient. You do not need to change this value.
KafkaOnsClient
ssl_truststore_location
The path of the kafka.client.truststore.jks certificate.
/home/logstash-7.6.2/bin/kafka.client.truststore.jks
ssl_endpoint_identification_algorithm
The algorithm for identifying the SSL endpoint. This parameter is required for Logstash V6.x and later.
Null
Press the Esc key to return to the command line mode.
Press the : key to go to the bottom line. Enter wq, press the Enter key to save the file, and then exit.
Send a message to the topic that you created.
Run the ./logstash -f output.conf command. Enter test and press the Enter key.
Step 4: View the partitions of the topic
Perform the following operations to view the message that was sent to the topic:
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Topics.
On the Topics page, find the topic whose partition status you want to view, and use the options in the Actions column to view the partition status.
Table 1. Information about the status of a partition
Parameter
Description
Partition ID
The ID of the partition.
Minimum Offset
The earliest offset from which messages in the partition can be consumed.
Maximum Offset
The latest offset up to which messages in the partition can be consumed.
Last Updated At
The most recent point in time when a message is stored in the partition.
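The minimum and maximum offsets shown above can be used to estimate how many messages a partition currently retains. A rough sketch, using hypothetical values read from the partition status page:

```python
# Hypothetical values read from the partition status page.
minimum_offset = 0   # earliest retained offset in the partition
maximum_offset = 1   # offset after the most recently stored message

# Approximate number of messages retained in the partition.
message_count = maximum_offset - minimum_offset
print(message_count)  # 1
```

After the single test message from Step 3 is sent, the difference between the two offsets for the target partition should increase by one.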
Step 5: Query the message by offset
You can query the sent message based on its partition ID and offset information.
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Message Query.
On the Message Query page, select Search by offset from the Search Method drop-down list.
Select a topic name from the Topic drop-down list and a partition from the Partition drop-down list, enter an offset value in the Offset field, and then click Search.
Messages whose offset values are greater than or equal to the specified offset value are displayed. For example, if you specify 5 as the value of both the Partition parameter and the Offset parameter, the system queries messages whose offset values are greater than or equal to 5 from Partition 5.
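The query behavior described above can be pictured as a simple filter over the offsets in the selected partition. The following Python sketch uses toy data to show which messages a search by offset returns:

```python
# Toy data: offsets of messages currently stored in the selected partition.
partition_offsets = [2, 3, 4, 5, 6, 7]
query_offset = 5

# Search by offset returns messages at or after the specified offset.
matched = [o for o in partition_offsets if o >= query_offset]
print(matched)  # [5, 6, 7]
```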
Table 2. Parameters that are included in message query results
Parameter
Description
Partition
The partition from which the message is obtained.
Offset
The offset of the message.
Key
The key of the message. The key is converted to a string.
Value
The content of the message. The message content is converted to a string.
Created At
The point in time when the message was produced. The value is the timestamp that the producer recorded when the message was sent, or the value of the timestamp field that you specified for ProducerRecord.
Note:
If you specified a value for the timestamp field, the specified value is displayed.
If you did not specify a value for the timestamp field, the local system time when the message was sent is displayed.
A value in the 1970/x/x x:x:x format indicates that the timestamp field is set to 0 or an invalid value.
You cannot specify a value for the timestamp field on clients of ApsaraMQ for Kafka V0.9 and earlier.
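A Created At value in the 1970/x/x format comes directly from a zero or invalid epoch timestamp, as a quick check shows:

```python
from datetime import datetime, timezone

# A timestamp field of 0 corresponds to the Unix epoch,
# which is why such messages display a 1970 date.
ts = datetime.fromtimestamp(0, tz=timezone.utc)
print(ts.isoformat())  # 1970-01-01T00:00:00+00:00
```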
Actions
Click Download Key to download the key of the message.
Click Download Value to download the content of the message.
Important: The ApsaraMQ for Kafka console can display up to 1 KB of content for each message. If the size of a message exceeds 1 KB, the excess content of the message is omitted. If you want to view the complete message, download the message.
You can download up to 10 MB of message content. If the size of a message exceeds 10 MB, only the first 10 MB of message content can be downloaded.
References
For more information about parameter settings, see Kafka output plugin.