When you use Kafka for asynchronous messaging, trace context is lost at the producer-consumer boundary by default, which breaks end-to-end request tracing. The ARMS agent injects trace context into Kafka message headers so that spans propagate from producer to consumer, letting you trace complete request flows across asynchronous Kafka pipelines. The Kafka plug-in requires ARMS agent V2.7.1.2 or later and Kafka 0.11.0 or later.
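The header-based propagation described above can be pictured with a small, dependency-free sketch. It models record headers as a map of string keys to byte arrays, which is the shape Kafka headers have. The class name and the header key `x-example-trace-id` are hypothetical; the key the ARMS agent actually writes is not stated in this topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch only: record headers are modeled as a plain map.
class TraceHeaderSketch {
    // Hypothetical header key, chosen for illustration.
    static final String TRACE_HEADER = "x-example-trace-id";

    // Producer side: copy the current trace ID into the outgoing record's headers.
    static void inject(Map<String, byte[]> headers, String traceId) {
        headers.put(TRACE_HEADER, traceId.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: read the trace ID back, or return null if the record carries none.
    static String extract(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TRACE_HEADER);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        inject(headers, "trace-123");
        // The consumer recovers the same ID that the producer attached.
        System.out.println(extract(headers));
    }
}
```

In practice the agent performs both steps for you; the sketch only shows why the message format must support headers for the trace to survive the hop.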
Configure the Kafka plug-in
Configure the producer
After connecting the producer to Application Monitoring, enable trace context propagation in the ARMS console:
Log on to the ARMS console.
In the left-side navigation pane of the application details page, click Application Settings.
On the Custom Configuration tab, turn on kafka sends messages automatically through context.
For more information, see Custom configuration.
Do not enable this option if your Kafka server version is earlier than 0.11.0. Enabling it on an older server version may cause messages to fail to be sent.
Configure the consumer
Choose the configuration path based on your consumer client.
Spring Kafka
If the consumer uses Spring Kafka and both the ARMS agent and the Kafka client meet the version requirements, tracing works automatically — no additional configuration is needed.
Native Kafka client
The Apache Kafka Consumer API uses a continuous poll loop to pull messages. Because of this polling model, the ARMS agent Kafka plug-in cannot extract trace data from headers automatically. Register a custom consumption method in the ARMS console instead.
The consumption method must accept a parameter of type org.apache.kafka.clients.consumer.ConsumerRecord or org.apache.kafka.clients.consumer.ConsumerRecords. In the following example, handler is the consumption method:
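package arms.test.kafka;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumeTest {
    // java.util.logging is used here only to keep the example free of extra dependencies.
    private static final Logger LOGGER = Logger.getLogger(KafkaConsumeTest.class.getName());

    public void testConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "PLAINTEXT://XXXX");
        props.put("group.id", UUID.randomUUID().toString());
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll(long) is deprecated since client 2.0; newer clients should use poll(Duration.ofMillis(100)).
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Pass each record to the method that is registered in the ARMS console.
                handler(record);
            }
        }
    }

    // Custom consumption method: it takes a ConsumerRecord parameter, as required.
    public void handler(ConsumerRecord<String, String> record) {
        LOGGER.info(record.toString());
    }
}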
To register the custom consumption method:
Log on to the ARMS console.
In the left-side navigation pane of the application details page, click Application Settings.
On the Custom Configuration tab, set the Kafka custom consumption method parameter to the fully qualified method name. Example:
arms.test.kafka.KafkaConsumeTest.handler
For more information, see Custom configuration.
After registering the method, restart the consumer application for the change to take effect.
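If you want to avoid typos in the registered value, one option is to derive the fully qualified method name with reflection and confirm that the method exists. The following is a minimal sketch that uses only the JDK. The class `ConsumptionMethodName` and the helper `qualifiedName` are hypothetical names introduced for illustration, and `java.lang.String` stands in for your own consumer class.

```java
import java.lang.reflect.Method;

// Helper sketch: builds "<fully qualified class name>.<method name>".
class ConsumptionMethodName {
    // Returns the qualified name, or throws if the class declares no such method.
    static String qualifiedName(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return type.getName() + "." + m.getName();
            }
        }
        throw new IllegalArgumentException(
                type.getName() + " declares no method named " + methodName);
    }

    public static void main(String[] args) {
        // Replace String.class and "length" with your consumer class and handler method.
        System.out.println(qualifiedName(String.class, "length"));
    }
}
```

Called with the sample class from this topic, the helper would return the same string as the example value in the procedure. Whether the console accepts nested-class names, which `Class.getName()` renders with a `$`, is not covered here, so a top-level consumer class is the safer assumption.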
Supported ARMS agent versions
The Kafka plug-in requires ARMS agent V2.7.1.2 or later. To upgrade, see Upgrade ARMS agent.
Supported Kafka versions
Trace context propagation relies on Kafka record headers, which are supported only by Kafka 0.11.0 or later. For details, see the official Kafka documentation.
If your consumer uses a client built on top of the native Kafka client — such as Spring Kafka — confirm that the underlying Kafka client version is 0.11.0 or later.
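To confirm the client version that a framework pulls in, you can read it at runtime and compare it with 0.11.0. The sketch below is one possible approach, not an ARMS feature. It looks up `org.apache.kafka.common.utils.AppInfoParser.getVersion()` through reflection so that it compiles without kafka-clients on the classpath. The class `KafkaVersionCheck` and its helper methods are hypothetical names.

```java
// Sketch: check whether the Kafka client on the classpath is new enough for record headers.
class KafkaVersionCheck {
    // Returns true if `version` is at least `minimum`, comparing dot-separated numeric parts.
    static boolean isAtLeast(String version, String minimum) {
        String[] a = version.split("\\.");
        String[] b = minimum.split("\\.");
        int n = Math.max(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = i < a.length ? leadingInt(a[i]) : 0;
            int y = i < b.length ? leadingInt(b[i]) : 0;
            if (x != y) {
                return x > y;
            }
        }
        return true; // all parts are equal
    }

    // Parses the leading digits of a part, so a suffix such as "0-SNAPSHOT" is tolerated.
    static int leadingInt(String part) {
        int end = 0;
        while (end < part.length() && Character.isDigit(part.charAt(end))) {
            end++;
        }
        return end == 0 ? 0 : Integer.parseInt(part.substring(0, end));
    }

    // Reads the client version reflectively; returns null if kafka-clients is not present.
    static String clientVersion() {
        try {
            Class<?> info = Class.forName("org.apache.kafka.common.utils.AppInfoParser");
            return (String) info.getMethod("getVersion").invoke(null);
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String version = clientVersion();
        if (version == null) {
            System.out.println("kafka-clients not found on the classpath");
        } else {
            System.out.println(version + " supports headers: " + isAtLeast(version, "0.11.0"));
        }
    }
}
```

Running this inside the consumer application, for example from a startup hook, reports the version of the client that is actually loaded, which may differ from the version you expect when a framework manages the dependency.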
References
After tracing is set up, configure filter conditions and aggregation dimensions to analyze trace data in real time. For more information, see Trace Explorer.