KafkaProducer SDKまたは収集エージェントを使用して、ログを収集し、収集したログをKafkaプロトコルを使用してLog Serviceにアップロードできます。 このトピックでは、Kafkaプロトコルを使用してログをLog Serviceにアップロードする方法について説明します。

制限事項

  • Kafka 0.8.0〜Kafka 2.1.1 (メッセージ形式バージョン2) のみがサポートされています。
  • ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。
  • Logstoreに複数のシャードが含まれている場合は、負荷分散モードでログをアップロードする必要があります。
  • Kafkaプロトコルを使用して、KafkaProducer SDKまたは収集エージェントを使用して収集されたログのみをLog Serviceにアップロードできます。

データ解析

Kafkaプロトコルを使用してアップロードされたログは、contentフィールドに保存されます。 ログがJSONタイプの場合、contentフィールドにJSONインデックスを設定できます。 詳細については、「JSON データ型」をご参照ください。

KafkaプロデューサーまたはBeatsを使用してログをアップロードする場合、収集設定でtopicまたはheadersパラメーターを設定して、ログをJSON形式で自動的に表示できます。 Log Serviceは自動的にcontentフィールドを展開します。 この場合、contentフィールドにJSONインデックスを設定する必要はありません。 詳細については、「設定」をご参照ください。

設定

Kafkaプロトコルを使用してログをLog Serviceにアップロードする場合は、次のパラメーターを設定する必要があります。
項目説明
接続タイプセキュリティプロトコル。 ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。
ホスト初期接続が確立されるアドレス。 Log Serviceプロジェクトのエンドポイントは、プロジェクト名で指定できます。エンドポイントの形式。 エンドポイントは、Log Serviceプロジェクトのリージョンによって異なります。 詳細については、「エンドポイント」をご参照ください。
  • 内部エンドポイントの例: test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011。 ポート番号は10011です。
  • パブリックエンドポイントの例: test-project-1.cn-hangzhou.log.aliyuncs.com:10012。 ポート番号は10012です。
トピックログストアの名前

KafkaプロデューサーまたはBeatsを使用してログをアップロードし、出力形式をJSONとして指定した場合、topicパラメーターをLogstore name.json形式の値に設定して、JSONログを自動的に展開できます。 詳細については、「例6: Kafkaプロデューサーを使用したログのアップロード」をご参照ください。

ヘッダーKafkaプロデューサーまたはBeatsを使用してログをアップロードし、出力形式をJSONとして指定する場合、headersパラメーターに次の値を指定して、JSONログを自動的に展開できます。
  ヘッダー:
    -key: "data-parse-format"
      値: "json"

詳細については、「例1: Beatsを使用したログのアップロード 」をご参照ください。

ユーザー名プロジェクトの名前。
パスワード${access-key-id }#${ access-key-secret} 形式のAccessKeyペア。 ${access-key-id}${access-key-secret} をAccessKey IDとAccessKey secretに置き換えます。 RAMユーザーのAccessKeyペアを使用することを推奨します。 詳細については、「RAMユーザーを作成し、RAMユーザーにLog Serviceへのアクセスを許可する」をご参照ください。
証明書ファイルエンドポイントの証明書ファイル。 Log Serviceの各エンドポイントには証明書があります。 このパラメーターをサーバーのルート証明書へのパスに設定します。 例: /etc/ssl/certs/ca-bundle.crt
説明 Kafkaコンシューマーグループを使用してLog Serviceのデータをリアルタイムで消費する場合は、 [設定] を起票し、Alibaba Cloudテクニカルサポートにお問い合わせください。

例1: Beatsを使用したログのアップロード

Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、HeartbeatなどのBeatsを使用してログを収集できます。 ログが収集されたら、Kafkaプロトコルを使用してログをLog Serviceにアップロードできます。 詳細については、「Beats-Kafka-Output」をご参照ください。

  • 例 1
    • 設定例
      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" 
        password: "yourpassword" 
        ssl.certificate_authorities: 
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • サンプルログ

      デフォルトでは、BeatsはJSON形式のログを提供します。 ログはLog Serviceにアップロードされ、コンテンツフィールドに保存されます。 contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。

      ビート
  • 例 2
    • 設定例
      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        ヘッダー:
          -key: "data-parse-format"
            値: "json"
        パーティション。ハッシュ:
          reachable_only: false
    • サンプルログ
      Youを設定できますヘッダパラメータに自動的にJSONログ。 ログの収集

例2: Collectdを使用したログのアップロード

collectdは、システムとアプリケーションのメトリックを定期的に収集するデーモンプロセスです。 Kafkaプロトコルを使用して、収集したメトリクスをLog Serviceにアップロードできます。 collectdの詳細については、「collectd」をご参照ください。 Log Serviceにメトリクスをアップロードする方法の詳細については、「Kafkaプラグインの書き込み」をご参照ください。

collectdを使用して収集されたログをLog Serviceにアップロードする前に、collectd-write_kafkaプラグインと関連する依存関係をインストールする必要があります。 CentOSサーバーでは、sudo yum install collectd-write_kafkaコマンドを実行して、collectd-write_kafkaプラグインをインストールできます。 Forより情報をインストールする方法についてRPM Package Manager (RPM) パッケージ、訪問RPMリソースcollectd読み書き_kafka

  • 設定例

    Inこの例、collectd JSON提供フォーマットログ。 コマンド形式とGraphite形式のログもサポートされています。 詳細は、「collectd」をご参照ください。

    <Plugin write_kafka>
      プロパティ "metadata.br oker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      プロパティ "sasl.us ername" "yourusername" 
      プロパティ "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </トピック>
    </プラグイン>
                        
  • サンプルログ

    JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。

    収集

例3: Telegrafを使用したログのアップロード

TelegrafはGoプログラミング言語のエージェントであり、メトリックの収集、処理、および集計に使用されます。 Telegrafは少量のメモリリソースしか消費しません。 詳細については、「Telegraf」をご参照ください。 Telegrafは、さまざまなプラグインと統合機能を提供します。 Telegrafを使用して、Telegrafが実行されているシステムまたはサードパーティAPIからメトリックを取得できます。 Telegrafを使用して、StatsDおよびKafkaコンシューマを使用してメトリックを監視することもできます。

Telegrafを使用して収集されたログをLog Serviceにアップロードする前に、Telegrafの設定ファイルを変更する必要があります。

  • 設定例
    この例では、TelegrafはJSON形式のログを提供します。 グラファイト-およびCarbon2-formattedログもサポートされています。 詳細については、「Telegraf」をご参照ください。
    説明 tls_caに有効なパスを指定する必要があります。 サーバー上のルート証明書へのパスを指定できます。 ほとんどの場合、Linuxサーバーのルート証明書へのパスは /etc/ssl/certs/ca-bundle.crtです。
    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • サンプルログ

    JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。

    テレグラフ

例4: Fluentdを使用したログのアップロード

Fluentdはオープンソースのログコレクターです。 FluentdはCloud Native Computing Foundation (CNCF) のプロジェクトであり、すべてのコンポーネントはApache 2ライセンスで利用できます。 詳細については、「Fluentd」をご参照ください。

Fluentdは、さまざまな入力、処理、および出力プラグインをサポートします。 Fluentdを使用してログを収集し、fluent-plugin-kafkaプラグインを使用して収集したログをLog Serviceにアップロードできます。 プラグインのインストールと設定のみが必要です。 詳細については、「fluent-plugin-kafka」をご参照ください。

  • 設定例
    この例では、FluentdはJSON形式のログを提供します。 数十のフォーマットがサポートされています。 詳細については、「Fluentd Formatter」をご参照ください。
    <match **>
      @type kafka 
      # ブローカー: ブローカーまたは飼育係を選択できます。 
      ブローカーs test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username yourusername // yourusernameを実際の値に置き換えます。 
      password yourpassword // yourpasswordを実際の値に置き換えます。 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip
    </match>
  • サンプルログ
    JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。 Fluentd

例5: Logstashを使用したログのアップロード

Logstashは、リアルタイム処理機能を提供するオープンソースのログ収集エンジンです。 Logstashを使用して、さまざまなソースからログを動的に収集できます。 詳細は、「Logstash」をご参照ください。

Logstash は、組み込みの Kafka 出力プラグインを提供します。 Kafkaプロトコルを使用して、ログを収集し、収集したログをLog ServiceにアップロードするようにLogstashを設定できます。 Log Serviceは、データ送信中にSASL_SSLプロトコルを使用します。 SSL証明書とJava Authentication and Authorization Service (JAAS) ファイルを設定する必要があります。
  • 設定例
    1. JAASファイルを作成し、ディレクトリに保存します。 例: /etc/kafka/kafka_client_jaas.conf
      JAASファイルに次のコンテンツを追加します。
      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword";
      };
    2. SSL証明書を設定し、証明書をディレクトリに保存します。 例: /etc/kafka/client-root.truststore.jks
      ルート証明書をダウンロードし、証明書をディレクトリに保存します。 例: /etc/kafka/root.pem keytoolコマンドを実行して、ファイルを生成します。jks形式。 初めてコマンドを実行してファイルを生成するときは、パスワードを設定する必要があります。
      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. Logstashを設定します。
      この例では、LogstashはJSON形式のログを提供します。 数十のフォーマットがサポートされています。 詳細については、「Logstashコーデック」をご参照ください。
      説明 次の設定は、接続テストに使用されます。 本番環境では、stdoutフィールドを削除することを推奨します。
      input { stdin { } }
      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • サンプルログ
    JSON形式のログがLog Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSON データ型」をご参照ください。 Logstash

例6: Kafkaプロデューサーを使用したログのアップロード

Kafkaプロデューサーを使用してログをLog Serviceにアップロードする場合、収集設定でtopicまたはheadersパラメーターを設定して、JSONログを自動的に展開できます。

  • 設定例
    public static void produce(){
        // 設定情報。 
        Properties props2 = new Properties();
    
        String project = "etl-test-tlj";
        文字列トピック="test3.json";
        props2.put("bootstrap.servers" 、"kafka.log.aliyuncs.com:9093");
        props2.put("security.protocol" 、"sasl_ssl");
        props2.put("sasl.mechanism" 、"PLAIN");
        props2.put("sasl.jaas.config" 、
                "org.apache.kafka.com mon.security.plain.PlainLoginModuleにはusername=\" "+ project +"\"password=\" access-key-id#access-key-secret\";" が必要です。
    
        // データキーと値のシリアル化クラスを指定します。 
        props2.put("key.serializer" 、StringSerializer.class);
        props2.put("value.serializer" 、StringSerializer.class);
    
        // プロデューサーインスタンスを作成します。 
        KafkaProducer<String,String> producer = new KafkaProducer<>(props2);
    
        // レコードを送信します。 
        for(int i=0;i<1;i ++){
            ProducerRecord record = new ProducerRecord<String, String>(topic, "{\" logName\": \" error4\"}");
            record.headers().add(new RecordHeader("data-parse-format","json".getBytes()));
            producer.send (レコード);
        }
        producer.close();
    }
  • サンプルログログの収集

エラーメッセージ

Kafkaプロトコルを使用してログをアップロードできない場合は、エラーメッセージが返されます。 次の表に、エラーメッセージを示します。 詳細については、「エラーリスト」をご参照ください。
エラーメッセージ説明ソリューション
NetworkExceptionネットワーク例外が発生した場合に返されるエラーメッセージ。 Try再び後1秒。
TopicAuthorizationException認証が失敗した場合に返されるエラーメッセージ。 AccessKeyペアが無効であるか、指定されたプロジェクトまたはLogstoreにデータを書き込む権限がない場合、認証は失敗します。 この場合、有効なAccessKeyペアを入力し、そのペアに必要な書き込み権限があることを確認します。
UnknownTopicOrPartitionException次のいずれかのエラーが発生した場合に返されるエラーメッセージ。
  • 指定されたプロジェクトまたはLogstoreは存在しません。
  • 指定されたプロジェクトが存在するリージョンは、指定されたエンドポイントのリージョンとは異なります。

指定したプロジェクトまたはLogstoreが存在することを確認します。 指定されたプロジェクトまたはLogstoreが作成されてもエラーが続く場合は、指定されたプロジェクトが存在するリージョンが指定されたエンドポイントのリージョンと同じかどうかを確認します。

KafkaStorageExceptionサーバーエラーが発生した場合に返されるエラーメッセージ。 1秒後にもう一度お試しください。