Log Service では、Logtail、SDK、および API に加えて、Kafka プロトコルに準拠して Log Service にデータを書き込むこともできます。 さまざまな言語および収集エージェントで  Kafka Producer SDK を使用して、収集データを Kafka にエクスポートすることができます。

制限事項

  • サポートされている Kafka プロトコルのバージョンは、Kafka 0.8.0 から Kafka 2.1.1 までです。
  • 安全なデータ送信には SASL_SSL 接続プロトコルを使用する必要があります。
  • Logstore に複数のシャードが含まれている場合、負荷分散モードでデータを書き込む必要があります。
  • 現在、プロデューサーまたはエージェントのみを使用して、Kafkaプロトコルに準拠してLog Serviceにデータを書き込むことができます。

設定項目

Kafka プロトコルを使用してデータを収集する場合、いくつかのパラメーターを設定する必要があります。 次の表で、関連パラメーターについて説明します。
パラメータ 説明 
Connection protocol 安全なデータ送信のための接続プロトコルです。 SASL_SSL を使用する必要があります。 SASL_SSL
hosts 初期接続のクラスターアドレスです。 イントラネット (クラシックネットワークまたはVPC) アドレスのポート番号は 10011 です。 インターネットアドレスのポート番号は 10012 です。 ターゲットプロジェクトがあるサービスエンドポイントを選択する必要があります。 詳細は、「サービスエンドポイント」をご参照ください。
  • cn-hangzhou-intranet.log.aliyuncs.com:10011
  • cn-hangzhou.log.aliyuncs.com:10012
topic Log Service のマップされたログストア名です。 事前にログストアを作成する必要があります。 test-logstore-1
username Log Service のマップされたプロジェクト名です。 <yourusername>
password ${access-key-id}#${access-key-secret} 形式のご使用の AccessKey に関する情報です。 $ {access-key-id} を AccessKey ID に、$ {access-key-secret} を AccessKey Secret に置き換える必要があります。 RAM ユーザーの AccessKey を使用することをお勧めします。 詳細については、「RAM サブアカウントへの Log Service アクセス権限の付与」をご参照ください。 <yourpassword>
証明書 証明書のディレクトリです。 Log Service の各ドメイン名には CA 証明書があります。 デフォルトのルート証明書のみを使用する必要があります。 /etc/ssl/certs/ca-bundle.crt

エラーコード

Kafka プロトコルに準拠したログデータの収集に失敗した場合、システムは失敗の特定の原因に対して Kafka エラーコードを返します。 Kafka エラーコードの詳細については、「 エラーリスト」をご参照ください。 次の表に、特定のエラーコード、説明、および対応するソリューションを示します。
エラーコード 説明 解決方法
NetworkException 内部エラーが発生した場合に返されるエラーメッセージです。 3 秒待ってからやり直します。
TopicAuthorizationException 認証が失敗したため、返されたエラーメッセージです。 通常、AccessKey は無効であるか、対応するプロジェクトまたは Logstore にデータを書き込む権限がありません。 有効な AccessKey を入力し、必要な書き込み権限があることを確認してください。
UnknownTopicOrPartitionException

次のエラーのいずれかが発生したため、エラーメッセージが返されました。

  • 対応するプロジェクトまたは Logstore は存在しません。
  • プロジェクトが配置されているリージョンは、入力したエンドポイントが示すリージョンとは異なります。
  1. 事前にプロジェクトと Logstore を作成します。
  2. プロジェクトが配置されているリージョンは、入力したエンドポイントが示すリージョンと必ず同じにします。
KafkaStorageException 内部エラーが発生した場合に返されるエラーメッセージです。 3秒待ってからやり直します。

Log Service にデータを書き込みます。 Log Service のプロジェクトの名前は test-project-1、Logstore の名前は test-logstore-1 です。 Project が存在するリージョンは cn-hangzhou です。 対応する書き込み許可を持つ RAM ユーザーの AccessKey ID は <yourAccessKeyId>、AccessKey Secret は <yourAccessKeySecret> です。

  • 例 1:Beats ソフトウェアを使用した Log Service へのデータの書き込み
    Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat などの Beats ソフトウェアを使用して、収集データを Kafka にエクスポートできます。 詳細については、「 Kafka 出力の設定」をご参照ください。 サンプルコードは次のとおりです。
    output.kafka: 
      # initial brokers for reading cluster metadata 
      hosts: ["cn-hangzhou.log.aliyuncs.com:10012"] 
      username: "<yourusername>" 
      password: "<yourpassword>" 
      ssl.certificate_authorities: 
      # message topic selection + partitioning 
      topic: 'test-logstore-1' 
      partition.round_robin: 
        reachable_only: false 
    
      required_acks: 1 
      compression: gzip 
      max_message_bytes: 1000000
    デフォルトでは、Beats ソフトウェアは JSON 形式のログを Kafka にエクスポートします。 コンテンツフィールドの JSON タイプインデックスを作成することもできます。 詳細については、「 JSON データ型」をご参照ください。 以下の図はログサンプルです。
  • 例 2:Collectd を使用した Log Service へのデータの書き込み

    Collectd は、システムまたはアプリケーションのパフォーマンスメトリックを定期的に収集するために使用されるデーモンです。 Collectd を使用して、収集したデータを Kafka にエクスポートすることもできます。 詳細については、「Write Kafka プラグイン」をご参照ください。

    収集したデータを Collectd から Kafka にエクスポートする場合は、Write Kafka プラグインと関連する依存関係をインストールする必要があります。 CentOS では、sudo yum install collectd-write_kafka コマンドを直接実行してプラグインをインストールできます。 Red-Hat Package Manager (RPM) リソースの詳細については、 「RPMリソースcollectd-write_kafka」をご参照ください。

    サンプルコードは次のとおりです。
    <Plugin write_kafka>
      Property "metadata.broker.list" "cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "<yourusername>" 
      Property "sasl.password" "<yourpassword>" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
    						

    上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 Collectd は、JSON の他に、Command および Graphite 形式もサポートしています。 詳細については、「収集された設定ドキュメント」をご参照ください。

    JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「 JSON タイプ」をご参照ください。 詳細については、以下の図をご参照ください。
  • 例 3:Telegraf を使用した Log Service へのデータの書き込み

    Telegraf は InfluxData のサブプロジェクトで、 メトリックを収集、処理、および集計するために Go でコンパイルされたエージェントです。 使用するメモリリソースを抑えるように設計されています。 Telegraf は、プラグインを介してサービスを構築し、サードパーティコンポーネントのメトリックを収集するために使用できます。 さらに、Telegraf には統合機能があります。 実行されるシステムからメトリックを取得し、サードパーティ API を介してメトリックを取得し、さらに StatsD および Kafka コンシューマサービスを介してメトリックを監視することもできます。

    Telegraf は、Kafka にデータをエクスポートできます。 したがって、設定ファイルを変更するだけで、Telegraf を使用したデータ収集および Log Service へのデータ書き込みが可能になります。 サンプルコードは次のとおりです。
    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "<yourusername>" 
      sasl_password = "<yourpassword>" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
    Telegraf に有効な tls_ca ディレクトリを設定する必要があります。 デフォルトのルート証明書を使用できます。 Linux 環境の一般的なルート証明書ディレクトリは /etc/ssl/certs/ca-bundle.crt です。
    上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 Telegraf は、JSON の他に Graphite や Carbon2 などの他の形式もサポートしています。 詳細については、「 Telegraf 出力データ形式」をご参照ください。
    JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「 JSON タイプ」をご参照ください。 詳細は以下の図をご参照ください。
  • 例 4:Fluentd を使用した Log Service へのデータの書き込み

    Fluentd は、統合されたログ層を提供するオープンソースのデータコレクターです。 統一された方法でデータを収集できるため、データの使用および理解が簡単です。 Fluentd は、Cloud Native Computing Foundation (CNCF) のメンバープロジェクトです。 Apache 2 ライセンスプロトコルに準拠しています。

    Fluentd には入力、処理、および出力プラグインが数多く備わっています。 特に、 Kafkaプラグイン は Fluentd が Kafka にデータをエクスポートするときに便利です。 必要なのは、インストール と設定のみです。

    サンプルコードは次のとおりです。
    <match **>
      @type kafka 
      # Brokers: You can choose either brokers or zookeeper. 
      brokers      cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username <yourusername> 
      password <yourpassword> 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
    上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 Fluentd は JSON の他にも、10 以上の形式をサポートしています。 詳細については、「 Fluentd Formatter」をご参照ください。
    JSON 形式を使用する場合、コンテンツフィールドのJSON タイプインデックスを作成できます。 詳細については、「JSON タイプ」をご参照ください。 詳細は以下の図をご参照ください。
  • 例 5:Logstash を使用した Log Service へのデータの書き込み

    Logstash は、リアルタイムでデータを収集するためのオープンソースエンジンです。 Logstash を使用すると、さまざまなソースからデータを動的に収集し、データを処理 (データのフィルター処理や変換など) して、結果をターゲットアドレスにエクスポートできます。 出力結果に基づいて、データをさらに分析できます。

    Logstash は、組み込みの Kafka 出力プラグインを提供します。 Logstash を直接有効にして、Log Service にデータを書き込むことができます。 ただし、Log Service は Kafka プロトコルに準拠して SASL_SSL 接続プロトコルを使用するため、SSL 証明書と SASL jass ファイルを設定する必要があります。
    1. jaas ファイルを作成し、/etc/kafka/kafka_client_jaas.conf などターゲットディレクトリに保存します。
      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="<yourusername>" 
        password="<yourpassword>"; 
      };
    2. SSL 証明書を設定し、ターゲットディレクトリ ( /etc/kafka/client-root.truststore.jksなど) に保存します
      Log Service の各ドメイン名はそれぞれ CA 証明書を持っています。 GlobalSign Root CA をダウンロードし Base64 でエンコードされたルート証明書をターゲットディレクトリ ( /etc/kafka/ca-root など) に保存するだけです。 次に、 keytool コマンドを実行して JKS ファイルを生成します。 初めてJKSファイルを生成するときは、パスワードを設定する必要があります。
      keytool -keystore client.truststore.jks -alias root -import -file /etc/kafka/ca-root
    3. Logstash を設定します。 サンプルコードは次のとおりです。
      input { stdin { } } 
      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
      上記のサンプルコードの設定は、接続性テストに使用されます。 実際のアプリケーションでは、stdout 出力設定を削除することをお勧めします。
      上記のサンプルコードでは、Kafka にエクスポートされたデータの形式は JSON に設定されています。 JSON に加えて、Logstash は 10を超える形式もサポートしています。 詳細については、「Logstash Codec プラグイン」をご参照ください。
      JSON 形式を使用する場合、コンテンツフィールドの JSON タイプインデックスを作成できます。 詳細については、「 JSON タイプ」をご参照ください。 詳細については、以下の図をご参照ください。