KafkaProducer SDKまたは収集エージェントを使用して、ログを収集し、収集したログをKafkaプロトコルを使用してLog Serviceにアップロードできます。 このトピックでは、Kafkaプロトコルを使用してログをLog Serviceにアップロードする方法について説明します。
制限事項
- Kafka 0.8.0〜Kafka 2.1.1 (メッセージ形式バージョン2) のみがサポートされています。
- ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。
- Logstoreに複数のシャードが含まれている場合は、負荷分散モードでログをアップロードする必要があります。
- Kafkaプロトコルを使用して、KafkaProducer SDKまたは収集エージェントを使用して収集されたログのみをLog Serviceにアップロードできます。
データ解析
Kafkaプロトコルを使用してアップロードされたログは、contentフィールドに保存されます。 ログがJSONタイプの場合、contentフィールドにJSONインデックスを設定できます。 詳細については、「JSON データ型」をご参照ください。
KafkaプロデューサーまたはBeatsを使用してログをアップロードする場合、収集設定でtopicまたはheadersパラメーターを設定して、ログをJSON形式で自動的に表示できます。 Log Serviceは自動的にcontentフィールドを展開します。 この場合、contentフィールドにJSONインデックスを設定する必要はありません。 詳細については、「設定」をご参照ください。
設定
項目 | 説明 |
---|---|
接続タイプ | セキュリティプロトコル。 ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。 |
ホスト | 初期接続が確立されるアドレス。 Log Serviceプロジェクトのエンドポイントは、プロジェクト名で指定できます。エンドポイント の形式。 エンドポイントは、Log Serviceプロジェクトのリージョンによって異なります。 詳細については、「エンドポイント」をご参照ください。
|
トピック | ログストアの名前 KafkaプロデューサーまたはBeatsを使用してログをアップロードし、出力形式をJSONとして指定した場合、topicパラメーターを |
ヘッダー | KafkaプロデューサーまたはBeatsを使用してログをアップロードし、出力形式をJSONとして指定する場合、headersパラメーターに次の値を指定して、JSONログを自動的に展開できます。
詳細については、「例1: Beatsを使用したログのアップロード 」をご参照ください。 |
ユーザー名 | プロジェクトの名前。 |
パスワード | ${access-key-id }#${ access-key-secret} 形式のAccessKeyペア。 ${access-key-id} と ${access-key-secret} をAccessKey IDとAccessKey secretに置き換えます。 RAMユーザーのAccessKeyペアを使用することを推奨します。 詳細については、「RAMユーザーを作成し、RAMユーザーにLog Serviceへのアクセスを許可する」をご参照ください。 |
証明書ファイル | エンドポイントの証明書ファイル。 Log Serviceの各エンドポイントには証明書があります。 このパラメーターをサーバーのルート証明書へのパスに設定します。 例: /etc/ssl/certs/ca-bundle.crt |
例1: Beatsを使用したログのアップロード
Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、HeartbeatなどのBeatsを使用してログを収集できます。 ログが収集されたら、Kafkaプロトコルを使用してログをLog Serviceにアップロードできます。 詳細については、「Beats-Kafka-Output」をご参照ください。
- 例 1
- 設定例
output.kafka: # initial brokers for reading cluster metadata hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] username: "yourusername" password: "yourpassword" ssl.certificate_authorities: # message topic selection + partitioning topic: 'test-logstore-1' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
- サンプルログ
デフォルトでは、BeatsはJSON形式のログを提供します。 ログはLog Serviceにアップロードされ、コンテンツフィールドに保存されます。 contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。
- 設定例
- 例 2
- 設定例
output.kafka: enabled: true hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"] username: "test-project-1" password: "access-key-id#access-key-secret" ssl.certificate_authorities: topic: 'test-logstore-1' ヘッダー: -key: "data-parse-format" 値: "json" パーティション。ハッシュ: reachable_only: false
- サンプルログ Youを設定できますヘッダパラメータに自動的にJSONログ。
- 設定例
例2: Collectdを使用したログのアップロード
collectdは、システムとアプリケーションのメトリックを定期的に収集するデーモンプロセスです。 Kafkaプロトコルを使用して、収集したメトリクスをLog Serviceにアップロードできます。 collectdの詳細については、「collectd」をご参照ください。 Log Serviceにメトリクスをアップロードする方法の詳細については、「Kafkaプラグインの書き込み」をご参照ください。
collectdを使用して収集されたログをLog Serviceにアップロードする前に、collectd-write_kafkaプラグインと関連する依存関係をインストールする必要があります。 CentOSサーバーでは、sudo yum install collectd-write_kafka
コマンドを実行して、collectd-write_kafkaプラグインをインストールできます。 Forより情報をインストールする方法についてRPM Package Manager (RPM) パッケージ、訪問RPMリソースcollectd読み書き_kafka。
- 設定例
Inこの例、collectd JSON提供フォーマットログ。 コマンド形式とGraphite形式のログもサポートされています。 詳細は、「collectd」をご参照ください。
<Plugin write_kafka> プロパティ "metadata.br oker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" プロパティ "sasl.us ername" "yourusername" プロパティ "sasl.password" "yourpassword" Property "broker.address.family" "v4" <Topic "test-logstore-1"> Format JSON Key "content" </トピック> </プラグイン>
- サンプルログ
JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。
例3: Telegrafを使用したログのアップロード
TelegrafはGoプログラミング言語のエージェントであり、メトリックの収集、処理、および集計に使用されます。 Telegrafは少量のメモリリソースしか消費しません。 詳細については、「Telegraf」をご参照ください。 Telegrafは、さまざまなプラグインと統合機能を提供します。 Telegrafを使用して、Telegrafが実行されているシステムまたはサードパーティAPIからメトリックを取得できます。 Telegrafを使用して、StatsDおよびKafkaコンシューマを使用してメトリックを監視することもできます。
Telegrafを使用して収集されたログをLog Serviceにアップロードする前に、Telegrafの設定ファイルを変更する必要があります。
- 設定例 この例では、TelegrafはJSON形式のログを提供します。 グラファイト-およびCarbon2-formattedログもサポートされています。 詳細については、「Telegraf」をご参照ください。説明 tls_caに有効なパスを指定する必要があります。 サーバー上のルート証明書へのパスを指定できます。 ほとんどの場合、Linuxサーバーのルート証明書へのパスは /etc/ssl/certs/ca-bundle.crtです。
[[outputs.kafka]] ## URLs of kafka brokers brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] ## Kafka topic for producer messages topic = "test-logstore-1" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "yourusername" sasl_password = "yourpassword" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
- サンプルログ
JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。
例4: Fluentdを使用したログのアップロード
Fluentdはオープンソースのログコレクターです。 FluentdはCloud Native Computing Foundation (CNCF) のプロジェクトであり、すべてのコンポーネントはApache 2ライセンスで利用できます。 詳細については、「Fluentd」をご参照ください。
Fluentdは、さまざまな入力、処理、および出力プラグインをサポートします。 Fluentdを使用してログを収集し、fluent-plugin-kafkaプラグインを使用して収集したログをLog Serviceにアップロードできます。 プラグインのインストールと設定のみが必要です。 詳細については、「fluent-plugin-kafka」をご参照ください。
- 設定例 この例では、FluentdはJSON形式のログを提供します。 数十のフォーマットがサポートされています。 詳細については、「Fluentd Formatter」をご参照ください。
<match **> @type kafka # ブローカー: ブローカーまたは飼育係を選択できます。 ブローカーs test-project-1.cn-hangzhou.log.aliyuncs.com:10012 default_topic test-logstore-1 default_message_key content output_data_type json output_include_tag true output_include_time true sasl_over_ssl true username yourusername // yourusernameを実際の値に置き換えます。 password yourpassword // yourpasswordを実際の値に置き換えます。 ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 10000 required_acks 1 compression_codec gzip </match>
- サンプルログ JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。
例5: Logstashを使用したログのアップロード
Logstashは、リアルタイム処理機能を提供するオープンソースのログ収集エンジンです。 Logstashを使用して、さまざまなソースからログを動的に収集できます。 詳細は、「Logstash」をご参照ください。
- 設定例
- JAASファイルを作成し、ディレクトリに保存します。 例: /etc/kafka/kafka_client_jaas.conf JAASファイルに次のコンテンツを追加します。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="yourpassword"; };
- SSL証明書を設定し、証明書をディレクトリに保存します。 例: /etc/kafka/client-root.truststore.jks
- Logstashを設定します。 この例では、LogstashはJSON形式のログを提供します。 数十のフォーマットがサポートされています。 詳細については、「Logstashコーデック」をご参照ください。説明 次の設定は、接続テストに使用されます。 本番環境では、stdoutフィールドを削除することを推奨します。
input { stdin { } } output { stdout { codec => rubydebug } kafka { topic_id => "test-logstore-1" bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" security_protocol => "SASL_SSL" ssl_truststore_location => "/etc/client-root.truststore.jks" ssl_truststore_password => "123456" jaas_path => "/etc/kafka_client_jaas.conf" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
- JAASファイルを作成し、ディレクトリに保存します。 例: /etc/kafka/kafka_client_jaas.conf
- サンプルログ JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。
例6: Kafkaプロデューサーを使用したログのアップロード
Kafkaプロデューサーを使用してログをLog Serviceにアップロードする場合、収集設定でtopicまたはheadersパラメーターを設定して、JSONログを自動的に展開できます。
- 設定例
public static void produce(){ // 設定情報。 Properties props2 = new Properties(); String project = "etl-test-tlj"; 文字列トピック="test3.json"; props2.put("bootstrap.servers" 、"kafka.log.aliyuncs.com:9093"); props2.put("security.protocol" 、"sasl_ssl"); props2.put("sasl.mechanism" 、"PLAIN"); props2.put("sasl.jaas.config" 、 "org.apache.kafka.com mon.security.plain.PlainLoginModuleにはusername=\" "+ project +"\"password=\" access-key-id#access-key-secret\";" が必要です。 // データキーと値のシリアル化クラスを指定します。 props2.put("key.serializer" 、StringSerializer.class); props2.put("value.serializer" 、StringSerializer.class); // プロデューサーインスタンスを作成します。 KafkaProducer<String,String> producer = new KafkaProducer<>(props2); // レコードを送信します。 for(int i=0;i<1;i ++){ ProducerRecord record = new ProducerRecord<String, String>(topic, "{\" logName\": \" error4\"}"); record.headers().add(new RecordHeader("data-parse-format","json".getBytes())); producer.send (レコード); } producer.close(); }
- サンプルログ
エラーメッセージ
エラーメッセージ | 説明 | ソリューション |
---|---|---|
NetworkException | ネットワーク例外が発生した場合に返されるエラーメッセージ。 | Try再び後1秒。 |
TopicAuthorizationException | 認証が失敗した場合に返されるエラーメッセージ。 | AccessKeyペアが無効であるか、指定されたプロジェクトまたはLogstoreにデータを書き込む権限がない場合、認証は失敗します。 この場合、有効なAccessKeyペアを入力し、そのペアに必要な書き込み権限があることを確認します。 |
UnknownTopicOrPartitionException | 次のいずれかのエラーが発生した場合に返されるエラーメッセージ。
| 指定したプロジェクトまたはLogstoreが存在することを確認します。 指定されたプロジェクトまたはLogstoreが作成されてもエラーが続く場合は、指定されたプロジェクトが存在するリージョンが指定されたエンドポイントのリージョンと同じかどうかを確認します。 |
KafkaStorageException | サーバーエラーが発生した場合に返されるエラーメッセージ。 | 1秒後にもう一度お試しください。 |