2.7.1.2及以上版本的ARMS Agent增加了Kafka插件,使ARMS应用监控可以对应用中特定版本和接入方式的Kafka进行Trace跟踪。本文列出了ARMS应用监控支持的Kafka版本,以及不同接入方式的注意事项。

ARMS Agent版本要求

仅v2.7.1.2及以上版本的ARMS Agent增加了Kafka插件。如果你需要使用包含Kafka插件的ARMS Agent,请联系ARMS钉钉服务账号arms160804,为您进行探针版本升级。

Kafka版本要求

ARMS Agent的Kafka插件实现Trace跟踪需要依赖Kafka消息协议(Message Protocol)所支持的Header的能力,Kafka消息协议共有以下3个版本。

消息版本 Magic Number Kafka版本 是否支持Header
v0 0 <v0.10.0.x
v1 1 [v0.10.0.x, v0.11.0.x)
v2 2 ≥v0.11.0.x

由于仅v0.11.0及以上版本的Kafka支持Header,因此,ARMS应用监控仅支持v0.11.0及以上版本的Kafka。

Kafka的客户端(Producer和Consumer)和服务端(Broker)版本要求

在理想情况下,客户端和服务端的版本应该是相同的,即Kafka Broker如果使用X版本,那么Producer和Consumer应该使用的也是X版本,这样可减少很多运行中的负担。

但是实际上,很多用户经常会采用不匹配的版本,这不可避免的带来了一些额外的复杂度。

因此,ARMS Agent的Kafka插件对不同版本的客户端和服务端设置了以下约束,即不论Kafka客户端(Producer和Consumer)和服务端(Broker)版本是否相同,只要都大于v0.11.0,ARMS Agent的Kafka插件就会注入Trace数据。

服务端(Borker) 客户端(Provider和Consumer) Kafka插件是否会注入Trace数据
[v0.10.0.x, v0.11.0.x) <v0.10.0.x
[v0.10.0.x, v0.11.0.x)
≥v0.11.0.x
≥v0.11.0.x <v0.10.0.x
[v0.10.0.x, v0.11.0.x)
≥v0.11.0.x

Kafka接入方式

ARMS对以下两种Kafka接入方式进行了验证。

直接接入Apache Kafka客户端

Producer

Apache Kafka Provider API无需额外操作即可通过官方接入方式接入ARMS。更多信息,请参见Apache Kafka官方文档

Consumer

Apache Kafka Consumer API并不支持Consumer逐个消费消息,而是采用Pull模式不间断的轮询来消费消息,因此ARMS Agent的Kafka插件没办法直接将Trace注入,需要一个额外的地方进行Consumer消息的注入。如果直接采用Apache Kafka客户端进行Consumer端的接入,ARMS需要进行以下操作。

Consumer端示例文件:

 package arms.test.kafka;
 
 public class KafkaConsumeTest {
        public void testConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "PLAINTEXT://XXXX");
            props.put("group.id",  UUID.randomUUID().toString());
            props.put("enable.auto.commit", "true");
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
            kafkaConsumer.subscribe(Arrays.asList("test"));
            while (true) {
                ListAdapter consumer;
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                     handler(record);       
            }
         }

         public void handler(ConsumerRecord<String, String> record){
                LOGGER.info( Utils.string(record));
         }       
 } 

对于以上示例文件,对应的在ARMS Agent安装包的arms-agent.config文件中,添加以下内容。

profiler.kafka.consumer.entryPoint=arms.test.kafka.KafkaConsumeTest.handler

其中,arms.test.kafka.KafkaConsumeTest.handler为示例文件中的包、类和方法名。

间接接入Spring Kafka

Spring Kafka在Apache Kafka客户端之上做了一层封装,因此,引入的Spring Kafka版本和Apache Kafka客户端版本需要满足以下映射关系。更多信息,请参见Spring Kafka官方文档

Spring for Apache Kafka Version Spring Integration for Apache Kafka Version kafka-clients Spring Boot
v2.7.0 v5.4.x v2.7.0或v2.8.0 v2.4.x或v2.5.x
v2.6.x v5.3.x或v5.4.x v2.6.0 v2.3.x或v2.4.x
v2.5.x v3.3.x v2.5.1 v2.3.x
v2.4.x v3.2.x v2.4.1 v2.2.x
v2.3.x v3.2.x v2.3.1 v2.2.x

如果您使用的是间接接入Spring Kafka的方式接入Kafka Broker,那么您不需要其他任何配置,ARMS Agent Kafka插件默认支持这种方式的Trace链路追踪。

Overhead Trace数据

ARMS Agent的Kafka插件默认为Kafka Message Header添加以下数据,ARMS应用监控即可通过以下数据实现Trace跟踪。

  • EagleEye-TraceID
  • EagleEye-RpcID
  • EagleEye-SpanID
  • EagleEye-pSpanID
  • EagleEye-pAppName
  • EagleEye-pRpc
  • EagleEye-IP
  • EagleEye-ROOT-APP
  • EagleEye-Sampled
  • uber-trace-id
  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId
  • X-B3-Flags
  • X-B3-Sampled
  • traceparent