Application Real-Time Monitoring Service (ARMS) エージェント V2.7.1.2 以降では、Kafkaプラグインが提供されています。これにより、Kafka非同期メッセージングでのトレースを実装できます。このトピックでは、トレースをサポートするKafkaのバージョンと、Kafkaプラグインの設定方法について説明します。
サポートされているARMSエージェントのバージョン
ARMSエージェントV2.7.1.2以降のみがKafkaプラグインをサポートしています。ARMSエージェントのアップグレード方法については、ARMSエージェントのアップグレードを参照してください。
サポートされているKafkaのバージョン
Kafkaプラグインがトレースをサポートするかどうかは、Kafkaプロトコルがヘッダーをサポートするかどうかによって異なります。0.11.0以降のKafkaバージョンのみがヘッダーをサポートしています。詳細については、Kafka公式ドキュメントを参照してください。
Spring KafkaなどのネイティブKafkaクライアントに基づいてカプセル化されたクライアントを使用する場合は、基盤となるKafkaクライアントのバージョンが0.11.0以降であることを確認してください。
Kafkaプラグインの使用
プロデューサー
プロデューサーがアプリケーションモニタリングに接続された後、ARMSコンソールでKafkaメッセージのトレース情報パススルーを有効にする必要があります。ARMSコンソールにログインします。アプリケーション詳細ページの左側のナビゲーションペインで、アプリケーション設定をクリックします。表示されるページのカスタム設定タブで、Kafkaがコンテキストを介してメッセージを自動的に送信するをオンにします。詳細については、アプリケーション設定のカスタマイズを参照してください。
Kafkaサーバーのバージョンが0.11.0より前の場合は、このオプションを有効にしないでください。有効にすると、メッセージの送信に失敗する可能性があります。
コンシューマー
Spring Kafkaの使用
コンシューマーがSpring Kafkaを使用し、ARMSエージェントとKafkaクライアントのバージョンが要件を満たしている場合、追加の操作なしでトレースがサポートされます。
ネイティブKafkaクライアントの使用
Apache Kafka Consumer APIを使用すると、コンシューマーは継続的なpollリクエストを送信してメッセージをプルできます。そのため、ARMSエージェントのKafkaプラグインは、ヘッダーからトレースデータを直接取得して処理することはできません。ARMSコンソールでカスタム消費メソッドを指定する必要があります。消費メソッドには、org.apache.kafka.clients.consumer.ConsumerRecordまたはorg.apache.kafka.clients.consumer.ConsumerRecordsタイプの パラメーターが含まれている必要があります。
package arms.test.kafka;
public class KafkaConsumeTest {
public void testConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "PLAINTEXT://XXXX");
props.put("group.id", UUID.randomUUID().toString());
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
ListAdapter consumer;
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
handler(record); // コンシューマーレコードを処理するメソッド
}
}
public void handler(ConsumerRecord<String, String> record){ // レコード処理メソッド
LOGGER.info( Utils.string(record));
}
}ARMSコンソールにログインします。アプリケーション詳細ページの左側のナビゲーションペインで、アプリケーション設定をクリックします。表示されるページのカスタム設定タブで、Kafkaカスタム消費メソッドパラメーターを消費メソッドの完全名に設定します。例:arms.test.kafka.KafkaConsumeTest.handler。詳細については、アプリケーション設定のカスタマイズを参照してください。Kafka消費メソッドを指定した後、コンシューマーのアプリケーションを再起動して、メソッドを有効にする必要があります。
参照
トレースが完了したら、フィルター条件と集計ディメンションを設定して、トレースデータをリアルタイムで分析できます。詳細については、トレース分析を参照してください。