全部產品
Search
文件中心

Application Real-Time Monitoring Service:在Kafka非同步訊息通訊情境中實現鏈路追蹤

更新時間:Jul 06, 2024

從2.7.1.2探針版本開始,ARMS應用監控增加了Kafka外掛程式,在基於Kafka進行非同步訊息通訊的情境中,也可以實現鏈路追蹤。本文介紹支援鏈路追蹤的Kafka版本要求,以及如何配置Kafka外掛程式。

ARMS探針版本要求

僅2.7.1.2及以上版本的探針包含Kafka外掛程式,升級探針的操作,請參見升級ARMS探針

Kafka版本要求

Kafka外掛程式實現鏈路跟蹤,依賴於Kafka訊息協議對Header的支援,僅0.11.0版本以上的Kafka支援了Header,詳情請參見Kafka官方文檔,所以請確保Kafka的服務端和用戶端都在0.11.0版本以上。

Spring-Kafka等用戶端在原生Kafka用戶端的基礎上做了封裝,如果您使用Spring-Kafka等經過了封裝的用戶端,請參考對應的版本文檔,確保底層的Kafka用戶端版本在0.11.0以上。

Kafka外掛程式使用方式

訊息發送端 (Producer)

訊息發送端接入ARMS應用監控以後,需要在ARMS控制台開啟Kafka訊息鏈路資訊透傳才能實現鏈路追蹤。請在ARMS控制台的應用設定 > 自訂配置頁面,開啟kafka發送訊息自動透傳上下文開關,具體的開啟方式請參見自訂配置

重要

如果Kafka服務端的版本小於0.11.0,請勿開啟此選項,否則會導致發送訊息異常。

訊息接收方 (Consumer)

使用Spring-Kafka

如果訊息接收方使用Spring-Kafka,只要探針版本和Kafka用戶端版本滿足要求,無需任何額外操作即可支援鏈路追蹤。

使用原生Kafka用戶端

原生的Apache Kafka Consumer API採用Pull模式不間斷的輪詢來消費訊息,ARMS應用監控的Kafka外掛程式無法直接從Header中擷取並處理鏈路資訊,需要使用者通過ARMS控制台指定Kafka消費方法。消費方法需要帶有一個org.apache.kafka.clients.consumer.ConsumerRecord或org.apache.kafka.clients.consumer.ConsumerRecords類型的參數。

package arms.test.kafka;
 
 public class KafkaConsumeTest {
        public void testConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "PLAINTEXT://XXXX");
            props.put("group.id",  UUID.randomUUID().toString());
            props.put("enable.auto.commit", "true");
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
            kafkaConsumer.subscribe(Arrays.asList("test"));
            while (true) {
                ListAdapter consumer;
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                     handler(record);       
            }
         }

         public void handler(ConsumerRecord<String, String> record){
                LOGGER.info( Utils.string(record));
         }       
 }

請在ARMS控制台應用設定 > 自訂配置頁面的自訂Kafka消費方法參數中,填入完整方法名arms.test.kafka.KafkaConsumeTest.handler,具體的配置方式請參見自訂配置。指定Kafka消費方法後,需要重啟訊息接收方的應用才會生效。

相關文檔

完成鏈路追蹤後,您可以基於已儲存的全量鏈路詳細資料,自由組合篩選條件與彙總維度進行即時分析。更多資訊,請參見調用鏈分析