The Application Real-Time Monitoring Service (ARMS) agent V2.7.1.2 and later provides the Kafka plug-in. This allows you to implement tracing in Kafka asynchronous messaging. This topic describes the Kafka versions that support tracing, and how to configure the Kafka plug-in.
Supported ARMS agent versions
Only the ARMS agent V2.7.1.2 and later support the Kafka plug-in. For information about how to upgrade the ARMS agent, see Upgrade ARMS agent.
Supported Kafka versions
Whether the Kafka plug-in supports tracing depends on whether the Kafka protocol supports headers. Only Kafka versions later than 0.11.0 support headers. For more information, see Kafka official documentation.
If you use a client encapsulated based on a native Kafka client, such as Spring Kafka, make sure that the underlying Kafka client version is 0.11.0 or later.
Use the Kafka plug-in
Producer
After the producer is connected to Application Monitoring, you must enable trace information pass-through for Kafka messages in the ARMS console. Log on to the ARMS console. In the left-side navigation pane of the application details page, click Application Settings. On the Custom Configuration tab of the page that appears, turn on kafka sends messages automatically through context. For more information, see Customize application settings.
If the version of the Kafka server is earlier than 0.11.0, do not enable this option. Otherwise, messages may fail to be sent.
Consumer
Use Spring Kafka
If the consumer uses Spring Kafka, and the versions of the ARMS agent and the Kafka client meet the requirements, tracing is supported without additional operations.
Use a native Kafka client
The Apache Kafka Consumer API allows the consumer to send continuous poll requests to pull messages. As a result, the Kafka plug-in of the ARMS agent cannot directly obtain trace data from headers and process it. A custom consumption method must be specified in the ARMS console. The consumption method must contain a parameter of the org.apache.kafka.clients.consumer.ConsumerRecord or org.apache.kafka.clients.consumer.ConsumerRecords type.
package arms.test.kafka;
public class KafkaConsumeTest {
public void testConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "PLAINTEXT://XXXX");
props.put("group.id", UUID.randomUUID().toString());
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
ListAdapter consumer;
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
handler(record);
}
}
public void handler(ConsumerRecord<String, String> record){
LOGGER.info( Utils.string(record));
}
}
Log on to the ARMS console. In the left-side navigation pane of the application details page, click Application Settings. On the Custom Configuration tab of the page that appears, set the Kafka custom consumption method parameter to the full name of the consumption method. Example: arms.test.kafka.KafkaConsumeTest.handler. For more information, see Customize application settings. After you specify a Kafka consumption method, you must restart the application of the consumer to make the method take effect.
References
After tracing is completed, you can configure filter conditions and aggregation dimensions to analyze the trace data in real time. For more information, see Trace Explorer.