Simple Log Service: Use the Kafka protocol to upload logs

Last Updated: Oct 29, 2025

You can use log collection tools such as the Kafka Producer SDK, Beats, Collectd, Fluentd, Fluent Bit, Logstash, Telegraf, and Vector to upload logs to Simple Log Service (SLS) over the Kafka protocol. This topic describes how to configure each of these tools.

Limits

  • The earliest supported version of the Kafka protocol is 2.1.0.

  • You must use the SASL_SSL connection protocol to ensure secure log transmission.

Permissions

You must have one of the following permissions.

  • System policies for SLS

    This policy grants permissions to manage SLS. For more information about how to grant permissions, see Grant permissions to a RAM user and Grant permissions to a RAM role.

  • Custom policies

    1. To create a custom policy, on the Script Editor tab, replace the existing content in the configuration box with the following script. For more information, see Create a custom policy.

      Note

      In the script, replace project_name with your actual project name.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Attach the custom policy that you created to a RAM user. For more information, see Grant permissions to a RAM user.
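Replacing project_name by hand is easy to get wrong when the policy is needed for several projects. The following is a minimal sketch that generates the same policy document for a given project name. The function name build_kafka_upload_policy is hypothetical and is not part of any Alibaba Cloud SDK; the actions and resource patterns are copied from the script above.

```python
import json


def build_kafka_upload_policy(project_name: str) -> dict:
    """Return the custom policy shown above, filled in for one project."""
    project_resource = f"acs:log:*:*:project/{project_name}"
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": "log:GetProject",
                "Resource": project_resource,
                "Effect": "Allow",
            },
            {
                "Action": [
                    "log:GetLogStore",
                    "log:ListShards",
                    "log:PostLogStoreLogs",
                ],
                "Resource": f"{project_resource}/logstore/*",
                "Effect": "Allow",
            },
        ],
    }


if __name__ == "__main__":
    # "aliyun-project-test" is only a sample project name.
    print(json.dumps(build_kafka_upload_policy("aliyun-project-test"), indent=4))
```

The printed JSON can be pasted into the Script Editor tab in place of the hand-edited script.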

Configurations

When you use the Kafka protocol to upload logs, you must configure the following parameters.

SLS_KAFKA_ENDPOINT

  • Value: The endpoint of the cluster that is used for the initial connection, in the format Project name.Endpoint:Port. Configure this parameter based on the endpoint of the project. For more information, see Endpoints.

    • Internal network: The port number is 10011. Example: Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011.

    • Internet: The port number is 10012. Example: Project name.cn-hangzhou.log.aliyuncs.com:10012.

  • Example: In the following values, aliyun-project-test is the project name, cn-hangzhou-intranet.log.aliyuncs.com and cn-hangzhou.log.aliyuncs.com are the endpoints, and 10011 and 10012 are the port numbers for the internal network and the Internet.

    • Internal network: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

    • Internet: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

SLS_PROJECT

  • Value: Project name

  • Description: The name of the SLS project.

  • Example: aliyun-project-test

SLS_LOGSTORE

  • Value: Logstore name

  • Description: The name of the logstore. If you append .json to the logstore name, SLS attempts to parse the log as a JSON log.

  • Example: The logstore name is test-logstore.

    • If you set the value to test-logstore, the uploaded log content is stored in the content field.

    • If you set the value to test-logstore.json, the uploaded log content is parsed as a JSON log. The keys at the first layer of the JSON data are used as field names, and the corresponding values are used as field values.

SLS_PASSWORD

  • Value: The AccessKey pair that has write permissions on SLS. For information about what an AccessKey pair is and how to create one, see Create an AccessKey pair.

  • Description: The value consists of the AccessKey ID and the AccessKey secret, separated by the # symbol.

    • AccessKey ID: the AccessKey ID of an Alibaba Cloud account or a RAM user.

    • AccessKey secret: the AccessKey secret of an Alibaba Cloud account or a RAM user.

  • Example: LTAI****************#yourAccessKeySecret
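The four SLS_ values follow fixed composition rules, so they can be derived from a handful of inputs. The following is a minimal sketch under that assumption. The helper name build_kafka_settings and its parameters are hypothetical, and the environment variable names SLS_ACCESS_KEY_ID and SLS_ACCESS_KEY_SECRET are the ones used by the producer samples later in this topic. The caller is still responsible for passing the endpoint that matches the chosen network, for example an -intranet endpoint together with internal=True.

```python
import os


def build_kafka_settings(project, endpoint, logstore, access_key_id,
                         access_key_secret, internal=False, parse_json=False):
    """Assemble the SLS_ values described in the Configurations section."""
    # Port 10011 is for the internal network, port 10012 for the Internet.
    port = 10011 if internal else 10012
    # Appending ".json" asks SLS to parse each log as a JSON log.
    topic = f"{logstore}.json" if parse_json else logstore
    return {
        "SLS_KAFKA_ENDPOINT": f"{project}.{endpoint}:{port}",
        "SLS_PROJECT": project,
        "SLS_LOGSTORE": topic,
        # AccessKey ID and AccessKey secret, separated by "#".
        "SLS_PASSWORD": f"{access_key_id}#{access_key_secret}",
    }


if __name__ == "__main__":
    settings = build_kafka_settings(
        project="aliyun-project-test",
        endpoint="cn-hangzhou.log.aliyuncs.com",
        logstore="test-logstore",
        access_key_id=os.environ.get("SLS_ACCESS_KEY_ID", ""),
        access_key_secret=os.environ.get("SLS_ACCESS_KEY_SECRET", ""),
        parse_json=True,
    )
    # Print only the non-secret values.
    print(settings["SLS_KAFKA_ENDPOINT"])
    print(settings["SLS_LOGSTORE"])
```

Reading the AccessKey pair from environment variables keeps the credentials out of the source code.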

Note

If you want to use a Kafka consumer group to consume data from SLS in real time, submit a ticket to consult Alibaba Cloud technical support.

Example 1: Use Beats to upload logs

Beats, such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, can collect logs and upload them to SLS using the Kafka protocol.

  • Configuration example

    For more information about the parameters that start with SLS_ in the example, see Configurations.

    output.kafka:
      # Initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      # Left empty so that the trusted certificate authorities of the host system are used
      ssl.certificate_authorities:
      # Message topic selection and partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false

      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000
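The SLS_ placeholders in this and the other configuration examples in this topic must be replaced with real values before the file is used. One way to do that, sketched below, is to keep the example as a template and substitute the placeholders with a script. The function name render_config is hypothetical, and the sketch assumes the four placeholder names from the Configurations section.

```python
import re

# Matches exactly the four placeholder names from the Configurations section.
PLACEHOLDER = re.compile(r"SLS_(?:KAFKA_ENDPOINT|PROJECT|LOGSTORE|PASSWORD)")


def render_config(template: str, values: dict) -> str:
    """Replace each SLS_ placeholder in a config template with its value."""
    def substitute(match):
        name = match.group(0)
        if name not in values:
            raise KeyError(f"no value supplied for {name}")
        return values[name]

    # A function replacement avoids any special handling of backslashes.
    return PLACEHOLDER.sub(substitute, template)


if __name__ == "__main__":
    template = 'hosts: ["SLS_KAFKA_ENDPOINT"]\ntopic: \'SLS_LOGSTORE\'\n'
    print(render_config(template, {
        "SLS_KAFKA_ENDPOINT": "aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012",
        "SLS_LOGSTORE": "test-logstore",
    }))
```

Raising an error for a missing value makes it harder to deploy a file that still contains a literal placeholder.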

Example 2: Use Collectd to upload logs

Collectd is a daemon that periodically collects system and application performance metrics and uploads the metrics to SLS using the Kafka protocol. For more information, see Write Kafka Plugin.

When you upload logs collected by Collectd to SLS, you must also install the Kafka plugin and its dependencies. For example, in CentOS, run the sudo yum install collectd-write_kafka command to install the Kafka plugin. For more information about how to install the RPM Package Manager (RPM) package, see Collectd-write_kafka.

  • Configuration example

    • In this example, the output format is set to JSON. Other formats, such as Command and Graphite, are also supported. For more information, see the Collectd configuration document.

    • For more information about the parameters that start with SLS_ in the example, see Configurations.

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Example 3: Use Telegraf to upload logs

Telegraf is a Go-based agent program that uses a small amount of memory to collect, process, and aggregate data metrics. Telegraf provides a wide range of plugins and integration features. Use Telegraf to obtain various metrics from its host system, retrieve metrics from third-party APIs, and listen for metrics using statsd and Kafka consumer services.

Before you upload logs collected by Telegraf to SLS using the Kafka protocol, you must modify the configuration file.

  • Configuration example

    • In this example, the output format is set to JSON. Other formats, such as Graphite and Carbon2, are also supported. For more information, see Telegraf output formats.

      Note

      You must configure a valid path for tls_ca in Telegraf. Use the path of the root certificate provided by the server. In a Linux environment, the path of the root CA certificate is typically /etc/ssl/certs/ca-bundle.crt.

    • For more information about the parameters that start with SLS_ in the example, see Configurations.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      # tls_cert = "/etc/ssl/certs/ca-certificates.crt"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector. It is a project of the Cloud Native Computing Foundation (CNCF) and is released under the Apache License 2.0.

Fluentd supports a variety of input, processing, and output plugins. Use the Kafka plugin to upload logs to SLS. You only need to install and configure the Kafka plugin. For more information, see fluent-plugin-kafka.

  • Configuration example

    • In this example, the output format is set to JSON. Dozens of other formats are also supported. For more information, see Fluentd Formatter.

    • For more information about the parameters that start with SLS_ in the example, see Configurations.

    <match **>
      @type kafka2
      brokers SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Example 5: Use Logstash to upload logs

Logstash is an open source, real-time log collection engine. Use Logstash to dynamically collect logs from different sources.

Note

To upload logs using the Kafka protocol, you must use Logstash 7.10.1 or later.

Logstash has a built-in Kafka output plugin. Configure Logstash to upload logs to SLS using the Kafka protocol. Because SLS uses the SASL_SSL connection protocol, you must also configure an SSL certificate and a JAAS file.

  • Configuration example

    • In this example, the output format is set to JSON. Dozens of other formats are also supported. For more information, see Logstash Codec.

      Note

      This example shows the configurations for a connectivity test. We recommend that you delete the stdout output configuration in a production environment.

    • For more information about the parameters that start with SLS_ in the example, see Configurations.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

Example 6: Use Fluent Bit to upload logs

Fluent Bit is a lightweight and highly scalable log and metric processor and forwarder. It supports a variety of input, processing, and output plugins. Use the Kafka plugin to upload logs to SLS. For more information, see Kafka output plugin.

  • Configuration example

    For more information about the parameters that start with SLS_ in the example, see Configurations.

    [OUTPUT]
        Name                        kafka
        Match                       *
        Brokers                     SLS_KAFKA_ENDPOINT
        Topics                      SLS_LOGSTORE
        Format                      json
        rdkafka.sasl.username       SLS_PROJECT
        rdkafka.sasl.password       SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Example 7: Use Vector to upload logs

Vector is a lightweight, high-performance log processing tool that can upload logs using the Kafka protocol. The following example shows how to configure Vector to write data to SLS in Kafka-compatible mode.

  • Configuration example

    For more information about the parameters that start with SLS_ in the example, see Configurations.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Example 8: Use a Kafka producer to upload logs

Java

  • Dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • Sample code

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // The configurations.
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // Set this parameter to true if you want the content of the producer to be parsed as a JSON log.
            boolean parseJson = false;
            // An Alibaba Cloud account has full permissions on all API operations. This poses a high security threat. We recommend that you create and use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
            // This section provides an example on how to save an AccessKey ID and an AccessKey secret to environment variables. You can also save the AccessKey ID and AccessKey secret to a configuration file as needed.
            // We recommend that you do not save the AccessKey ID and AccessKey secret in the code to prevent security risks.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project.
            String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            //Create a producer instance.
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // If needed, use the following method to set the timestamp of the message.
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                         ", partition: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • Dependencies

    pip install confluent-kafka
  • Sample code

    #!/usr/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # Get credentials from environment variables
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # Use port 10012 for the Internet and port 10011 for the internal network.
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Configure Kafka producer
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # Create producer instance
        producer = Producer(conf)
    
        # Send message
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (Optional) Set the timestamp of the record in milliseconds.
                         callback=delivery_report)
    
        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        producer.flush()
    
    if __name__ == '__main__':
        main()
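The sample above writes the JSON message as a hand-escaped string. When the topic ends with .json, building the payload with the standard json module avoids escaping mistakes. The following is a minimal sketch; build_log_payload is a hypothetical helper name, and the result is intended to be passed as the value argument of producer.produce in the sample above.

```python
import json


def build_log_payload(fields: dict) -> bytes:
    """Serialize log fields into UTF-8 JSON bytes for use as the message value."""
    # ensure_ascii=False keeps non-ASCII text readable instead of \u escapes.
    return json.dumps(fields, ensure_ascii=False).encode("utf-8")


if __name__ == "__main__":
    payload = build_log_payload({"msg": "Hello World", "level": "info"})
    print(payload.decode("utf-8"))
```

With a .json topic, the first-layer keys of this payload (msg and level in the sketch) are the ones SLS uses as field names.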
    

Golang

  • Dependencies

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • Sample code

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// Get credentials from environment variables
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network.
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Configure Kafka producer
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// Create producer instance
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Failed to create producer: %v", err)
    	}
    	defer producer.Close()
    
    	// Start a goroutine that reports the delivery result of each message.
    	// It is started before any message is produced so that delivery reports are handled as they arrive.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	// Send a batch of messages.
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // Set the time if needed.
    		}, nil)
    
    		if err != nil {
    			log.Printf("Failed to produce message: %v", err)
    		}
    	}
    
    	// Wait up to 5 seconds for outstanding messages to be delivered.
    	producer.Flush(5 * 1000)
    }

Error messages

If you fail to upload logs using the Kafka protocol, an error message is returned in the Kafka error message format. The following list describes each error message and the recommended solution. For more information about Kafka protocol error messages, see the error list.

NetworkException

  • Description: A network error occurred.

  • Recommended solution: Wait for 1 second and then retry.

TopicAuthorizationException

  • Description: Authentication failed.

  • Recommended solution: This error is usually caused by an invalid AccessKey pair or insufficient permissions to write data to the specified project or logstore. Specify a valid AccessKey pair that has write permissions.

UnknownTopicOrPartitionException

  • Description: This error may occur for one of the following reasons:

    • The specified project or logstore does not exist.

    • The region of the project is different from the region specified in the endpoint.

  • Recommended solution: Make sure that the specified project and logstore exist. If the error persists, check whether the project region is the same as the region specified in the endpoint.

KafkaStorageException

  • Description: An exception occurred on the server.

  • Recommended solution: Wait for 1 second and then retry.
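The recommended solutions fall into two groups: errors that are worth retrying after 1 second (NetworkException and KafkaStorageException) and errors that need a configuration fix (TopicAuthorizationException and UnknownTopicOrPartitionException). The following is a minimal sketch of that policy. The exception classes and the send callable are hypothetical stand-ins, because how these errors surface depends on the Kafka client library in use; the retry count of 3 is also an assumption.

```python
import time


class RetriableUploadError(Exception):
    """Stand-in for NetworkException and KafkaStorageException."""


class FatalUploadError(Exception):
    """Stand-in for TopicAuthorizationException and UnknownTopicOrPartitionException."""


def send_with_retry(send, max_attempts=3, delay_seconds=1.0, sleep=time.sleep):
    """Call send() and retry only the errors that are worth retrying."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except RetriableUploadError:
            if attempt == max_attempts:
                raise  # Give up after the last attempt.
            sleep(delay_seconds)  # Wait for 1 second by default, then retry.
        # FatalUploadError is not caught: retrying cannot fix bad
        # credentials or a missing project or logstore.


if __name__ == "__main__":
    attempts = []

    def flaky_send():
        # Hypothetical sender that fails once with a retriable error.
        attempts.append(1)
        if len(attempts) < 2:
            raise RetriableUploadError("network error")
        return "sent"

    print(send_with_retry(flaky_send, delay_seconds=0.1))
```

Passing sleep as a parameter keeps the helper easy to test without real delays.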