全部产品
Search
文档中心

Managed Service for Prometheus:Pengiriman data Prometheus ke Kafka

更新时间:Jan 14, 2026

Untuk mengekspor data dari instans Prometheus guna pemrosesan kustom, Anda dapat menggunakan layanan Pengiriman Data Prometheus. Topik ini menjelaskan cara mengirimkan data dari instans Prometheus ke ApsaraMQ for Kafka untuk pemrosesan lebih lanjut.

Prasyarat

Catatan

  • Jika Anda mengirimkan data melalui virtual private cloud (VPC) dan instans Prometheus serta tujuan berada di VPC yang berbeda, Anda harus menambahkan Blok CIDR vSwitch tujuan ke daftar putih instans Prometheus. Jika tidak, koneksi jaringan akan gagal. Anda dapat menemukan Blok CIDR vSwitch pada halaman detail vSwitch di Konsol VPC.

  • Tabel berikut mencantumkan sumber data yang mendukung pengiriman data.

    Jenis instans

    Deskripsi

    Prometheus untuk Layanan Alibaba Cloud

    Tidak termasuk instans gratis yang namanya diawali dengan cloud-product-prometheus.

    Prometheus untuk Container Service

    Tidak ada

    Prometheus untuk Flink Serverless

    Tidak ada

    Prometheus untuk Kubernetes

    Tidak ada

    Tujuan umum

    Tidak termasuk data yang dilaporkan dari titik akhir OpenTelemetry.

  • Hanya data real-time yang dihasilkan setelah Anda membuat tugas pengiriman data yang dapat diekspor. Anda tidak dapat mengekspor data historis.

Langkah 1: Buat tugas pengiriman

  1. Masuk ke Managed Service for Prometheus console.

  2. Pada panel navigasi sebelah kiri, klik Prometheus Monitoring > Data Shipping.

  3. Pada halaman Data Shipping, pilih wilayah tujuan dari bilah menu atas, lalu klik Create Task.

  4. Pada kotak dialog, masukkan Task Name dan Task Description, lalu klik OK.

  5. Pada halaman Edit Task, konfigurasikan sumber data dan tujuan.

    1. Klik + Add Data Source, konfigurasikan parameter berikut, lalu klik OK.

      Item konfigurasi

      Deskripsi

      Contoh

      Prometheus Instance

      Sumber data Prometheus yang dikirimkan.

      c78cb8273c02*****

      Data Filtering

      Tentukan label metrik untuk penyaringan. Ekspresi reguler didukung. Jika Anda menentukan beberapa kondisi, pisahkan dengan jeda baris. Data hanya dikirimkan jika semua kondisi terpenuhi.

      __name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization
      regionId=cn-hangzhou
      id=i-2ze0mxp.*

      Data Labeling

      Tentukan label baru yang akan ditambahkan ke data metrik yang dikirimkan. Jika Anda menambahkan beberapa label, pisahkan dengan jeda baris.

      deliver_test_key1=ssss
      deliver_test_key2=yyyy
    2. Klik Add Destination. Atur Destination Type ke ApsaraMQ for Kafka, masukkan informasi lain yang diperlukan, lalu klik OK.

  6. Setelah menyelesaikan konfigurasi, klik OK pada halaman Edit Task, lalu klik Save untuk membuat tugas pengiriman data.

Langkah 2: Lihat data pemantauan Prometheus

Catatan

Untuk mengurangi beban pada tujuan, data pemantauan Prometheus dikompresi menggunakan Snappy ke dalam format array JSON sebelum dikirimkan ke Kafka. Untuk informasi selengkapnya, lihat Format kompresi Snappy.

Metode 1: Lihat data di konsol

  1. Masuk ke ApsaraMQ for Kafka console.

  2. Pada halaman Overview, pilih wilayah di bagian Resource Distribution.

  3. Pada halaman Instances, klik nama instans target.

  4. Pada panel navigasi sebelah kiri, klik Topic Management. Temukan topik tujuan dan klik Details di kolom Actions. Pada tab CloudMonitor atau Message Query, Anda dapat melihat data yang diimpor.

    image

Metode 2: Lihat data menggunakan client

  1. Inisialisasi client Kafka. Untuk informasi selengkapnya, lihat Berlangganan pesan sebagai konsumen tunggal.

  2. Tambahkan kode berikut ke file KafkaConsumerDemo.java. Contoh berikut menunjukkan cara mengonsumsi dan mendekompresi data menggunakan Snappy setelah menginisialisasi client Kafka:

    public static void main(String[] args) {
    
            // Inisialisasi consumer Kafka terlebih dahulu.
            
            while (true){
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                    // Anda harus mengonsumsi record ini sebelum poll berikutnya. Waktu total tidak boleh melebihi SESSION_TIMEOUT_MS_CONFIG.
                    // Disarankan untuk menggunakan kolam thread terpisah untuk mengonsumsi pesan dan mengembalikan hasil secara asinkron.
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] compressedData = record.value();
                        byte[] data = Snappy.uncompress(compressedData);
                        
                        System.out.println(new String(data));
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
    
                    }
    
                    e.printStackTrace();
                }
            }
    }
  3. Kompilasi dan jalankan file KafkaConsumerDemo.java. Data metrik berikut dikembalikan dalam format JSON:

    [{
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "2.5",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "+Inf",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "675.0",
    	"timestamp": "1698732988354"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.005",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }, {
    	"instance": "*****",
    	"pod": "*****",
    	"rejected": "false",
    	"type": "validate",
    	"pod_name": "*****",
    	"endpoint": "http-metrics",
    	"__name__": "apiserver_admission_controller_admission_duration_seconds_bucket",
    	"service": "kubernetes",
    	"name": "*****",
    	"namespace": "default",
    	"le": "0.025",
    	"job": "apiserver",
    	"operation": "UPDATE",
    	"value": "1037.0",
    	"timestamp": "1698732988519"
    }]