Managed Service for Prometheus(Prometheus)のデータ配信機能を使用して、Prometheus インスタンスからモニタリングデータを配信し、ビジネス要件に基づいてデータを消費できます。このトピックでは、Prometheus のデータ配信機能を使用して、Prometheus インスタンスから Apsara MQ for Kafka にモニタリングデータを配信し、データを消費する方法について説明します。
前提条件
Prometheus インスタンスが作成されていること。詳細については、以下をご参照ください。
Apsara MQ for Kafka インスタンスが作成され、トピックが作成されていること。詳細については、「概要」をご参照ください。
EventBridge がアクティブ化されていること。詳細については、「EventBridge をアクティブ化し、RAM ユーザーに権限を付与する」をご参照ください。
説明
制限事項
次の表に、データ配信をサポートするインスタンスを示します。
インスタンス | 説明 |
Alibaba Cloud サービス用 Prometheus | cloud-product-prometheus という名前で始まるものを除く、無料インスタンス。 |
コンテナサービス用 Prometheus | 該当なし |
アプリケーション監視用 Prometheus | 該当なし |
Flink Serverless 用 Prometheus | 該当なし |
Kubernetes 用 Prometheus | 該当なし |
汎用 Prometheus インスタンス | OpenTelemetry エンドポイントを介してデータが報告されるものを除く、汎用インスタンス。 |
仮想プライベートクラウド(VPC)でデータを配信する場合、Prometheus インスタンスが存在する VPC がターゲット VPC と同じでない場合は、ターゲット VPC 内の vSwitch の IP アドレスが Prometheus インスタンスのホワイトリストに追加されていることを確認してください。そうでない場合、ネットワーク接続に失敗する可能性があります。
VPC コンソールの [vSwitch] ページで、vSwitch の CIDR ブロックを取得できます。

ステップ 1: データ配信タスクを作成する
Managed Service for Prometheus コンソール にログインします。
左側のナビゲーションウィンドウで、[データ配信] をクリックします。
[データ配信] ページで、上部のナビゲーションバーでリージョンを選択し、[タスクの作成] をクリックします。
表示されるダイアログボックスで、[タスク名] パラメーターと [タスクの説明] パラメーターを設定し、[OK] をクリックします。
[タスクの編集] ページで、データソースとイベントターゲットを設定します。
[+ データソースを追加] をクリックし、パラメーターを設定して、[OK] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
[Prometheus インスタンス]
データを配信する Prometheus インスタンス。
c78cb8273c02*****
[データフィルタリング]
フィルタリングするメトリックのラベル。
正規表現がサポートされています。複数の条件を区切るには、改行を使用します。条件間の関係が論理 AND の場合にのみ、データを配信できます。
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization regionId=cn-hangzhou id=i-2ze0mxp.*[データラベリング]
配信するメトリックデータに追加するラベル。複数のラベルを区切るには、改行を使用します。
deliver_test_key1=ssss deliver_test_key2=yyyy[ターゲットの追加] をクリックし、[宛先タイプ] パラメーターを [ApsaraMQ For Kafka] に設定し、必要な情報を入力して、[OK] をクリックします。
[タスクの編集] ページで、[OK] と [保存] をクリックします。
ステップ 2: 配信されたモニタリングデータを表示する
Apsara MQ for Kafka の負荷を軽減するために、Prometheus からのモニタリングデータは、Apsara MQ for Kafka に配信される前に、Snappy を使用して JsonArray 形式に変換されます。詳細については、Snappy をご参照ください。
方法 1: コンソールを使用してモニタリングデータを表示する
ApsaraMQ for Kafka コンソール にログインします。
概要 ページの リソースの分布 セクションで、管理する Apsara MQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[トピック] をクリックします。管理するトピックを見つけ、[アクション] 列の [詳細] をクリックします。[CloudMonitor] タブまたは [メッセージクエリ] タブをクリックして、インポートしたモニタリングデータを表示します。

方法 2: Kafka コンシューマーを使用してモニタリングデータを表示する
Kafka コンシューマーを初期化します。詳細については、「インスタンスエンドポイントを使用してメッセージを送受信する」トピックの「単一のコンシューマーを使用してメッセージをサブスクライブする」セクションをご参照ください。
KafkaConsumerDemo.javaファイルに次の内容を追加します。次のサンプルコードでは、初期化された Kafka コンシューマーは Snappy を使用してモニタリングデータを解凍して消費します:public static void main(String[] args) { // Kafka コンシューマーを最初に初期化します。 // 初期化処理は省略 while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); // 次のポーリングサイクルが開始される前に、すべてのメッセージが消費されている必要があります。合計期間は、SESSION_TIMEOUT_MS_CONFIG で指定されたタイムアウト期間を超えてはなりません。 // メッセージを消費し、非同期で結果を返すために、個別のスレッドプールを作成することをお勧めします。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }KafkaConsumerDemo.javaファイルをコンパイルして実行すると、JSON フォーマットの以下のモニタリングデータが表示されます:[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]