You can use KafkaProducer SDKs or collection agents to collect logs and upload the collected logs to Log Service by using the Kafka protocol. This topic describes how to upload logs to Log Service by using the Kafka protocol.

Limits

  • Only Kafka versions 0.8.0 to 2.1.1 (message format version 2) are supported.
  • You must use the SASL_SSL protocol to ensure the security of log transmission.
  • If your Logstore contains multiple shards, you must upload logs in load balancing mode.
  • You can use the Kafka protocol only to upload logs to Log Service. The logs must be collected by using KafkaProducer SDKs or collection agents.

Data parsing

Logs that are uploaded by using the Kafka protocol are stored in the content field. If the logs are of the JSON type, you can set a JSON index for the content field. For more information, see JSON type.

If you use a Kafka producer or Beats to upload logs, you can configure the topic or headers parameter in the collection configuration to automatically display logs in the JSON format. Log Service automatically expands the content field. In this case, you do not need to configure a JSON index for the content field. For more information, see Configurations.
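
For example, suppose that the following JSON log is uploaded by using the Kafka protocol. The field names are hypothetical and used only for illustration:

  {"logName": "error4", "level": "warn"}

By default, the entire log is stored as a string in the content field, and you must configure a JSON index for the content field to query individual fields. If JSON expansion is enabled by using the topic or headers parameter, logName and level are stored as separate fields, and no JSON index for the content field is required.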

Configurations

You must configure the following parameters when you use the Kafka protocol to upload logs to Log Service.
  • Connection type: The security protocol. You must use the SASL_SSL protocol to ensure the security of log transmission.
  • hosts: The address to which an initial connection is established. Specify the endpoint of your Log Service project in the Project name.Endpoint format. The endpoint varies based on the region of your Log Service project. For more information, see Endpoints.
    • Example of an internal endpoint: test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011. The port number is 10011.
    • Example of a public endpoint: test-project-1.cn-hangzhou.log.aliyuncs.com:10012. The port number is 10012.
  • topic: The name of the Logstore.
    If you use a Kafka producer or Beats to upload logs and specify the output format as JSON, you can set the topic parameter to a value in the Logstore name.json format to automatically expand JSON logs. For more information, see Example 6: Use a Kafka producer to upload logs.
  • headers: If you use a Kafka producer or Beats to upload logs and specify the output format as JSON, you can specify the following value for the headers parameter to automatically expand JSON logs:
      headers:
        - key: "data-parse-format"
          value: "json"
    For more information, see Example 1: Use Beats to upload logs.
  • username: The name of the project.
  • password: The AccessKey pair in the ${access-key-id}#${access-key-secret} format. Replace ${access-key-id} and ${access-key-secret} with your AccessKey ID and AccessKey secret. We recommend that you use the AccessKey pair of a RAM user. For more information, see Create a RAM user and authorize the RAM user to access Log Service.
  • Certificate file: The certificate file of the endpoint. Each endpoint of Log Service has a certificate. Set this parameter to the path to the root certificate on your server. Example: /etc/ssl/certs/ca-bundle.crt.
Note If you want to use a Kafka consumer group to consume data from Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.
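
The following Java snippet is a minimal sketch of how these parameters map to Kafka producer client properties. The project name, endpoint, Logstore name, and AccessKey pair are placeholders that you must replace with your own values. For a complete example, see Example 6: Use a Kafka producer to upload logs.

  Properties props = new Properties();
  // hosts: the project endpoint and port, in the Project name.Endpoint format.
  props.put("bootstrap.servers", "test-project-1.cn-hangzhou.log.aliyuncs.com:10012");
  // Connection type: the SASL_SSL protocol is required.
  props.put("security.protocol", "sasl_ssl");
  props.put("sasl.mechanism", "PLAIN");
  // username: the project name. password: ${access-key-id}#${access-key-secret}.
  props.put("sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required "
      + "username=\"test-project-1\" password=\"access-key-id#access-key-secret\";");
  // topic: the Logstore name, which you specify when you send records.
  // Example: new ProducerRecord<>("test-logstore-1", "{\"logName\": \"error\"}");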

Example 1: Use Beats to upload logs

You can use Beats such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Log Service. For more information, see Beats-Kafka-Output.

  • Example 1
    • Configuration example
      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" 
        password: "yourpassword" 
        ssl.certificate_authorities: 
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • Sample log

      By default, Beats provides JSON-formatted logs. The logs are uploaded to Log Service and stored in the content field. You can create a JSON index for the content field. For more information, see JSON type.

  • Example 2
    • Configuration example
      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        headers:
          - key: "data-parse-format"
            value: "json"
        partition.hash:
          reachable_only: false
    • Sample log
      You can configure the headers parameter to automatically expand JSON logs.

Example 2: Use Collectd to upload logs

collectd is a daemon process that periodically collects the metrics of systems and applications. You can upload the collected metrics to Log Service by using the Kafka protocol. For more information about collectd, see collectd. For more information about how to upload metrics to Log Service, see Write Kafka Plugin.

Before you upload the logs that are collected by using collectd to Log Service, you must install the collectd-write_kafka plug-in and related dependencies. On a CentOS server, you can run the sudo yum install collectd-write_kafka command to install the collectd-write_kafka plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit RPM resource collectd-write_kafka.
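
Depending on how collectd is installed, the write_kafka plug-in may also need to be loaded explicitly in the collectd configuration file, for example by adding the following directive before the <Plugin write_kafka> block:

  LoadPlugin write_kafka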

  • Configuration example

    In this example, collectd provides JSON-formatted logs. Command- and Graphite-formatted logs are also supported. For more information, see collectd.

    <Plugin write_kafka>
      Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "yourusername" 
      Property "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
                        
  • Sample log

    After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 3: Use Telegraf to upload logs

Telegraf is an agent written in the Go programming language that is used to collect, process, and aggregate metrics. Telegraf has a small memory footprint. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which it runs or from third-party APIs. You can also use Telegraf to collect metrics by using StatsD and Kafka consumer services.

Before you upload the logs that are collected by using Telegraf to Log Service, you must modify the configuration file of Telegraf.

  • Configuration example
    In this example, Telegraf provides JSON-formatted logs. Graphite- and Carbon2-formatted logs are also supported. For more information, see Telegraf.
    Note You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.
    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config 
      tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" 
      # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • Sample log

    After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector. Fluentd is a project under the Cloud Native Computing Foundation (CNCF), and all components are available under the Apache 2 License. For more information, see Fluentd.

Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.
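
For example, on a Ruby gem-based Fluentd installation, the plug-in can typically be installed by using one of the following commands. The exact command depends on how Fluentd is installed:

  gem install fluent-plugin-kafka
  # If the fluent-gem wrapper is available, use it instead:
  fluent-gem install fluent-plugin-kafka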

  • Configuration example
    In this example, Fluentd provides JSON-formatted logs. Dozens of formats are supported. For more information, see Fluentd Formatter.
    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      # Replace yourusername and yourpassword with the actual values. 
      username yourusername 
      password yourpassword 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
  • Sample log
    After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 5: Use Logstash to upload logs

Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.

Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Log Service by using the Kafka protocol. Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.
  • Configuration example
    1. Create a JAAS file and save the file to a directory. Example: /etc/kafka/kafka_client_jaas.conf.
      Add the following content to the JAAS file:
      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword"; 
      };
    2. Configure an SSL certificate and save the certificate to a directory. Example: /etc/kafka/client-root.truststore.jks.
      Download the root certificate and save the certificate to a directory. Example: /etc/kafka/root.pem. Then, run a keytool command to generate a truststore file in the .jks format. When you run the command for the first time, you are prompted to set a password for the truststore. This password must match the ssl_truststore_password value in the Logstash configuration.
      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. Configure Logstash.
      In this example, Logstash provides JSON-formatted logs. Dozens of formats are supported. For more information, see Logstash Codec.
      Note The following configuration is used for a connectivity test. In a production environment, we recommend that you remove the stdout field.
      input { stdin { } } 
      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/kafka/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • Sample log
    After JSON-formatted logs are uploaded to Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 6: Use a Kafka producer to upload logs

If you use a Kafka producer to upload logs to Log Service, you can configure the topic or headers parameter in the collection configuration to automatically expand JSON logs.

  • Configuration example
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.StringSerializer;

    public static void produce(){
        // The configuration information. 
        Properties props2 = new Properties();

        String project = "etl-test-tlj";
        String topic = "test3.json";
        // Set bootstrap.servers to the endpoint and port of your project. For more information, see Configurations. 
        props2.put("bootstrap.servers", "kafka.log.aliyuncs.com:9093");
        props2.put("security.protocol", "sasl_ssl");
        props2.put("sasl.mechanism", "PLAIN");
        props2.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ project +"\" password=\"access-key-id#access-key-secret\";");

        // Specify the serialization class for data keys and values. 
        props2.put("key.serializer", StringSerializer.class);
        props2.put("value.serializer", StringSerializer.class);

        // Create a producer instance. 
        KafkaProducer<String,String> producer = new KafkaProducer<>(props2);

        // Send records. The data-parse-format header instructs Log Service to expand the JSON log. 
        for(int i=0;i<1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "{\"logName\": \"error4\"}");
            record.headers().add(new RecordHeader("data-parse-format","json".getBytes()));
            producer.send(record);
        }
        producer.close();
    }
  • Sample log
    Because the data-parse-format header is set to json, the uploaded log is automatically expanded, and logName is stored as a separate field.

Error messages

If a log fails to be uploaded by using the Kafka protocol, an error message is returned. The following list describes the error messages and solutions. For more information, see Error list.
  • NetworkException: A network exception occurred. Solution: Try again after 1 second.
  • TopicAuthorizationException: The authentication failed. If your AccessKey pair is invalid or does not have permissions to write data to the specified project or Logstore, the authentication fails. Solution: Enter a valid AccessKey pair and make sure that the pair has the required write permissions.
  • UnknownTopicOrPartitionException: One of the following errors occurred:
    • The specified project or Logstore does not exist.
    • The region where the specified project resides is different from the region of the specified endpoint.
    Solution: Make sure that the specified project and Logstore exist. If the error persists, check whether the region where the specified project resides is the same as the region of the specified endpoint.
  • KafkaStorageException: A server error occurred. Solution: Try again after 1 second.
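
If you use a Kafka producer to upload logs, the following Java snippet is a minimal sketch of how these errors can be surfaced. It assumes the producer and record from Example 6. The callback is invoked after each send, and the exception parameter carries errors such as TopicAuthorizationException:

  // Send a record and log any error that is returned by Log Service.
  producer.send(record, (metadata, exception) -> {
      if (exception != null) {
          // For example, TopicAuthorizationException indicates an invalid AccessKey pair
          // or missing write permissions on the project or Logstore.
          System.err.println("Failed to upload log: " + exception);
      }
  });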