Logstash は、SASL_SSL 認証を使用して、インターネット経由でログおよびイベントデータを ApsaraMQ for Kafka インスタンスに転送できます。このガイドでは、エンドポイントの取得、Topic の作成、Logstash の設定、メッセージの検証について説明します。
前提条件
開始する前に、以下をご確認ください:
ApsaraMQ for Kafka インスタンス (インターネットおよび VPC タイプ) を購入し、デプロイ済みであること。詳細については、「インターネットおよび VPC 接続インスタンスの購入とデプロイ」をご参照ください。
Logstash がインストール済みであること。詳細については、「Logstash のダウンロード」をご参照ください。
Java 開発キット (JDK) 8 がインストール済みであること。詳細については、「JDK 8 のダウンロード」をご参照ください。
ステップ 1: エンドポイントと認証情報の取得
Logstash は SSL エンドポイントを介して ApsaraMQ for Kafka に接続します。コンソールからエンドポイントと SASL 認証情報を取得します。
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
[インスタンス詳細] ページで、以下を収集します:

[エンドポイント情報] セクション: SSL エンドポイント (ポート 9093) をコピーします。
[設定情報] セクション: [ユーザー名] と [パスワード] をメモします。
エンドポイントタイプに関する詳細については、「エンドポイントの比較」をご参照ください。
ステップ 2: Topic の作成
Logstash が送信するメッセージを受信するための Topic を作成します。
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。
重要ご利用の Elastic Compute Service (ECS) インスタンスと同じリージョンに Topic を作成してください。Topic はリージョンをまたいで使用することはできません。たとえば、プロデューサーとコンシューマーが中国 (北京) リージョンの ECS インスタンスで実行されている場合、Topic も中国 (北京) に作成する必要があります。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[Topic] をクリックします。
[トピック] ページで、[トピックの作成] をクリックします。
[Topic の作成] パネルで、Topic のプロパティを設定し、[OK] をクリックします。Topic が作成されると、[Topic] ページに表示されます。
パラメーター 説明 例 名前 トピック名。 demo 説明 トピックの簡単な説明。 demo test パーティション パーティションの数。 12 ストレージエンジン ストレージエンジンのタイプ。Professional Edition インスタンスでのみ設定可能です。Standard Edition はデフォルトで Cloud Storage になります。
- Cloud Storage:3 つの分散レプリカを持つ Alibaba Cloud ディスクを使用します。低レイテンシー、高パフォーマンス、高い耐久性、高い信頼性を備えています。Standard (高書き込み) インスタンスに必須です。
- Local Storage:オープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用し、3 つの分散レプリカを持ちます。Cloud Storage メッセージタイプ メッセージの順序付け動作。
- 通常メッセージ:同じキーを持つメッセージは、送信順に同じパーティションに送信されます。ブローカーに障害が発生した場合、順序は保証されないことがあります。ストレージエンジンが Cloud Storage の場合、自動的に選択されます。
- パーティション順序付きメッセージ:同じキーを持つメッセージは、送信順に同じパーティションに送信されます。ブローカーに障害が発生した場合でも順序は保証されますが、影響を受けるパーティションは復元されるまで利用できません。ストレージエンジンが Local Storage の場合、自動的に選択されます。通常メッセージ ログクリーンアップポリシー ストレージエンジンが Local Storage (Professional Edition) の場合にのみ設定可能です。
- 削除:デフォルトのポリシーです。メッセージを最大保持期間まで保持します。ストレージ使用量が 85% を超えると、最も古いメッセージから削除されます。
- 圧縮:各キーの最新の値のみを保持します。ログ圧縮されたトピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。圧縮 タグ トピックにアタッチするオプションのタグ。 demo
ステップ 3: Logstash の設定と実行
サーバー上で SSL 証明書、SASL 認証情報、Logstash 出力設定を行い、テストメッセージを送信します。
SSL 証明書のダウンロード
Logstash の bin ディレクトリに切り替え、トラストストア証明書をダウンロードします:
cd <logstash-install-dir>/bin
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jksJAAS 設定ファイルの作成
Logstash の bin ディレクトリに jaas.conf という名前のファイルを作成し、次の内容を記述します:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-username>"
password="<your-password>";
};プレースホルダーを実際の値に置き換えます:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-username> | [設定情報] セクションからの [ユーザー名] | alikafka_pre-cn-v0h1\*\*\* |
<your-password> | [設定情報] セクションの [パスワード]。 | GQiSmqbQVe3b9hdKLDcIlkrBK6\*\*\* |
jaas_path の設定は JVM 全体に適用されます。単一の Logstash インスタンスで複数の Kafka 出力を実行し、それぞれに異なる認証情報が必要な場合は、代わりにインラインの sasl_jaas_config パラメーターを使用してください。詳細については、「Kafka 出力プラグイン」リファレンスをご参照ください。
Logstash 出力設定ファイルの作成
Logstash の bin ディレクトリに output.conf という名前のファイルを作成し、次の内容を記述します:
input {
stdin {}
}
output {
stdout { codec => json }
kafka {
bootstrap_servers => "<your-endpoint>"
topic_id => "<your-topic>"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "<logstash-install-dir>/bin/jaas.conf"
ssl_truststore_password => "KafkaOnsClient"
ssl_truststore_location => "<logstash-install-dir>/bin/kafka.client.truststore.jks"
ssl_endpoint_identification_algorithm => ""
}
}これらのプレースホルダーを実際の値に置き換えます:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-endpoint> | [エンドポイント情報] セクションの SSL エンドポイント (ポート 9093)。 | alikafka-pre-cn-zv\*\*\*\*\*-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-3.alikafka.aliyuncs.com:9093 |
<your-topic> | ステップ 2 で作成した Topic の名前。 | logstash_test |
<logstash-install-dir> | Logstash のインストールディレクトリへの絶対パス。 | /home/logstash-7.6.2 |
残りのパラメーターは固定値を使用します。変更しないでください:
| パラメーター | 固定値 | 説明 |
|---|---|---|
security_protocol | SASL_SSL | インターネット接続用のセキュリティプロトコル。 |
sasl_mechanism | PLAIN | SASL 認証メカニズム。 |
ssl_truststore_password | KafkaOnsClient | トラストストア証明書のパスワード。 |
ssl_endpoint_identification_algorithm | "" (空の文字列) | Logstash 6.x 以降で必須。ホスト名検証を無効にします。 |
テストメッセージの送信
出力設定を使用して Logstash を起動します:
./logstash -f output.confLogstash が起動したら、
testと入力して Enter キーを押します。stdout出力はメッセージをローカルに JSON 形式で表示し、kafka出力はメッセージをご利用の ApsaraMQ for Kafka インスタンスに送信します。
結果の確認
コンソールでパーティションステータスの確認とメッセージのクエリを行い、メッセージがご利用の ApsaraMQ for Kafka インスタンスに到達したことを確認します。
パーティションステータスの確認
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[Topic] をクリックします。
Topic 名をクリックし、[Topic 詳細] ページで [パーティションステータス] タブをクリックします。
パラメーター 説明 パーティション ID パーティションの ID。 最小オフセット パーティション内の最も古いオフセット。 最大オフセット パーティション内の最新のオフセット。 メッセージ パーティション内のメッセージの総数。 最終更新日時 最新のメッセージが保存された時刻。 
オフセットによるメッセージのクエリ
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。
[検索方法] ドロップダウンリストから、[オフセットで検索] を選択します。
[Topic] ドロップダウンリストから Topic を選択し、[パーティション] ドロップダウンリストからパーティションを選択し、[オフセット] フィールドにオフセット値を入力して、[検索] をクリックします。コンソールは、指定された値以上のオフセットを持つすべてのメッセージを返します。たとえば、[パーティション] を
5に、[オフセット] を5に設定すると、パーティション 5 からオフセット 5 以上のすべてのメッセージが返されます。パラメーター 説明 パーティション メッセージが保存されているパーティション。 オフセット パーティション内のメッセージオフセット。 キー メッセージキー。文字列として表示されます。 [値] メッセージ本文。文字列として表示されます。 [作成日時] メッセージが送信されたときのタイムスタンプ。 ProducerRecordのタイムスタンプフィールドに値を指定した場合、その値が表示されます。値を指定しなかった場合は、メッセージが送信されたときのローカルシステム時刻が表示されます。タイムスタンプフィールドが0または無効な値に設定されている場合、時刻は1970/x/x x:x:x形式で表示されます。ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、このフィールドを設定できません。[操作] [キーのダウンロード]: メッセージキーをダウンロードします。[値のダウンロード]: メッセージ本文をダウンロードします。コンソールにはメッセージごとに最大 1 KB まで表示されます。1 KB を超える内容を表示するには、メッセージをダウンロードしてください。一度にダウンロードできるメッセージは最大 10 MB です。
参考文献
Kafka 出力プラグイン -- Logstash の Kafka 出力プラグインの完全なパラメーターリファレンス。