
Application Real-Time Monitoring Service: Deliver monitoring data from a Prometheus instance to ApsaraMQ for Kafka

Last Updated: Dec 25, 2024

You can use the data delivery feature of Managed Service for Prometheus (Prometheus) to deliver monitoring data from a Prometheus instance to a downstream service and then consume the data based on your business requirements. This topic describes how to deliver monitoring data from a Prometheus instance to ApsaraMQ for Kafka and consume the delivered data.

Prerequisites

Limits

  • The following instance types support data delivery:

      • Prometheus for Alibaba Cloud services: Free instances are supported, except instances whose names start with cloud-product-prometheus.

      • Prometheus for container services

      • Prometheus for application monitoring

      • Prometheus for Flink Serverless

      • Prometheus for Kubernetes

      • General-purpose Prometheus instances: Supported, except instances whose data is reported through OpenTelemetry endpoints.

  • If you deliver data over a virtual private cloud (VPC) and the VPC where the Prometheus instance resides is different from the destination VPC, make sure that the CIDR block of the vSwitch in the destination VPC is added to the whitelist of the Prometheus instance. Otherwise, the network connection may fail.

    You can obtain the CIDR block of the vSwitch on the vSwitch page of the VPC console.


Step 1: Create a data delivery task

  1. Log on to the ARMS console.

  2. In the left-side navigation pane, choose Managed Service for Prometheus > Data Delivery.

  3. On the Data Delivery page, select a region in the top navigation bar and click Create Task.

  4. In the dialog box that appears, set the Task Name and Task Description parameters, and click OK.

  5. On the Edit Task page, configure the data source and event target.

    1. Click + Add Data Source, set the following parameters, and then click OK.

      • Prometheus Instance: Select the Prometheus instance whose data you want to deliver. Example: c78cb8273c02*****

      • Data Filtering: Use labels to specify the metrics that you want to include or exclude. Regular expressions are supported. Separate multiple conditions with line breaks; the conditions are combined with a logical AND, so data is delivered only if all conditions match. For an illustration of how such conditions are evaluated, see the sketch after these steps. Example:

        __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
        regionId=cn-hangzhou
        id=i-2ze0mxp.*
    2. Click Add Target, set the Destination Type parameter to ApsaraMQ for Kafka, configure the required information, and then click OK.

  6. On the Edit Task page, click OK and Save.
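
The Data Filtering conditions described above are label-to-regular-expression pairs that are combined with a logical AND. The following minimal Java sketch illustrates this evaluation model; the anchored full-match semantics and all class and method names are assumptions for illustration, not the delivery task's actual implementation.

    import java.util.Map;
    import java.util.regex.Pattern;

    public class FilterConditionDemo {

        // A sample is delivered only if every condition matches: the series must carry
        // each label, and the label value must match the condition's regular expression.
        // The anchored full-match behavior used here is an assumption for illustration.
        static boolean matches(Map<String, String> labels, Map<String, String> conditions) {
            for (Map.Entry<String, String> cond : conditions.entrySet()) {
                String value = labels.get(cond.getKey());
                if (value == null || !Pattern.matches(cond.getValue(), value)) {
                    return false; // logical AND: one failed condition rejects the sample
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // The example conditions from the Data Filtering parameter above.
            Map<String, String> conditions = Map.of(
                    "__name__", "AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization",
                    "regionId", "cn-hangzhou",
                    "id", "i-2ze0mxp.*");
            // A hypothetical series that satisfies all three conditions.
            Map<String, String> series = Map.of(
                    "__name__", "AliyunEcs_CPUUtilization",
                    "regionId", "cn-hangzhou",
                    "id", "i-2ze0mxp1234");
            System.out.println(matches(series, conditions)); // prints: true
        }
    }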

Step 2: View the delivered monitoring data

Note

To reduce the load on ApsaraMQ for Kafka, Prometheus monitoring data is converted into a JSON array and compressed by using Snappy before it is delivered to ApsaraMQ for Kafka. For more information, see Snappy.
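
For reference, the following minimal Java sketch shows the round trip described in this note: a JSON array payload is compressed with the snappy-java codec and then decompressed, as a consumer must do. The sample payload is hypothetical; the library mirrors the one used by the consumer code in Method 2 below.

    import java.nio.charset.StandardCharsets;

    import org.xerial.snappy.Snappy;

    public class SnappyRoundTripDemo {
        public static void main(String[] args) throws Exception {
            // A hypothetical payload in the delivered format: a JSON array in which each
            // element is one metric sample with its labels, value, and timestamp.
            String jsonArray = "[{\"__name__\":\"up\",\"job\":\"apiserver\",\"value\":\"1.0\",\"timestamp\":\"1698732988354\"}]";

            // Compress the payload, as the delivery task does before sending it to Kafka.
            byte[] compressed = Snappy.compress(jsonArray.getBytes(StandardCharsets.UTF_8));

            // Decompress on the consumer side to recover the original JSON array.
            String restored = new String(Snappy.uncompress(compressed), StandardCharsets.UTF_8);
            System.out.println(restored.equals(jsonArray)); // prints: true
        }
    }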

Method 1: Use the console to view the monitoring data

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics. Find the topic that you want to manage and click Details in the Actions column. Click the CloudMonitor or Message Query tab to view the monitoring data that you imported.


Method 2: Use a Kafka consumer to view the monitoring data

  1. Initialize a Kafka consumer. For more information, see the Use a single consumer to subscribe to messages section of the Use instance endpoints to send and receive messages topic.

  2. Add the following content to the KafkaConsumerDemo.java file. In the following sample code, the Kafka consumer uses Snappy to decompress and consume the monitoring data. Replace the placeholder endpoint, consumer group, and topic with the values of your ApsaraMQ for Kafka instance, as described in the topic referenced in the previous step:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.xerial.snappy.Snappy;

    public class KafkaConsumerDemo {
        public static void main(String[] args) {
            // Initialize the Kafka consumer first. Replace the placeholders with the
            // endpoint, consumer group, and topic of your ApsaraMQ for Kafka instance.
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<your-instance-endpoint>");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "<your-consumer-group>");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // The delivered messages are Snappy-compressed binary payloads, so the
            // value deserializer must produce byte arrays.
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("<your-topic>"));

            while (true) {
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
                    // All messages must be consumed before the next polling cycle starts. The total duration must not exceed the timeout period specified by SESSION_TIMEOUT_MS_CONFIG.
                    // We recommend that you create a separate thread pool to consume messages and asynchronously return the results.
                    for (ConsumerRecord<String, byte[]> record : records) {
                        // Decompress the Snappy-compressed payload into a JSON array string.
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);

                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
                    }
                    e.printStackTrace();
                }
            }
        }
    }
  3. Compile and run the KafkaConsumerDemo.java file. Monitoring data similar to the following JSON array is displayed:

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]
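
To process the delivered data programmatically, parse the decompressed string as a JSON array in which every field, including value and timestamp, is serialized as a string. The following minimal sketch uses the Jackson library to extract the metric name, value, and timestamp from each sample; the choice of Jackson and the shortened payload are assumptions for illustration, and any JSON parser can be used instead.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ParseDeliveredMetrics {
        public static void main(String[] args) throws Exception {
            // In practice, `json` is the string decompressed in KafkaConsumerDemo;
            // it is shortened here for readability.
            String json = "[{\"__name__\":\"apiserver_admission_controller_admission_duration_seconds_bucket\","
                    + "\"le\":\"2.5\",\"value\":\"675.0\",\"timestamp\":\"1698732988354\"}]";

            ObjectMapper mapper = new ObjectMapper();
            for (JsonNode sample : mapper.readTree(json)) {
                // Numeric fields arrive as strings and must be parsed explicitly.
                String metric = sample.get("__name__").asText();
                double value = Double.parseDouble(sample.get("value").asText());
                long timestamp = Long.parseLong(sample.get("timestamp").asText());
                System.out.printf("%s value=%.1f timestamp=%d%n", metric, value, timestamp);
            }
        }
    }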