Prometheus データ配信サービスを使用して、Prometheus インスタンスからデータをエクスポートし、カスタム処理を行うことができます。このトピックでは、Prometheus インスタンスから ApsaraMQ for Kafka にデータを配信して、さらに処理する方法について説明します。
前提条件
Prometheus インスタンスが接続されていること。詳細については、次のトピックをご参照ください。
宛先として ApsaraMQ for Kafka インスタンスがデプロイされており、Topic などの必要なリソースが作成されていること。詳細については、「概要」をご参照ください。
EventBridge が有効化されていること。詳細については、「EventBridge の有効化と権限の付与」をご参照ください。
説明
注意事項
Virtual Private Cloud (VPC) 経由でデータを配信する場合、Prometheus インスタンスと宛先が異なる VPC にある場合は、宛先の vSwitch の CIDR ブロックを Prometheus インスタンスのホワイトリストに追加する必要があります。そうしないと、ネットワーク接続に失敗します。vSwitch の CIDR ブロックは、VPC コンソールの vSwitch 詳細ページで確認できます。
次の表に、データ配信をサポートするデータソースをリストします。
インスタンスタイプ
説明
Prometheus for Alibaba Cloud Services
名前が cloud-product-prometheus で始まる無料インスタンスは除きます。
Prometheus for Container Service
なし
Prometheus for Flink Serverless
なし
Prometheus for Kubernetes
なし
汎用
OpenTelemetry エンドポイントからレポートされたデータは除きます。
データ配信タスクを作成した後に生成されたリアルタイムデータのみがエクスポート可能です。既存データをエクスポートすることはできません。
ステップ 1:配信タスクの作成
Prometheus マネージドサービスコンソールにログインします。
左側のナビゲーションウィンドウで、 をクリックします。
[データ転送] ページで、上部のメニューバーからターゲットリージョンを選択し、[タスクの作成] をクリックします。
ダイアログボックスで [タスク名] と [タスクの説明] を入力し、[OK] をクリックします。
[タスクの編集] ページで、データソースと宛先を設定します。
[+ データソースの追加] をクリックし、次のパラメーターを設定してから [OK] をクリックします。
設定項目
説明
例
Prometheus インスタンス
配信する Prometheus データソース。
c78cb8273c02*****
データフィルタリング
フィルタリングに使用するメトリックラベルを指定します。正規表現がサポートされています。複数の条件を指定する場合は、改行で区切ります。すべての条件が満たされた場合にのみデータが配信されます。
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization regionId=cn-hangzhou id=i-2ze0mxp.*データラベリング
配信されるメトリックデータに追加する新しいラベルを指定します。複数のラベルを追加する場合は、改行で区切ります。
deliver_test_key1=ssss deliver_test_key2=yyyy[宛先の追加] をクリックします。[宛先タイプ] を [ApsaraMQ for Kafka] に設定し、その他の必要な情報を入力して [OK] をクリックします。
設定が完了したら、[タスクの編集] ページで [OK] をクリックし、[保存] をクリックしてデータ配信タスクを作成します。
ステップ 2:Prometheus モニタリングデータの表示
宛先の負荷を軽減するため、Prometheus モニタリングデータは Kafka に配信される前に、Snappy を使用して JSON 配列フォーマットに圧縮されます。詳細については、「Snappy 圧縮フォーマット」をご参照ください。
方法 1:コンソールでのデータ表示
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションでリージョンを選択します。
インスタンスリスト ページで、対象インスタンスの名前をクリックします。
左側のナビゲーションウィンドウで [Topic 管理] をクリックします。宛先の Topic を見つけ、[操作] 列の [詳細] をクリックします。[CloudMonitor] または [メッセージクエリ] タブで、インポートされたデータを表示できます。

方法 2:クライアントを使用したデータ表示
Kafka クライアントを初期化します。詳細については、「単一コンシューマーとしてのメッセージのサブスクライブ」をご参照ください。
次のコードを
KafkaConsumerDemo.javaファイルに追加します。次の例は、Kafka クライアントを初期化した後、Snappy を使用してデータを消費し解凍する方法を示しています:public static void main(String[] args) { // 最初に Kafka コンシューマーを初期化します。 while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); // 次の poll の前にこれらのレコードを消費する必要があります。合計時間は SESSION_TIMEOUT_MS_CONFIG を超えることはできません。 // 別のスレッドプールを使用してメッセージを消費し、非同期で結果を返すことを推奨します。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }KafkaConsumerDemo.javaファイルをコンパイルして実行します。次のメトリックデータが JSON フォーマットで返されます:[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]