
Simple Log Service:Use the Kafka protocol to upload logs

Last Updated: Sep 27, 2024

You can collect logs by using tools such as KafkaProducer SDK, Beats, collectd, Fluentd, Fluent Bit, Logstash, Telegraf, and Vector, and upload the logs to Simple Log Service by using the Kafka protocol. This topic describes how to use these log collection tools to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol.

Limits

  • Only Kafka 2.1.0 and later versions are supported.

  • You must use the SASL_SSL protocol to ensure the security of log transmission.

Data parsing

Logs that are uploaded by using the Kafka protocol are stored in the content field. If the logs are of the JSON type, you can configure a JSON index for the content field. For more information, see JSON type.

If you use a KafkaProducer SDK or Beats to collect logs, you can configure the topic or headers parameter in the collection configuration so that the logs are uploaded in the JSON format. In this case, Simple Log Service automatically expands the content field, and you do not need to configure a JSON index for the content field. For more information, see Configuration method.
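
For example, assume that a log collection tool sends the following JSON message body. The log content is hypothetical and is used only for illustration:

  {"status": 200, "latency_ms": 12}

If JSON expansion is not enabled, the log is stored as a single content field whose value is this JSON string, and you can query the nested keys by configuring a JSON index for the content field. If you enable expansion by using the .json topic suffix or the data-parse-format header, the same log is stored as the separate fields status and latency_ms.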

Configuration method

When you use the Kafka protocol to upload logs to Simple Log Service, you must configure the following parameters.

Note

The parameter names vary based on the log collection tool. Configure the parameters based on your business scenario.

  • Connection type: The security protocol. You must use the SASL_SSL protocol to ensure the security of log transmission.

  • hosts: The address to which an initial connection is established. Specify the endpoint of a Simple Log Service project in the Project name.Endpoint format. For more information, see Endpoints.

    • Example of an internal endpoint: test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011. The port number is 10011.

    • Example of a public endpoint: test-project-1.cn-hangzhou.log.aliyuncs.com:10012. The port number is 10012.

  • topic: The name of the Logstore. If you use a KafkaProducer SDK or Beats to collect logs and specify the output format as JSON, you can set the topic parameter to a value in the Logstore name.json format to automatically expand JSON logs. For more information, see Example 6: Use a KafkaProducer SDK to upload logs.

  • headers: If you use a KafkaProducer SDK or Beats to collect logs and specify the output format as JSON, you can set the headers parameter to the following content to automatically expand JSON logs:

      headers:
        - key: "data-parse-format"
          value: "json"

    For more information, see Example 1: Use Beats to upload logs.

  • username: The name of the project.

  • password: The AccessKey pair in the ${access-key-id}#${access-key-secret} format. Replace ${access-key-id} and ${access-key-secret} with your AccessKey ID and AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service.

  • Certificate file: The certificate file of the endpoint. Each endpoint of Simple Log Service has a certificate. Set this parameter to the path to the root certificate on your server. Example: /etc/ssl/certs/ca-bundle.crt.

Note

If you want to use a Kafka consumer group to consume data from Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.

Example 1: Use Beats to upload logs

You can use Beats such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service. For more information, see Beats-Kafka-Output.

  • Example 1:

    • Sample configuration

      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" 
        password: "yourpassword" 
        ssl.certificate_authorities: 
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • Sample log

      By default, Beats provides JSON-formatted logs. The logs are uploaded to Simple Log Service and stored in the content field. You can create a JSON index for the content field. For more information, see JSON type.


  • Example 2:

    • Sample configuration

      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        headers:
          - key: "data-parse-format"
            value: "json"
        partition.hash:
          reachable_only: false
    • Sample log

      You can configure the headers parameter to automatically expand JSON logs.

Example 2: Use collectd to upload logs

collectd is a daemon process that periodically collects the performance metrics of systems and applications. You can upload the collected metrics to Simple Log Service by using the Kafka protocol. For more information, see Write Kafka Plugin.

Before you upload the logs that are collected by collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and related dependencies. For example, on a CentOS Linux server, you can run the sudo yum install collectd-write_kafka command to install the collectd-write_kafka plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit Collectd-write_kafka.

  • Sample configuration

    collectd supports various formats, including JSON, Command, and Graphite. In this example, the JSON format is used. For more information, see collectd documentation.

    <Plugin write_kafka>
      Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "yourusername" 
      Property "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
                        
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 3: Use Telegraf to upload logs

Telegraf is an agent that is written in the Go programming language and is used to collect, process, and aggregate metrics. Telegraf consumes only a small amount of memory. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which it runs or by calling third-party APIs. You can also use Telegraf to monitor metrics by using StatsD and Kafka consumers.

Before you upload the logs that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.

  • Sample configuration

    Telegraf supports various formats, including JSON, Carbon2, and Graphite. In this example, the JSON format is used. For more information, see Output Data Formats of Telegraf.

    Note

    You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.

    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config 
      tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" 
      # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.


Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector. It is a project under the Cloud Native Computing Foundation (CNCF). All components of Fluentd are available under the Apache 2 License. For more information, see Fluentd.

Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.

  • Sample configuration

    Fluentd supports dozens of formats. In this example, the JSON format is used. For more information, see Fluentd Formatter.

    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      # The username. Replace yourusername with the name of your project. 
      username yourusername 
      # The password. Replace yourpassword with your AccessKey pair in the ${access-key-id}#${access-key-secret} format. 
      password "yourpassword" 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 5: Use Logstash to upload logs

Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.

Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.

  • Sample configuration

    1. Create a JAAS file and save the file to a directory. Example: /etc/kafka/kafka_client_jaas.conf.

      Add the following content to the JAAS file:

      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword"; 
      };
    2. Configure an SSL certificate and save the certificate to a directory. Example: /etc/kafka/client-root.truststore.jks.

      Download the root certificate and save the certificate to a directory. Example: /etc/kafka/root.pem. Then, run a keytool command to generate a truststore file in the .jks format. The first time you run the command, you must set a password for the truststore. This password is later specified in the ssl_truststore_password parameter.

      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. Configure Logstash.

      Logstash supports dozens of formats. In this example, the JSON format is used. For more information, see Logstash Codec.

      Note

      The following configuration is used to test network connectivity. In a production environment, we recommend that you remove the stdout field.

      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/kafka/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • Sample log

    After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 6: Use a KafkaProducer SDK to upload logs

  • Sample configuration

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // Configuration information. 
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // Set the following parameter to true if you want to automatically expand JSON logs:
            boolean parseJson = true;
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. 
            // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file if required. 
            // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
            String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
    
            // Specify the serialization class for data keys and values. 
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            // Create a producer instance. 
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • POM dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • Sample log

    Because parseJson is set to true, the topic is suffixed with .json and the uploaded JSON log is automatically expanded into separate fields.

Example 7: Use Fluent Bit to upload logs

Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports various input, processing, and output plug-ins. You can use the Kafka output plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.

  • Sample configuration

    For more information about the configuration method, see Configuration method.

    [Output]
        Name    kafka
        Match    *
        Brokers   etl-shanghai.cn-shanghai.log.aliyuncs.com:10012
        Topics    etl-out
        Format    json
        rdkafka.sasl.username   yourusername 
        rdkafka.sasl.password   yourpassword  
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism     PLAIN
  • Sample log

    After the JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.

Example 8: Use Vector to upload logs

Vector is a lightweight and high-performance log processing tool that can report logs by using the Kafka protocol. For more information, see Vector. The following example shows the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.

  • Sample configuration

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["file_logs"] # The source. In this example, a log file is monitored.
      bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012"
      compression = "gzip"
      healthcheck = true
      topic = "dst-kafka.json" # dst-kafka is the name of the Logstore. The .json suffix indicates that JSON logs are automatically expanded.
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "etl-dev" # etl-dev is the name of the project.
      sasl.password = "{{The AccessKey ID and AccessKey secret of the RAM user in the {AK#SK} format.}}"
      tls.enabled = true
  • Sample log

    Because the topic is suffixed with .json, the uploaded JSON logs are automatically expanded into separate fields.

Errors

If a log fails to be uploaded by using the Kafka protocol, an error is reported. The following list describes the common errors and their solutions. For more information, see Error list.

  • NetworkException

    Description: A network exception occurred.

    Solution: Wait for 1 second and try again.

  • TopicAuthorizationException

    Description: Authentication failed.

    Solution: If your AccessKey pair is invalid or the AccessKey pair does not have permissions to write data to the specified project or Logstore, authentication fails. In this case, enter a valid AccessKey pair and make sure that the AccessKey pair has the required write permissions.

  • UnknownTopicOrPartitionException

    Description: One of the following errors occurred:

    • The specified project or Logstore does not exist.

    • The region where the specified project resides is different from the region of the specified endpoint.

    Solution: Make sure that the specified project or Logstore exists. If the specified project or Logstore exists but the error persists, check whether the region where the specified project resides is the same as the region of the specified endpoint.

  • KafkaStorageException

    Description: A server error occurred.

    Solution: Wait for 1 second and try again.
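
If you use the KafkaProducer SDK, these errors are returned through the send callback. The following snippet is a minimal sketch of how to detect them in Java. It assumes a producer that is configured as described in Example 6; the class name and method name are examples and are not part of the official SDK sample.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.KafkaStorageException;
    import org.apache.kafka.common.errors.NetworkException;
    import org.apache.kafka.common.errors.TopicAuthorizationException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
    
    public class SendWithErrorHandling {
        // Sends one record and maps upload errors to the solutions in the list above.
        public static void send(KafkaProducer<String, String> producer, String topic, String content) {
            producer.send(new ProducerRecord<>(topic, content), (metadata, e) -> {
                if (e == null) {
                    return; // The record was uploaded successfully.
                }
                if (e instanceof TopicAuthorizationException) {
                    // Authentication failed: check the project name, the AccessKey pair, and the write permissions.
                    System.err.println("Authentication failed: " + e.getMessage());
                } else if (e instanceof UnknownTopicOrPartitionException) {
                    // The project or Logstore does not exist, or the endpoint belongs to a different region.
                    System.err.println("Unknown project or Logstore: " + e.getMessage());
                } else if (e instanceof NetworkException || e instanceof KafkaStorageException) {
                    // Transient network or server error: wait for 1 second and try again.
                    System.err.println("Transient error, retry later: " + e.getMessage());
                } else {
                    System.err.println("Upload failed: " + e.getMessage());
                }
            });
        }
    }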