すべてのプロダクト
Search
ドキュメントセンター

Managed Service for Prometheus:Prometheus データの Kafka への配信

最終更新日:Jan 13, 2026

Prometheus データ配信サービスを使用して、Prometheus インスタンスからデータをエクスポートし、カスタム処理を行うことができます。このトピックでは、Prometheus インスタンスから ApsaraMQ for Kafka にデータを配信して、さらに処理する方法について説明します。

前提条件

注意事項

  • Virtual Private Cloud (VPC) 経由でデータを配信する場合、Prometheus インスタンスと宛先が異なる VPC にある場合は、宛先の vSwitch の CIDR ブロックを Prometheus インスタンスのホワイトリストに追加する必要があります。そうしないと、ネットワーク接続に失敗します。vSwitch の CIDR ブロックは、VPC コンソールの vSwitch 詳細ページで確認できます。

  • 次の表に、データ配信をサポートするデータソースをリストします。

    インスタンスタイプ

    説明

    Prometheus for Alibaba Cloud Services

    名前が cloud-product-prometheus で始まる無料インスタンスは除きます。

    Prometheus for Container Service

    なし

    Prometheus for Flink Serverless

    なし

    Prometheus for Kubernetes

    なし

    汎用

    OpenTelemetry エンドポイントからレポートされたデータは除きます。

  • データ配信タスクを作成した後に生成されたリアルタイムデータのみがエクスポート可能です。既存データをエクスポートすることはできません。

ステップ 1:配信タスクの作成

  1. Prometheus マネージドサービスコンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[Prometheus モニタリング] > [Data Shipping] をクリックします。

  3. [データ転送] ページで、上部のメニューバーからターゲットリージョンを選択し、[タスクの作成] をクリックします。

  4. ダイアログボックスで [タスク名][タスクの説明] を入力し、[OK] をクリックします。

  5. [タスクの編集] ページで、データソースと宛先を設定します。

    1. [+ データソースの追加] をクリックし、次のパラメーターを設定してから [OK] をクリックします。

      設定項目

      説明

      Prometheus インスタンス

      配信する Prometheus データソース。

      c78cb8273c02*****

      データフィルタリング

      フィルタリングに使用するメトリックラベルを指定します。正規表現がサポートされています。複数の条件を指定する場合は、改行で区切ります。すべての条件が満たされた場合にのみデータが配信されます。

      __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
      regionId=cn-hangzhou
      id=i-2ze0mxp.*

      データラベリング

      配信されるメトリックデータに追加する新しいラベルを指定します。複数のラベルを追加する場合は、改行で区切ります。

      deliver_test_key1=ssss
      deliver_test_key2=yyyy
    2. [宛先の追加] をクリックします。[宛先タイプ][ApsaraMQ for Kafka] に設定し、その他の必要な情報を入力して [OK] をクリックします。

  6. 設定が完了したら、[タスクの編集] ページで [OK] をクリックし、[保存] をクリックしてデータ配信タスクを作成します。

ステップ 2:Prometheus モニタリングデータの表示

説明

宛先の負荷を軽減するため、Prometheus モニタリングデータは Kafka に配信される前に、Snappy を使用して JSON 配列フォーマットに圧縮されます。詳細については、「Snappy 圧縮フォーマット」をご参照ください。

方法 1:コンソールでのデータ表示

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションでリージョンを選択します。

  3. インスタンスリスト ページで、対象インスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで [Topic 管理] をクリックします。宛先の Topic を見つけ、[操作] 列の [詳細] をクリックします。[CloudMonitor] または [メッセージクエリ] タブで、インポートされたデータを表示できます。

    image

方法 2:クライアントを使用したデータ表示

  1. Kafka クライアントを初期化します。詳細については、「単一コンシューマーとしてのメッセージのサブスクライブ」をご参照ください。

  2. 次のコードを KafkaConsumerDemo.java ファイルに追加します。次の例は、Kafka クライアントを初期化した後、Snappy を使用してデータを消費し解凍する方法を示しています:

    public static void main(String[] args) {
    
            // 最初に Kafka コンシューマーを初期化します。
            
            while (true){
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    // 次の poll の前にこれらのレコードを消費する必要があります。合計時間は SESSION_TIMEOUT_MS_CONFIG を超えることはできません。
                    // 別のスレッドプールを使用してメッセージを消費し、非同期で結果を返すことを推奨します。
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);
                        
                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
    
                    }
    
                    e.printStackTrace();
                }
            }
    }
  3. KafkaConsumerDemo.java ファイルをコンパイルして実行します。次のメトリックデータが JSON フォーマットで返されます:

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]