You can use the data delivery feature of Managed Service for Prometheus to deliver monitoring data from a Prometheus instance to downstream services and then consume the data based on your business requirements. This topic describes how to deliver monitoring data from a Prometheus instance to ApsaraMQ for Kafka and then consume the delivered data.
Prerequisites
A Prometheus instance is created.
An ApsaraMQ for Kafka instance and a topic on the instance are created. For more information, see Overview.
EventBridge is activated. For more information, see Activate EventBridge and grant permissions to a RAM user.
Limits
The following table lists the instances that support data delivery.
| Instance | Description |
| --- | --- |
| Prometheus for Alibaba Cloud services | Free instances are supported, except for instances whose names start with cloud-product-prometheus. |
| Prometheus for container services | N/A |
| Prometheus for application monitoring | N/A |
| Prometheus for Flink Serverless | N/A |
| Prometheus for Kubernetes | N/A |
| General-purpose Prometheus instance | General-purpose instances are supported, except for instances whose data is reported through OpenTelemetry endpoints. |
If you deliver data over a virtual private cloud (VPC) and the VPC where the Prometheus instance resides is different from the destination VPC, make sure that the CIDR block of the vSwitch in the destination VPC is added to the whitelist of the Prometheus instance. Otherwise, the network connection may fail.
You can obtain the CIDR block of the vSwitch on the vSwitch page in the VPC console.
Step 1: Create a data delivery task
Log on to the ARMS console.
In the left-side navigation pane, choose Managed Service for Prometheus > Data Delivery.
On the Data Delivery page, select a region in the top navigation bar and click Create Task.
In the dialog box that appears, set the Task Name and Task Description parameters, and click OK.
On the Edit Task page, configure the data source and event target.
Click + Add Data Source, set the parameters, and then click OK. The following table lists the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Prometheus Instance | Select the Prometheus instance whose data you want to deliver. | c78cb8273c02***** |
| Data Filtering | Use labels to specify the metrics that you want to include or exclude. Regular expressions are supported. Separate multiple conditions with line breaks. Multiple conditions are combined with a logical AND, so data is delivered only when all conditions are met (see the example below the table). | __name__=AliyunEcs_CPUUtilization\|AliyunEcs_memory_usedutilization<br>regionId=cn-hangzhou<br>id=i-2ze0mxp.* |
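For example, the Data Filtering conditions in the table above are entered as three separate lines in the console. Because the conditions are combined with a logical AND, a sample is delivered only if its metric name matches one of the two ECS metrics, its regionId label is cn-hangzhou, and its id label matches the regular expression i-2ze0mxp.*:

```
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
regionId=cn-hangzhou
id=i-2ze0mxp.*
```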
Click Add Target, set the Destination Type parameter to ApsaraMQ for Kafka, configure the required information, and then click OK.
On the Edit Task page, click OK, and then click Save.
Step 2: View the delivered monitoring data
To reduce the load on ApsaraMQ for Kafka, monitoring data from Prometheus is converted into the JSON array format and compressed by using Snappy before the data is delivered to ApsaraMQ for Kafka. For more information, see Snappy.
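The following snippet is a minimal local sketch of this encoding, not the actual delivery-side code. It compresses a toy JSON array with the snappy-java library and decompresses it again, which is the same transformation that a consumer must reverse:

```java
import org.xerial.snappy.Snappy;

public class SnappyRoundTrip {
    public static void main(String[] args) throws Exception {
        // A toy sample in the same shape as the delivered payload:
        // a JSON array in which each element is one metric sample.
        String jsonArray =
            "[{\"__name__\":\"up\",\"job\":\"apiserver\",\"value\":\"1.0\",\"timestamp\":\"1698732988354\"}]";
        byte[] compressed = Snappy.compress(jsonArray.getBytes("UTF-8")); // what arrives in Kafka
        byte[] restored = Snappy.uncompress(compressed);                  // what a consumer must do
        System.out.println(new String(restored, "UTF-8"));
    }
}
```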
Method 1: Use the console to view the monitoring data
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Topics. Find the topic that you want to manage and click Details in the Actions column. Click the CloudMonitor or Message Query tab to view the delivered monitoring data.
Method 2: Use a Kafka consumer to view the monitoring data
Initialize a Kafka consumer. For more information, see the Use a single consumer to subscribe to messages section of the Use instance endpoints to send and receive messages topic.
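The linked topic contains the authoritative initialization code. For reference, a minimal sketch is shown below. It assumes the open-source Kafka Java client and a default (non-SASL) instance endpoint; the endpoint, consumer group, and topic name are placeholders that you must replace with your own values:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerInit {
    public static KafkaConsumer<String, byte[]> createConsumer() {
        Properties props = new Properties();
        // Endpoint of your ApsaraMQ for Kafka instance (placeholder).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "alikafka-pre-cn-xxx-1.alikafka.aliyuncs.com:9092");
        // Consumer group that you created in the ApsaraMQ for Kafka console (placeholder).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The delivered values are Snappy-compressed bytes, so deserialize them as byte arrays.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic to which the delivery task sends data (placeholder).
        consumer.subscribe(Collections.singletonList("your-topic"));
        return consumer;
    }
}
```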
Add the following content to the KafkaConsumerDemo.java file. In the following sample code, the initialized Kafka consumer uses Snappy to decompress and consume the monitoring data:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.xerial.snappy.Snappy;

public static void main(String[] args) {
    // Initialize the Kafka consumer first. 'consumer' is the
    // KafkaConsumer<String, byte[]> that you initialized in the previous step.
    while (true) {
        try {
            ConsumerRecords<String, byte[]> records = consumer.poll(1000);
            // All messages must be consumed before the next polling cycle starts, and the total
            // duration must not exceed the timeout period specified by SESSION_TIMEOUT_MS_CONFIG.
            // We recommend that you create a separate thread pool to consume messages and
            // asynchronously return the results.
            for (ConsumerRecord<String, byte[]> record : records) {
                byte[] compressedData = record.value();
                // Decompress the Snappy-compressed payload into a JSON array string.
                byte[] data = Snappy.uncompress(compressedData);
                System.out.println(new String(data));
            }
        } catch (Exception e) {
            try {
                Thread.sleep(1000);
            } catch (Throwable ignore) {
            }
            e.printStackTrace();
        }
    }
}
```
Compile and run the KafkaConsumerDemo.java file. The following monitoring data in JSON format is displayed:

```json
[{
    "instance": "*****",
    "pod": "*****",
    "rejected": "false",
    "type": "validate",
    "pod_name": "*****",
    "endpoint": "http-metrics",
    "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    "service": "kubernetes",
    "name": "*****",
    "namespace": "default",
    "le": "2.5",
    "job": "apiserver",
    "operation": "UPDATE",
    "value": "675.0",
    "timestamp": "1698732988354"
}, {
    "instance": "*****",
    "pod": "*****",
    "rejected": "false",
    "type": "validate",
    "pod_name": "*****",
    "endpoint": "http-metrics",
    "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    "service": "kubernetes",
    "name": "*****",
    "namespace": "default",
    "le": "+Inf",
    "job": "apiserver",
    "operation": "UPDATE",
    "value": "675.0",
    "timestamp": "1698732988354"
}, {
    "instance": "*****",
    "pod": "*****",
    "rejected": "false",
    "type": "validate",
    "pod_name": "*****",
    "endpoint": "http-metrics",
    "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    "service": "kubernetes",
    "name": "*****",
    "namespace": "default",
    "le": "0.005",
    "job": "apiserver",
    "operation": "UPDATE",
    "value": "1037.0",
    "timestamp": "1698732988519"
}, {
    "instance": "*****",
    "pod": "*****",
    "rejected": "false",
    "type": "validate",
    "pod_name": "*****",
    "endpoint": "http-metrics",
    "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    "service": "kubernetes",
    "name": "*****",
    "namespace": "default",
    "le": "0.025",
    "job": "apiserver",
    "operation": "UPDATE",
    "value": "1037.0",
    "timestamp": "1698732988519"
}]
```
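If you want to process the samples as structured objects instead of printing raw text, you can deserialize the decompressed byte array with any JSON library. The following sketch assumes the Jackson library is on the classpath; parseSamples is a hypothetical helper, and data is the byte array returned by Snappy.uncompress in the consumer loop:

```java
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SampleParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // 'data' is the byte array returned by Snappy.uncompress in the consumer loop.
    public static List<Map<String, String>> parseSamples(byte[] data) throws Exception {
        // Each element of the JSON array is one metric sample whose labels,
        // value, and timestamp are all delivered as string fields.
        return MAPPER.readValue(data, new TypeReference<List<Map<String, String>>>() {});
    }
}
```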