The Application Real-Time Monitoring Service (ARMS) Kafka plug-in propagates trace context through Kafka message headers, linking producers and consumers into a single distributed trace.
Prerequisites
Before you begin, make sure that you have:
ARMS agent V2.7.1.2 or later installed. To upgrade, see Upgrade ARMS agent.
Apache Kafka 0.11.0 or later (the version that introduced message headers).
If you use Spring Kafka or another wrapper, a native Kafka client of version 0.11.0 or later.
Tracing relies on Kafka message headers, introduced in Kafka 0.11.0. For details, see Kafka record format.
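The mechanism can be illustrated with a plain-Java sketch: the producer-side agent injects the current trace context into a message header, and the consumer-side agent reads it back to continue the trace. The `Message` class and the `trace-context` header key below are illustrative stand-ins, not the ARMS agent's actual types or header names.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of header-based trace propagation; not the real Kafka client API.
public class HeaderPropagationSketch {
    static final String TRACE_HEADER = "trace-context"; // hypothetical header key

    // Minimal stand-in for a Kafka record with headers.
    static class Message {
        final String value;
        final Map<String, byte[]> headers = new HashMap<>();
        Message(String value) { this.value = value; }
    }

    // Producer side: the agent injects the current trace context into headers.
    static Message send(String value, String traceContext) {
        Message m = new Message(value);
        m.headers.put(TRACE_HEADER, traceContext.getBytes(StandardCharsets.UTF_8));
        return m;
    }

    // Consumer side: the agent extracts the context and links the spans.
    static String extractTraceContext(Message m) {
        byte[] raw = m.headers.get(TRACE_HEADER);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Message m = send("order-created", "trace-id=abc123;span-id=42");
        System.out.println(extractTraceContext(m)); // prints trace-id=abc123;span-id=42
    }
}
```

Because the context rides inside each record, no broker-side configuration is involved; this is also why a pre-0.11.0 broker, which has no header support, cannot carry the trace.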
Configure the producer
Enable trace context pass-through so the ARMS agent injects trace data into outgoing Kafka message headers.
Log on to the ARMS console.
In the left-side navigation pane of the application details page, click Application Settings.
On the Custom Configuration tab, turn on the switch that enables Kafka to pass trace context through message headers when sending messages.
For details, see Customize application settings.
If the Kafka broker version is earlier than 0.11.0, do not enable this option. The broker does not support message headers, so messages may fail to send.
Configure the consumer
Spring Kafka
No additional configuration is required. If the ARMS agent and Kafka client versions meet the prerequisites, the ARMS agent automatically extracts trace context from incoming message headers.
Native Kafka client
The native Kafka Consumer API uses continuous poll requests to pull messages, which prevents the ARMS agent from directly extracting trace data from headers. To work around this, register a custom consumption method in the ARMS console.
The consumption method must accept a parameter of type org.apache.kafka.clients.consumer.ConsumerRecord or org.apache.kafka.clients.consumer.ConsumerRecords.
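Conceptually, registering the method tells the agent where one message's processing begins and ends, so it can wrap each invocation in a span. The sketch below illustrates the idea in plain Java; the `Record` type, the `trace-context` header key, and the wrapping function are illustrative only, since the real ARMS agent instruments the registered method via bytecode rather than through any such API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Conceptual sketch of handler interception; not the ARMS agent's real mechanism.
public class HandlerInterceptionSketch {
    // Minimal stand-in for org.apache.kafka.clients.consumer.ConsumerRecord.
    static class Record {
        final String value;
        final Map<String, byte[]> headers = new HashMap<>();
        Record(String value) { this.value = value; }
    }

    // Wraps the registered handler: extract trace context from headers,
    // start a span, delegate to the handler, then finish the span.
    static Consumer<Record> instrument(Consumer<Record> handler, StringBuilder traceLog) {
        return record -> {
            byte[] ctx = record.headers.get("trace-context"); // hypothetical key
            traceLog.append("span started from context: ")
                    .append(ctx == null ? "none" : new String(ctx));
            handler.accept(record);
            traceLog.append("; span finished");
        };
    }

    public static void main(String[] args) {
        StringBuilder traceLog = new StringBuilder();
        Consumer<Record> wrapped =
                instrument(r -> System.out.println("handled " + r.value), traceLog);

        Record r = new Record("payload");
        r.headers.put("trace-context", "trace-id=abc123".getBytes());
        wrapped.accept(r); // prints handled payload
        System.out.println(traceLog);
    }
}
```

This is why the poll loop itself cannot be instrumented: a single `poll` call returns a batch of records from potentially many traces, so there is no single context to attach to it, whereas the per-record handler gives the agent one record, and therefore one trace context, per invocation.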
Example
The following example defines a handler method that the ARMS agent intercepts to extract trace context from each record:
```java
package arms.test.kafka;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumeTest {

    private static final Logger LOGGER =
            Logger.getLogger(KafkaConsumeTest.class.getName());

    public void testConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<bootstrap-server-address>");
        props.put("group.id", UUID.randomUUID().toString());
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Delegate each record to the handler method
                handler(record);
            }
        }
    }

    // The ARMS agent intercepts this method to extract trace context
    public void handler(ConsumerRecord<String, String> record) {
        LOGGER.info(record.topic() + "-" + record.offset() + ": " + record.value());
    }
}
```

Replace the following placeholder with your actual value:
| Placeholder | Description | Example |
|---|---|---|
| `<bootstrap-server-address>` | Kafka bootstrap server address in `PLAINTEXT://host:port` format | `PLAINTEXT://192.168.1.100:9092` |
Register the consumption method
Log on to the ARMS console.
In the left-side navigation pane of the application details page, click Application Settings.
On the Custom Configuration tab, set Kafka custom consumption method to the fully qualified method name. Example:
`arms.test.kafka.KafkaConsumeTest.handler`
Restart the consumer application to apply the change.
For details, see Customize application settings.
What to do next
After tracing is set up, use Trace Explorer to filter, aggregate, and analyze trace data in real time.