從2.7.1.2探針版本開始,ARMS應用監控增加了Kafka外掛程式,在基於Kafka進行非同步訊息通訊的情境中,也可以實現鏈路追蹤。本文介紹支援鏈路追蹤的Kafka版本要求,以及如何配置Kafka外掛程式。
ARMS探針版本要求
僅2.7.1.2及以上版本的探針包含Kafka外掛程式,升級探針的操作,請參見升級ARMS探針。
Kafka版本要求
Kafka外掛程式實現鏈路跟蹤,依賴於Kafka訊息協議對Header的支援,僅0.11.0版本以上的Kafka支援了Header,詳情請參見Kafka官方文檔,所以請確保Kafka的服務端和用戶端都在0.11.0版本以上。
Spring-Kafka等用戶端在原生Kafka用戶端的基礎上做了封裝,如果您使用Spring-Kafka等經過了封裝的用戶端,請參考對應的版本文檔,確保底層的Kafka用戶端版本在0.11.0以上。
Kafka外掛程式使用方式
訊息發送端 (Producer)
訊息發送端接入ARMS應用監控以後,需要在ARMS控制台開啟Kafka訊息鏈路資訊透傳才能實現鏈路追蹤。請在ARMS控制台的頁面,開啟kafka發送訊息自動透傳上下文開關,具體的開啟方式請參見自訂配置。
如果Kafka服務端的版本小於0.11.0,請勿開啟此選項,否則會導致發送訊息異常。
訊息接收方 (Consumer)
使用Spring-Kafka
如果訊息接收方使用Spring-Kafka,只要探針版本和Kafka用戶端版本滿足要求,無需任何額外操作即可支援鏈路追蹤。
使用原生Kafka用戶端
原生的Apache Kafka Consumer API採用Pull模式不間斷的輪詢來消費訊息,ARMS應用監控的Kafka外掛程式無法直接從Header中擷取並處理鏈路資訊,需要使用者通過ARMS控制台指定Kafka消費方法。消費方法需要帶有一個org.apache.kafka.clients.consumer.ConsumerRecord或org.apache.kafka.clients.consumer.ConsumerRecords類型的參數。
package arms.test.kafka;
public class KafkaConsumeTest {
public void testConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "PLAINTEXT://XXXX");
props.put("group.id", UUID.randomUUID().toString());
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
ListAdapter consumer;
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
handler(record);
}
}
public void handler(ConsumerRecord<String, String> record){
LOGGER.info( Utils.string(record));
}
}請在ARMS控制台頁面的自訂Kafka消費方法參數中,填入完整方法名arms.test.kafka.KafkaConsumeTest.handler,具體的配置方式請參見自訂配置。指定Kafka消費方法後,需要重啟訊息接收方的應用才會生效。
相關文檔
完成鏈路追蹤後,您可以基於已儲存的全量鏈路詳細資料,自由組合篩選條件與彙總維度進行即時分析。更多資訊,請參見調用鏈分析。