
Simple Log Service:Use the Kafka protocol to upload logs

Last Updated:Feb 29, 2024

You can collect logs by using tools such as KafkaProducer SDKs, Beats, Collectd, Fluentd, Logstash, Telegraf, and Vector, and upload the logs to Simple Log Service by using the Kafka protocol. This topic describes how to use a log collection tool to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol.

Limits

  • Only Kafka versions 0.8.0 to 2.1.1 (message format version 2) are supported.

  • You must use the SASL_SSL protocol to ensure the security of log transmission.

Data parsing

Logs that are uploaded by using the Kafka protocol are stored in the content field. If the logs are of the JSON type, you can set a JSON index for the content field. For more information, see JSON type.

If you use a Kafka producer or Beats to upload logs, you can configure the topic or headers parameter in the collection configuration to automatically expand JSON logs. In this case, Simple Log Service automatically expands the content field, and you do not need to configure a JSON index for the content field. For more information, see Configurations.
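For example, assume that a producer sends the JSON log {"logName": "error4"}, which is the same payload that is used in Example 6. The following comparison is only an illustration of this behavior:

  • Without JSON expansion, the entire log is stored in the content field:

      content: {"logName": "error4"}

  • With the topic parameter set to a value in the Logstore name.json format or with the data-parse-format: json header, the JSON fields are expanded into separate fields:

      logName: error4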

Configurations

You must configure the parameters described in the following table when you use the Kafka protocol to upload logs to Simple Log Service.

Note

The parameter names vary based on the log collection tool. Configure the parameters based on your business scenario.

  • Connection type: The security protocol. You must use the SASL_SSL protocol to ensure the security of log transmission.

  • hosts: The address to which an initial connection is established. You can specify the endpoint of a Simple Log Service project in the Project name.Endpoint format. The endpoint varies based on the region of your Simple Log Service project. For more information, see Endpoints.

    • Example of an internal endpoint: test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011. The port number is 10011.

    • Example of a public endpoint: test-project-1.cn-hangzhou.log.aliyuncs.com:10012. The port number is 10012.

  • topic: The name of the Logstore.

    If you use a Kafka producer or Beats to upload logs and specify the output format as JSON, you can set the topic parameter to a value in the Logstore name.json format to automatically expand JSON logs. For more information, see Example 6: Use a Kafka producer to upload logs.

  • headers: If you use a Kafka producer or Beats to upload logs and specify the output format as JSON, you can specify the following value for the headers parameter to automatically expand JSON logs:

      headers:
        - key: "data-parse-format"
          value: "json"

    For more information, see Example 1: Use Beats to upload logs.

  • username: The name of the project.

  • password: The AccessKey pair that is in the ${access-key-id}#${access-key-secret} format. Replace ${access-key-id} and ${access-key-secret} with your AccessKey ID and AccessKey secret. We recommend that you use the AccessKey pair of a RAM user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service.

  • Certificate file: The certificate file of the endpoint. Each endpoint of Simple Log Service has a certificate. Set this parameter to the path to the root certificate on your server. Example: /etc/ssl/certs/ca-bundle.crt.
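For reference, the following minimal Java sketch shows one way in which these parameters map to Kafka client settings. This is only a sketch: the class name and all values are placeholders, and the tool-specific examples below show complete configurations.

  // A minimal sketch, assuming a Java Kafka client. All values are placeholders.
  import java.util.Properties;

  public class SlsKafkaSettingsSketch {
      public static Properties slsProducerProperties() {
          Properties props = new Properties();
          // hosts: the project endpoint and port, in the Project name.Endpoint format.
          props.put("bootstrap.servers", "test-project-1.cn-hangzhou.log.aliyuncs.com:10012");
          // Connection type: the SASL_SSL protocol is required.
          props.put("security.protocol", "sasl_ssl");
          props.put("sasl.mechanism", "PLAIN");
          // username: the project name.
          // password: the AccessKey pair in the ${access-key-id}#${access-key-secret} format.
          props.put("sasl.jaas.config",
                  "org.apache.kafka.common.security.plain.PlainLoginModule required "
                          + "username=\"test-project-1\" password=\"access-key-id#access-key-secret\";");
          return props;
      }
  }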

Note

If you want to use a Kafka consumer group to consume data from Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.

Example 1: Use Beats to upload logs

You can use Beats such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service. For more information, see Beats-Kafka-Output.

  • Example 1:

    • Configuration example

      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" # The name of the Simple Log Service project.
        password: "yourpassword" # The AccessKey pair in the access-key-id#access-key-secret format.
        ssl.certificate_authorities: # Optional: the path to the root certificate, for example: ["/etc/ssl/certs/ca-bundle.crt"]
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • Sample log

      By default, Beats provides JSON-formatted logs. The logs are uploaded to Simple Log Service and stored in the content field. You can create a JSON index for the content field. For more information, see JSON type.


  • Example 2

    • Configuration example

      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        headers:
          - key: "data-parse-format"
            value: "json"
        partition.hash:
          reachable_only: false
    • Sample log

      You can configure the headers parameter to automatically expand JSON logs.

Example 2: Use Collectd to upload logs

collectd is a daemon process that periodically collects the performance metrics of systems and applications. You can upload the collected metrics to Simple Log Service by using the Kafka protocol. For more information, see Write Kafka Plugin.

Before you upload the logs that are collected by using collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and related dependencies. For example, on a CentOS Linux server, you can run the sudo yum install collectd-write_kafka command to install the collectd-write_kafka plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit Collectd-write_kafka.

  • Configuration example

    In this example, collectd provides JSON-formatted logs. Command- and Graphite-formatted logs are also supported. For more information, see collectd documentation.

    # Make sure that the write_kafka plug-in is loaded in collectd.conf, for example: LoadPlugin write_kafka
    <Plugin write_kafka>
      Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "yourusername" 
      Property "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
                        
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 3: Use Telegraf to upload logs

Telegraf is an agent written in the Go programming language that collects, processes, and aggregates metrics while consuming only a small amount of memory. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which it runs or from third-party APIs. You can also use Telegraf to listen for metrics by using StatsD and Kafka consumer services.

Before you upload the logs that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.

  • Configuration example

    In this example, Telegraf provides JSON-formatted logs. Graphite- and Carbon2-formatted logs are also supported. For more information, see Output Data Formats of Telegraf.

    Note

    You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.

    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config 
      tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" 
      # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector. Fluentd is a project under the Cloud Native Computing Foundation (CNCF), and all components are available under the Apache 2 License. For more information, see Fluentd.

Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.

  • Configuration example

    In this example, Fluentd provides JSON-formatted logs. Dozens of formats are supported. For more information, see Fluentd Formatter.

    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username yourusername  # The username, which is the name of your project. Replace yourusername with the actual value. 
      password "yourpassword"   # The password, which is your AccessKey pair. Replace yourpassword with the actual value. 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 5: Use Logstash to upload logs

Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.

Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.

  • Configuration example

    1. Create a JAAS file and save the file to a directory. Example: /etc/kafka/kafka_client_jaas.conf.

      Add the following content to the JAAS file:

      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword"; 
      };
    2. Configure an SSL certificate and save the certificate to a directory. Example: /etc/kafka/client-root.truststore.jks.

      Download the root certificate and save the certificate to a directory. Example: /etc/kafka/root.pem. Run a keytool command to generate a file in the .jks format. When you run the command to generate a file for the first time, you must configure a password.

      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. Configure Logstash.

      In this example, Logstash provides JSON-formatted logs. Dozens of formats are supported. For more information, see Logstash Codec.

      Note

      The following configuration is used for a connectivity test. In a production environment, we recommend that you remove the stdout field.

      input { stdin { } } 
      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/kafka/client-root.truststore.jks" # The path to the .jks file that you generated in step 2. 
          ssl_truststore_password => "123456" # The password that you set when you generated the .jks file. 
          jaas_path => "/etc/kafka/kafka_client_jaas.conf" # The path to the JAAS file that you created in step 1. 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 6: Use a Kafka producer to upload logs

If you use a Kafka producer to upload logs to Simple Log Service, you can configure the topic or headers parameter in the collection configuration to automatically expand JSON logs.

  • Configuration example

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.serialization.StringSerializer;

    public static void produce(){
        // The configuration information. 
        Properties props2 = new Properties();
    
        String project = "etl-test-tlj";
        String topic = "test3.json";
        // Set bootstrap.servers to the endpoint of your project. For more information, see the hosts parameter in Configurations.
        props2.put("bootstrap.servers", "kafka.log.aliyuncs.com:9093");
        props2.put("security.protocol", "sasl_ssl");
        props2.put("sasl.mechanism", "PLAIN");
        props2.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ project +"\" password=\"access-key-id#access-key-secret\";");
    
        // Specify the serialization class for data keys and values. 
        props2.put("key.serializer", StringSerializer.class);
        props2.put("value.serializer", StringSerializer.class);
    
        // Create a producer instance. 
        KafkaProducer<String,String> producer = new KafkaProducer<>(props2);
    
        // Send records. The data-parse-format header instructs Simple Log Service to expand the JSON log. 
        for(int i=0;i<1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "{\"logName\": \"error4\"}");
            record.headers().add(new RecordHeader("data-parse-format","json".getBytes()));
            producer.send(record);
        }
        producer.close();
    }
  • Sample log

Example 7: Use Fluent Bit to upload logs

Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports multiple plug-ins for input, processing, and output. You can use the Kafka plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.

  • Configuration example

    For more information about the configurations, see Configurations.

    [Output]
        Name    kafka
        Match    *
        Brokers   etl-shanghai.cn-shanghai.log.aliyuncs.com:10012
        Topics    etl-out
        Format    json
        rdkafka.sasl.username   yourusername 
        rdkafka.sasl.password   yourpassword  
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism     PLAIN
  • Sample log

Example 8: Configure Vector to upload logs by using the Kafka protocol

Vector is a lightweight and high-performance log processing tool that can be used to report logs by using the Kafka protocol. For more information, see Vector. The following example shows the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.

  • Configuration example

    # Example source definition for the file_logs input that the sink references. This block is an assumed example; adjust it to your environment.
    [sources.file_logs]
      type = "file"
      include = ["/var/log/*.log"]

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["file_logs"] # Specifies the source. In this example, a log file is monitored.
      bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012"
      compression = "gzip"
      healthcheck = true
      topic = "dst-kafka.json" # dst-kafka is the name of the Logstore. The .json suffix indicates that JSON logs are automatically expanded.
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "etl-dev" # etl-dev is the name of the Simple Log Service project.
      sasl.password = "access-key-id#access-key-secret" # The AccessKey pair of the RAM user, in the ${access-key-id}#${access-key-secret} format.
      tls.enabled = true
  • Sample log


Error messages

If a log fails to be uploaded by using the Kafka protocol, an error message is returned. The following list describes the error messages. For more information, see Error list.

  • NetworkException

    Description: A network exception occurred.

    Solution: Try again after 1 second.

  • TopicAuthorizationException

    Description: The authentication failed.

    Solution: If your AccessKey pair is invalid or does not have permissions to write data to the specified project or Logstore, the authentication fails. Enter a valid AccessKey pair and make sure that it has the required write permissions.

  • UnknownTopicOrPartitionException

    Description: One of the following errors occurred:

    • The specified project or Logstore does not exist.

    • The region where the specified project resides is different from the region of the specified endpoint.

    Solution: Make sure that the specified project and Logstore exist. If they exist but the error persists, check whether the region where the specified project resides is the same as the region of the specified endpoint.

  • KafkaStorageException

    Description: A server error occurred.

    Solution: Try again after 1 second.
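The following minimal Java sketch shows one possible way to react to these errors in a Kafka producer callback. It is only an illustration based on the solutions above: the class and method names are placeholders, and the exception classes come from the org.apache.kafka.common.errors package.

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.errors.KafkaStorageException;
  import org.apache.kafka.common.errors.NetworkException;

  public class SendWithRetryExample {
      // Sends a record and retries once after 1 second if a transient error occurs.
      public static void sendWithRetry(KafkaProducer<String, String> producer,
                                       ProducerRecord<String, String> record) {
          producer.send(record, (metadata, exception) -> {
              if (exception == null) {
                  return; // The record was uploaded successfully.
              }
              if (exception instanceof NetworkException
                      || exception instanceof KafkaStorageException) {
                  // NetworkException and KafkaStorageException are transient: wait 1 second, then retry.
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      return;
                  }
                  producer.send(record);
              } else {
                  // TopicAuthorizationException and UnknownTopicOrPartitionException cannot be fixed
                  // by retrying. Check the AccessKey pair, project, Logstore, and endpoint instead.
                  exception.printStackTrace();
              }
          });
      }
  }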