Managed Service for Prometheus (Prometheus) のデータ配信機能を使用して、Prometheus インスタンスからモニタリングデータを配信し、ビジネス要件に基づいてデータを消費できます。このトピックでは、Prometheus のデータ配信機能を使用して、Prometheus インスタンスから Apsara MQ for Kafka にモニタリングデータを配信し、データを消費する方法について説明します。
前提条件
Prometheus インスタンスが作成されていること。詳細については、以下を参照してください。
Apsara MQ for Kafka インスタンスが作成され、トピックが作成されていること。詳細については、概要を参照してください。
EventBridge がアクティブ化されていること。詳細については、EventBridge のアクティブ化と RAM ユーザーへの権限の付与を参照してください。
制限事項
次の表に、データ配信をサポートするインスタンスを示します。
インスタンス | 説明 |
Alibaba Cloud サービス用 Prometheus | cloud-product-prometheus という名前で始まるものを除く、無料インスタンス。 |
コンテナサービス用 Prometheus | 該当なし |
アプリケーション監視用 Prometheus | 該当なし |
Flink Serverless 用 Prometheus | 該当なし |
Kubernetes 用 Prometheus | 該当なし |
汎用 Prometheus インスタンス | OpenTelemetry エンドポイントを介してデータが報告されるものを除く、汎用インスタンス。 |
仮想プライベートクラウド (VPC) 内でデータを配信する場合、Prometheus インスタンスが存在する VPC がターゲット VPC と同じでない場合は、ターゲット VPC 内の vSwitch の IP アドレスが Prometheus インスタンスのホワイトリストに追加されていることを確認してください。そうでない場合、ネットワーク接続に失敗する可能性があります。
VPC コンソールの vSwitch ページで、vSwitch の CIDR ブロックを取得できます。
ステップ 1: データ配信タスクの作成
ARMS コンソールにログインします。
左側のナビゲーションペインで、Managed Service for Prometheus > データ配信 を選択します。
データ配信 ページで、上部ナビゲーションバーでリージョンを選択し、タスクの作成 をクリックします。
表示されるダイアログボックスで、タスク名 と タスクの説明 パラメータを設定し、OK をクリックします。
タスクの編集 ページで、データソースとイベントターゲットを設定します。
+ データソースを追加 をクリックし、パラメータを設定して、OK をクリックします。次の表にパラメータを示します。
パラメータ
説明
例
Prometheus インスタンス
配信するデータの Prometheus インスタンスを選択します。
c78cb8273c02*****
データフィルタリング
ラベルを使用して、含めるまたは除外するメトリックを指定します。
正規表現がサポートされています。複数の条件を区切るには、改行を使用します。条件間の関係が論理 AND の場合にのみ、データが配信されます。
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization regionId=cn-hangzhou id=i-2ze0mxp.*
ターゲットの追加 をクリックし、宛先タイプ パラメータを ApsaraMQ for Kafka に設定し、必要な情報を設定して、OK をクリックします。
タスクの編集 ページで、OK と 保存 をクリックします。
ステップ 2: 配信されたモニタリングデータの表示
Apsara MQ for Kafka の負荷を軽減するために、Prometheus からのモニタリングデータは、Apsara MQ for Kafka に配信される前に、Snappy を使用して JsonArray 形式に変換されます。詳細については、Snappy を参照してください。
方法 1: コンソールを使用してモニタリングデータを表示する
ApsaraMQ for Kafka コンソールにログインします。
リソースの分布 セクションの 概要 ページで、管理するApsaraMQ for Kafkaインスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、トピック をクリックします。管理するトピックを見つけ、アクション 列の 詳細 をクリックします。CloudMonitor または メッセージクエリ タブをクリックして、インポートしたモニタリングデータを表示します。
方法 2: Kafka コンシューマーを使用してモニタリングデータを表示する
Kafka コンシューマーを初期化します。詳細については、「インスタンスエンドポイントを使用してメッセージを送受信する」トピックの 単一のコンシューマーを使用してメッセージをサブスクライブする セクションを参照してください。
次のコンテンツを
KafkaConsumerDemo.java
ファイルに追加します。次のサンプルコードでは、初期化された Kafka コンシューマーが Snappy を使用してモニタリングデータを解凍して消費します。public static void main(String[] args) { // Kafka コンシューマーを最初に初期化します。 while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); // 次のポーリングサイクルが開始される前に、すべてのメッセージが消費される必要があります。合計期間は、SESSION_TIMEOUT_MS_CONFIG で指定されたタイムアウト期間を超えてはなりません。 // メッセージを消費し、結果を非同期的に返すために、個別のスレッドプールを作成することをお勧めします。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }
KafkaConsumerDemo.java
ファイルをコンパイルして実行します。次に、JSON 形式の次のモニタリングデータが表示されます。[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]