すべてのプロダクト
Search
ドキュメントセンター

Application Real-Time Monitoring Service:Prometheus インスタンスから Apsara MQ for Kafka へのモニタリングデータの配信

最終更新日:Mar 03, 2025

Managed Service for Prometheus (Prometheus) のデータ配信機能を使用して、Prometheus インスタンスからモニタリングデータを配信し、ビジネス要件に基づいてデータを消費できます。このトピックでは、Prometheus のデータ配信機能を使用して、Prometheus インスタンスから Apsara MQ for Kafka にモニタリングデータを配信し、データを消費する方法について説明します。

前提条件

制限事項

  • 次の表に、データ配信をサポートするインスタンスを示します。

インスタンス

説明

Alibaba Cloud サービス用 Prometheus

cloud-product-prometheus という名前で始まるものを除く、無料インスタンス。

コンテナサービス用 Prometheus

該当なし

アプリケーション監視用 Prometheus

該当なし

Flink Serverless 用 Prometheus

該当なし

Kubernetes 用 Prometheus

該当なし

汎用 Prometheus インスタンス

OpenTelemetry エンドポイントを介してデータが報告されるものを除く、汎用インスタンス。

  • 仮想プライベートクラウド (VPC) 内でデータを配信する場合、Prometheus インスタンスが存在する VPC がターゲット VPC と同じでない場合は、ターゲット VPC 内の vSwitch の IP アドレスが Prometheus インスタンスのホワイトリストに追加されていることを確認してください。そうでない場合、ネットワーク接続に失敗する可能性があります。

    VPC コンソールvSwitch ページで、vSwitch の CIDR ブロックを取得できます。

    444.jpg

ステップ 1: データ配信タスクの作成

  1. ARMS コンソールにログインします。

  2. 左側のナビゲーションペインで、Managed Service for Prometheus > データ配信 を選択します。

  3. データ配信 ページで、上部ナビゲーションバーでリージョンを選択し、タスクの作成 をクリックします。

  4. 表示されるダイアログボックスで、タスク名タスクの説明 パラメータを設定し、OK をクリックします。

  5. タスクの編集 ページで、データソースとイベントターゲットを設定します。

    1. + データソースを追加 をクリックし、パラメータを設定して、OK をクリックします。次の表にパラメータを示します。

      パラメータ

      説明

      Prometheus インスタンス

      配信するデータの Prometheus インスタンスを選択します。

      c78cb8273c02*****

      データフィルタリング

      ラベルを使用して、含めるまたは除外するメトリックを指定します。

      正規表現がサポートされています。複数の条件を区切るには、改行を使用します。条件間の関係が論理 AND の場合にのみ、データが配信されます。

      __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
      regionId=cn-hangzhou
      id=i-2ze0mxp.*
    2. ターゲットの追加 をクリックし、宛先タイプ パラメータを ApsaraMQ for Kafka に設定し、必要な情報を設定して、OK をクリックします。

  6. タスクの編集 ページで、OK保存 をクリックします。

ステップ 2: 配信されたモニタリングデータの表示

説明

Apsara MQ for Kafka の負荷を軽減するために、Prometheus からのモニタリングデータは、Apsara MQ for Kafka に配信される前に、Snappy を使用して JsonArray 形式に変換されます。詳細については、Snappy を参照してください。

方法 1: コンソールを使用してモニタリングデータを表示する

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. リソースの分布 セクションの 概要 ページで、管理するApsaraMQ for Kafkaインスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションペインで、トピック をクリックします。管理するトピックを見つけ、アクション 列の 詳細 をクリックします。CloudMonitor または メッセージクエリ タブをクリックして、インポートしたモニタリングデータを表示します。

    image

方法 2: Kafka コンシューマーを使用してモニタリングデータを表示する

  1. Kafka コンシューマーを初期化します。詳細については、「インスタンスエンドポイントを使用してメッセージを送受信する」トピックの 単一のコンシューマーを使用してメッセージをサブスクライブする セクションを参照してください。

  2. 次のコンテンツを KafkaConsumerDemo.java ファイルに追加します。次のサンプルコードでは、初期化された Kafka コンシューマーが Snappy を使用してモニタリングデータを解凍して消費します。

    public static void main(String[] args) {
    
            // Kafka コンシューマーを最初に初期化します。
            
            while (true){
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    // 次のポーリングサイクルが開始される前に、すべてのメッセージが消費される必要があります。合計期間は、SESSION_TIMEOUT_MS_CONFIG で指定されたタイムアウト期間を超えてはなりません。
                    // メッセージを消費し、結果を非同期的に返すために、個別のスレッドプールを作成することをお勧めします。
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);
                        
                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
    
                    }
    
                    e.printStackTrace();
                }
            }
    }
  3. KafkaConsumerDemo.java ファイルをコンパイルして実行します。次に、JSON 形式の次のモニタリングデータが表示されます。

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]