Log Service では、Logtail、SDK、および API に加えて、Kafka プロトコルに準拠して Log Service にデータを書き込むこともできます。 さまざまな言語および収集エージェントで Kafka Producer SDK を使用して、収集データを Kafka にエクスポートすることができます。

制限事項
- サポートされている Kafka プロトコルのバージョンは、Kafka 0.8.0 から Kafka 2.1.1 までです。
- 安全なデータ送信には SASL_SSL 接続プロトコルを使用する必要があります。
- Logstore に複数のシャードが含まれている場合、負荷分散モードでデータを書き込む必要があります。
- 現在、プロデューサーまたはエージェントのみを使用して、Kafkaプロトコルに準拠してLog Serviceにデータを書き込むことができます。
設定項目
パラメータ | 説明 | 例 |
---|---|---|
Connection protocol | 安全なデータ送信のための接続プロトコルです。 SASL_SSL を使用する必要があります。 | SASL_SSL |
hosts | 初期接続のクラスターアドレスです。 イントラネット (クラシックネットワークまたはVPC) アドレスのポート番号は 10011 です。 インターネットアドレスのポート番号は 10012 です。 ターゲットプロジェクトがあるサービスエンドポイントを選択する必要があります。 詳細は、「サービスエンドポイント」をご参照ください。 |
|
topic | Log Service のマップされたログストア名です。 事前にログストアを作成する必要があります。 | test-logstore-1 |
username | Log Service のマップされたプロジェクト名です。 | <yourusername> |
password | ${access-key-id}#${access-key-secret} 形式のご使用の AccessKey に関する情報です。 $ {access-key-id} を AccessKey ID に、$ {access-key-secret} を AccessKey Secret に置き換える必要があります。 RAM ユーザーの AccessKey を使用することをお勧めします。 詳細については、「RAM サブアカウントへの Log Service アクセス権限の付与」をご参照ください。 | <yourpassword> |
証明書 | 証明書のディレクトリです。 Log Service の各ドメイン名には CA 証明書があります。 デフォルトのルート証明書のみを使用する必要があります。 | /etc/ssl/certs/ca-bundle.crt |
エラーコード
エラーコード | 説明 | 解決方法 |
---|---|---|
NetworkException | 内部エラーが発生した場合に返されるエラーメッセージです。 | 3 秒待ってからやり直します。 |
TopicAuthorizationException | 認証が失敗したため、返されたエラーメッセージです。 通常、AccessKey は無効であるか、対応するプロジェクトまたは Logstore にデータを書き込む権限がありません。 | 有効な AccessKey を入力し、必要な書き込み権限があることを確認してください。 |
UnknownTopicOrPartitionException |
次のエラーのいずれかが発生したため、エラーメッセージが返されました。
|
|
KafkaStorageException | 内部エラーが発生した場合に返されるエラーメッセージです。 | 3秒待ってからやり直します。 |
例
Log Service にデータを書き込みます。 Log Service のプロジェクトの名前は test-project-1
、Logstore の名前は test-logstore-1
です。 Project が存在するリージョンは cn-hangzhou です。 対応する書き込み許可を持つ RAM ユーザーの AccessKey ID は <yourAccessKeyId>
、AccessKey Secret は <yourAccessKeySecret>
です。
- 例 1:Beats ソフトウェアを使用した Log Service へのデータの書き込み
Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats ソフトウェアを使用して、収集データを Kafka にエクスポートできます。 詳細については、「Kafka 出力の設定」をご参照ください。 サンプルコードは次のとおりです。
output.kafka: # initial brokers for reading cluster metadata hosts: ["cn-hangzhou.log.aliyuncs.com:10012"] username: "<yourusername>" password: "<yourpassword>" ssl.certificate_authorities: # message topic selection + partitioning topic: 'test-logstore-1' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
デフォルトでは、Beats ソフトウェアは JSON 形式のログを Kafka にエクスポートします。 コンテンツフィールドの JSON タイプインデックスを作成することもできます。 詳細については、「JSON データ型」をご参照ください。 以下の図はログサンプルです。 - 例 2:Collectd を使用した Log Service へのデータの書き込み
Collectd は、システムまたはアプリケーションのパフォーマンスメトリックを定期的に収集するために使用されるデーモンです。 Collectd を使用して、収集したデータを Kafka にエクスポートすることもできます。 詳細については、「Write Kafka プラグイン」をご参照ください。
収集したデータを Collectd から Kafka にエクスポートする場合は、Write Kafka プラグインと関連する依存関係をインストールする必要があります。 CentOS では、
sudo yum install collectd-write_kafka
コマンドを直接実行してプラグインをインストールできます。 Red-Hat Package Manager (RPM) リソースの詳細については、 「RPMリソースcollectd-write_kafka」をご参照ください。サンプルコードは次のとおりです。<Plugin write_kafka> Property "metadata.broker.list" "cn-hangzhou.log.aliyuncs.com:10012" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "<yourusername>" Property "sasl.password" "<yourpassword>" Property "broker.address.family" "v4" <Topic "test-logstore-1"> Format JSON Key "content" </Topic> </Plugin>
上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 Collectd は、JSON の他に、Command および Graphite 形式もサポートしています。 詳細については、「収集された設定ドキュメント」をご参照ください。
JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「JSON タイプ」をご参照ください。 詳細については、以下の図をご参照ください。 - 例 3:Telegraf を使用した Log Service へのデータの書き込み
Telegraf は InfluxData のサブプロジェクトで、 メトリックを収集、処理、および集計するために Go でコンパイルされたエージェントです。 使用するメモリリソースを抑えるように設計されています。 Telegraf は、プラグインを介してサービスを構築し、サードパーティコンポーネントのメトリックを収集するために使用できます。 さらに、Telegraf には統合機能があります。 実行されるシステムからメトリックを取得し、サードパーティ API を介してメトリックを取得し、さらに StatsD および Kafka コンシューマサービスを介してメトリックを監視することもできます。
Telegraf は、Kafka にデータをエクスポートできます。 したがって、設定ファイルを変更するだけで、Telegraf を使用したデータ収集および Log Service へのデータ書き込みが可能になります。 サンプルコードは次のとおりです。[[outputs.kafka]] ## URLs of kafka brokers brokers = ["cn-hangzhou.log.aliyuncs.com:10012"] ## Kafka topic for producer messages topic = "test-logstore-1" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "<yourusername>" sasl_password = "<yourpassword>" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
注 Telegraf に有効なtls_ca
ディレクトリを設定する必要があります。 デフォルトのルート証明書を使用できます。 Linux 環境の一般的なルート証明書ディレクトリは/etc/ssl/certs/ca-bundle.crt
です。JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「JSON タイプ」をご参照ください。 詳細は以下の図をご参照ください。 - 例 4:Fluentd を使用した Log Service へのデータの書き込み
Fluentd は、統合されたログ層を提供するオープンソースのデータコレクターです。 統一された方法でデータを収集できるため、データの使用および理解が簡単です。 Fluentd は、Cloud Native Computing Foundation (CNCF) のメンバープロジェクトです。 Apache 2 ライセンスプロトコルに準拠しています。
Fluentd には入力、処理、および出力プラグインが数多く備わっています。 特に、 Kafkaプラグイン は Fluentd が Kafka にデータをエクスポートするときに便利です。 必要なのは、インストール と設定のみです。
サンプルコードは次のとおりです。JSON 形式を使用する場合、コンテンツフィールドのJSON タイプインデックスを作成できます。 詳細については、「JSON タイプ」をご参照ください。 詳細は以下の図をご参照ください。
上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 Fluentd は JSON の他にも、10 以上の形式をサポートしています。 詳細については、「Fluentd Formatter」をご参照ください。<match **> @type kafka # Brokers: You can choose either brokers or zookeeper. brokers cn-hangzhou.log.aliyuncs.com:10012 default_topic test-logstore-1 default_message_key content output_data_type json output_include_tag true output_include_time true sasl_over_ssl true username <yourusername> password <yourpassword> ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 10000 required_acks 1 compression_codec gzip </match>
- 例 5:Logstash を使用した Log Service へのデータの書き込み
Logstash は、リアルタイムでデータを収集するためのオープンソースエンジンです。 Logstash を使用すると、さまざまなソースからデータを動的に収集し、データを処理 (データのフィルター処理や変換など) して、結果をターゲットアドレスにエクスポートできます。 出力結果に基づいて、データをさらに分析できます。
Logstash は、組み込みの Kafka 出力プラグインを提供します。 Logstash を直接有効にして、Log Service にデータを書き込むことができます。 ただし、Log Service は Kafka プロトコルに準拠して SASL_SSL 接続プロトコルを使用するため、SSL 証明書と SASL jass ファイルを設定する必要があります。- jaas ファイルを作成し、/etc/kafka/kafka_client_jaas.conf などターゲットディレクトリに保存します。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="<yourusername>" password="<yourpassword>"; };
- SSL 証明書を設定し、ターゲットディレクトリ ( /etc/kafka/client-root.truststore.jksなど) に保存します。
Log Service の各ドメイン名はそれぞれ CA 証明書を持っています。 GlobalSign Root CA をダウンロードし Base64 でエンコードされたルート証明書をターゲットディレクトリ (/etc/kafka/ca-root など) に保存するだけです。 次に、keytool コマンドを実行して JKS ファイルを生成します。 初めてJKSファイルを生成するときは、パスワードを設定する必要があります。
keytool -keystore client.truststore.jks -alias root -import -file /etc/kafka/ca-root
- Logstash を設定します。 サンプルコードは次のとおりです。
input { stdin { } } output { stdout { codec => rubydebug } kafka { topic_id => "test-logstore-1" bootstrap_servers => "cn-hangzhou.log.aliyuncs.com:10012" security_protocol => "SASL_SSL" ssl_truststore_location => "/etc/client-root.truststore.jks" ssl_truststore_password => "123456" jaas_path => "/etc/kafka_client_jaas.conf" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
注 上記のサンプルコードの設定は、接続性テストに使用されます。 実際のアプリケーションでは、stdout 出力設定を削除することをお勧めします。JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「JSON タイプ」をご参照ください。 詳細については、以下の図をご参照ください。
- jaas ファイルを作成し、/etc/kafka/kafka_client_jaas.conf などターゲットディレクトリに保存します。