全部產品
Search
文件中心

Managed Service for Prometheus:將Prometheus資料投遞至Kafka

更新時間:Dec 25, 2025

當您需要將某個Prometheus執行個體的資料匯出,進行自訂業務處理時,您可以選擇Prometheus資料投遞服務。本文介紹如何使用Prometheus資料投遞功能將執行個體資料投遞至Kafka並進行消費處理。

前提條件

注意事項

  • 選擇專用網路進行資料投遞時,如果Prometheus執行個體所在VPC與目標VPC不在同一個,您需要保證目標VPC內的vSwitch的IP已加入Prometheus執行個體的白名單中,否則會導致網路不通。vSwitch的網段資訊可以在專用網路控制台的交換器詳情頁面擷取。

  • 支援資料投遞的資料來源列表。

    執行個體類型

    說明

    Prometheus for 雲端服務

    除cloud-product-prometheus名稱開頭的免費執行個體

    Prometheus for Container Service

    Prometheus for Flink Serverless

    Prometheus for Kubernetes

    通用

    除通過OpenTelemetry地址上報上來的資料

  • 僅支援從建立任務開始後的即時資料匯出,不支援歷史資料的投遞。

步驟一:建立投遞任務

  1. 登入CloudMonitor控制台

  2. 在左側導覽列,單擊Prometheus 監控 > 資料投遞

  3. 資料投遞頁面,單擊頂部功能表列選擇目標地區,然後單擊建立任務

  4. 在對話方塊中輸入任務名稱任務描述後,單擊確定

  5. 任務編輯頁面,配置資料來源和投遞目標。

    1. 單擊+ 添加資料來源,配置以下參數,然後單擊確定

      配置項

      說明

      樣本

      Prometheus執行個體

      被投遞的Prometheus資料來源。

      c78cb8273c02*****

      資料過濾

      設定需要過濾的指標標籤,支援Regex。多個條件需要換行,條件需要同時滿足,才會投遞。

      __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
      regionId=cn-hangzhou
      id=i-2ze0mxp.*

      資料打標

      設定需要新增的標籤,支援對投遞的指標資料新增多個label。如果存在多個,則換行。

      deliver_test_key1=ssss
      deliver_test_key2=yyyy
    2. 單擊添加目標,選擇目標類型阿里雲訊息佇列Kafka版,請按照表單所需填寫其餘資訊,然後單擊確定

  6. 配置完成後,在任務編輯頁面,單擊確定,然後單擊儲存建立投遞任務。

步驟二:查看Prometheus監控資料

說明

為減輕投遞目標的壓力,投遞至Kafka的Prometheus監控資料為經過Snappy標準化壓縮後的JsonArray資料。更多資訊,請參見Snappy壓縮格式

方式一:通過控制台查看

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 实例列表頁面,單擊目標執行個體名稱。

  4. 在左側導覽列,單擊Topic 管理,然後單擊目標Topic操作列的詳情,在CloudMonitor訊息查詢頁簽查看您匯入的資料。

    image

方式二:通過用戶端查看

  1. 初始化Kafka用戶端,請參見單Consumer訂閱訊息

  2. KafkaConsumerDemo.java檔案中添加以下代碼。以下為初始化Kafka用戶端後,消費資料並使用Snappy解壓的樣本:

    public static void main(String[] args) {
    
            // 請先初始化kafka consumer
            
            while (true){
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    //必須在下次poll之前消費完這些資料, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
                    //建議開一個單獨的線程池來消費訊息,然後非同步返回結果。
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);
                        
                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
    
                    }
    
                    e.printStackTrace();
                }
            }
    }
  3. 編譯並運行KafkaConsumerDemo.java檔案,您可看到以下JSON格式的指標資料輸出。

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]