Application Real-Time Monitoring Service: Implement tracing in Kafka asynchronous messaging

Last Updated: Feb 22, 2024

The Application Real-Time Monitoring Service (ARMS) agent V2.7.1.2 and later provides the Kafka plug-in. This allows you to implement tracing in Kafka asynchronous messaging. This topic describes the Kafka versions that support tracing, and how to configure the Kafka plug-in.

Supported ARMS agent versions

Only the ARMS agent V2.7.1.2 and later support the Kafka plug-in. For information about how to upgrade the ARMS agent, see Upgrade ARMS agent.

Supported Kafka versions

The Kafka plug-in reads trace data from message headers, so tracing is supported only if the Kafka version supports headers. Headers are supported in Kafka 0.11.0 and later. For more information, see the official Kafka documentation.

If you use a client that wraps the native Kafka client, such as Spring Kafka, make sure that the underlying Kafka client version is 0.11.0 or later.
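The version requirement above can be expressed as a small check. The following is a minimal sketch, not part of ARMS or Kafka: the class name KafkaHeaderSupport and the method supportsHeaders are hypothetical, and the sketch assumes that the version is a dotted string such as 0.10.2.1 or 2.8.1.

```java
/** Hypothetical helper for illustration; not part of ARMS or Kafka. */
final class KafkaHeaderSupport {
    private KafkaHeaderSupport() {}

    /** Returns true if the dotted version string is 0.11.0 or later. */
    static boolean supportsHeaders(String version) {
        int[] minimum = {0, 11, 0};
        String[] parts = version.trim().split("\\.");
        for (int i = 0; i < minimum.length; i++) {
            // Missing components are treated as 0, so "0.11" equals "0.11.0".
            int part = i < parts.length ? leadingNumber(parts[i]) : 0;
            if (part != minimum[i]) {
                return part > minimum[i];
            }
        }
        return true; // The first three components are exactly 0.11.0.
    }

    /** Parses the leading digits of a component and ignores any suffix. */
    private static int leadingNumber(String component) {
        int end = 0;
        while (end < component.length() && Character.isDigit(component.charAt(end))) {
            end++;
        }
        return end == 0 ? 0 : Integer.parseInt(component.substring(0, end));
    }
}
```

In an application that has the Kafka client on its classpath, the version string could be taken from org.apache.kafka.common.utils.AppInfoParser.getVersion() and passed to supportsHeaders.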

Use the Kafka plug-in

Producer

After the producer is connected to Application Monitoring, enable trace information pass-through for Kafka messages in the ARMS console. Log on to the ARMS console. In the left-side navigation pane of the application details page, click Application Settings. On the Custom Configuration tab, turn on kafka sends messages automatically through context. For more information, see Customize application settings.

Important

If the version of the Kafka server is earlier than 0.11.0, do not enable this option. Otherwise, messages may fail to be sent.
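To illustrate what pass-through means, the following toy model treats message headers as a plain map: the producer side writes a trace ID into the headers, and the consumer side reads it back. This is only a sketch of the idea, not the ARMS agent's implementation. The class name TraceHeaderSketch and the header key x-example-trace-id are hypothetical, and the real agent performs this step automatically.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of header-based trace pass-through; not the ARMS implementation. */
final class TraceHeaderSketch {
    /** Hypothetical header key; the key that the ARMS agent uses is not shown here. */
    static final String TRACE_HEADER = "x-example-trace-id";

    private TraceHeaderSketch() {}

    /** Producer side: returns a copy of the headers with the trace ID added. */
    static Map<String, byte[]> inject(Map<String, byte[]> headers, String traceId) {
        Map<String, byte[]> copy = new LinkedHashMap<>(headers);
        copy.put(TRACE_HEADER, traceId.getBytes(StandardCharsets.UTF_8));
        return copy;
    }

    /** Consumer side: returns the trace ID, or null if the header is absent. */
    static String extract(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TRACE_HEADER);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```

A message format without headers has nowhere to carry the trace ID, which is why the option depends on Kafka 0.11.0 or later.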

Consumer

Use Spring Kafka

If the consumer uses Spring Kafka, and the versions of the ARMS agent and the Kafka client meet the requirements, tracing is supported without additional operations.

Use a native Kafka client

With the native Apache Kafka Consumer API, the consumer pulls messages by sending continuous poll requests. As a result, the Kafka plug-in of the ARMS agent cannot directly obtain trace data from headers and process it. You must specify a custom consumption method in the ARMS console. The consumption method must have a parameter of the org.apache.kafka.clients.consumer.ConsumerRecord or org.apache.kafka.clients.consumer.ConsumerRecords type. Example:

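package arms.test.kafka;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumeTest {
    private static final Logger LOGGER = Logger.getLogger(KafkaConsumeTest.class.getName());

    public void testConsumer() {
        Properties props = new Properties();
        // Replace XXXX with the endpoint of your Kafka broker.
        props.put("bootstrap.servers", "PLAINTEXT://XXXX");
        props.put("group.id", UUID.randomUUID().toString());
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll(long) matches 0.11.x clients. Kafka clients 2.0 and later deprecate it in favor of poll(Duration).
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Pass each record to the custom consumption method.
                handler(record);
            }
        }
    }

    // Custom consumption method. It takes a ConsumerRecord parameter so that it can be specified in the ARMS console.
    public void handler(ConsumerRecord<String, String> record) {
        LOGGER.info(record.toString());
    }
}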

Log on to the ARMS console. In the left-side navigation pane of the application details page, click Application Settings. On the Custom Configuration tab, set the Kafka custom consumption method parameter to the full name of the consumption method, which consists of the fully qualified class name followed by the method name. Example: arms.test.kafka.KafkaConsumeTest.handler. For more information, see Customize application settings. After you specify a Kafka consumption method, restart the consumer application for the setting to take effect.
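The structure of the example value can be shown with a short sketch that splits a full method name at its last period. The class ConsumptionMethodName is hypothetical and only illustrates how the value is composed. How the ARMS agent parses the setting is not described in this topic.

```java
/** Hypothetical value holder for illustration; not part of ARMS. */
final class ConsumptionMethodName {
    final String className;
    final String methodName;

    private ConsumptionMethodName(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    /** Splits "<class name>.<method name>" at the last period. */
    static ConsumptionMethodName parse(String fullName) {
        int lastDot = fullName.lastIndexOf('.');
        if (lastDot <= 0 || lastDot == fullName.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected <class name>.<method name>: " + fullName);
        }
        return new ConsumptionMethodName(
                fullName.substring(0, lastDot), fullName.substring(lastDot + 1));
    }
}
```

For arms.test.kafka.KafkaConsumeTest.handler, the class name is arms.test.kafka.KafkaConsumeTest and the method name is handler.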

References

After tracing is set up, you can configure filter conditions and aggregation dimensions to analyze the trace data in real time. For more information, see Trace Explorer.