Simple Log Service: Use the Kafka protocol to upload logs

Last Updated: Jun 03, 2026

Simple Log Service (SLS) supports log ingestion through the Kafka protocol from collectors such as Kafka producer SDKs, Beats, Collectd, Fluentd, Logstash, Telegraf, Fluent-bit, and Vector.

Limits

  • The earliest supported version of the Kafka protocol is 2.1.0.

  • The SASL_SSL connection protocol is required for secure log transmission.

Permissions

Your account must have one of the following permissions.

  • System policies for SLS

    This policy grants permissions to manage SLS. For authorization instructions, see Manage RAM user permissions and Manage permissions for a RAM role.

  • Custom policies

    1. Create a custom policy. On the Script Editor tab, replace the existing content with the following script. For more information, see Create a custom policy.

      Note

      In the script, replace project_name with your actual project name.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Attach the custom policy to a RAM user. For more information, see Manage RAM user permissions.
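As a minimal sketch, the policy script above can also be generated programmatically so that the project name is substituted consistently in both Resource strings. The function name build_kafka_upload_policy is hypothetical and is not part of any SLS or RAM SDK. It only builds the JSON document and calls no API.

```python
import json


def build_kafka_upload_policy(project_name: str) -> dict:
    """Return the custom policy from this topic with project_name filled in.

    Hypothetical helper for illustration only.
    """
    if not project_name:
        raise ValueError("project_name must not be empty")
    project_resource = f"acs:log:*:*:project/{project_name}"
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": "log:GetProject",
                "Resource": project_resource,
                "Effect": "Allow",
            },
            {
                "Action": [
                    "log:GetLogStore",
                    "log:ListShards",
                    "log:PostLogStoreLogs",
                ],
                # Applies to every Logstore in the project.
                "Resource": f"{project_resource}/logstore/*",
                "Effect": "Allow",
            },
        ],
    }


if __name__ == "__main__":
    # The printed JSON can be pasted into the Script Editor tab.
    print(json.dumps(build_kafka_upload_policy("aliyun-project-test"), indent=4))
```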

Configuration parameters

Configure the following parameters for Kafka protocol log upload.

Each parameter below is listed with its configuration value, a description, and an example.

SLS_KAFKA_ENDPOINT

  • Configuration value: The initial connection endpoint, in the format Project name.Endpoint:Port.

  • Description: Configure this parameter based on the endpoint of the project. For more information, see Endpoints.

    • Internal network: The port number is 10011. Example: Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011.

    • Internet: The port number is 10012. Example: Project name.cn-hangzhou.log.aliyuncs.com:10012.

  • Example: In the following values, aliyun-project-test is the project name, cn-hangzhou-xxx.aliyuncs.com is the endpoint, and 10011 and 10012 are the port numbers for the internal network and the Internet, respectively.

    • Internal network: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

    • Internet: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

SLS_PROJECT

  • Configuration value: Project name

  • Description: The name of the SLS project.

  • Example: aliyun-project-test

SLS_LOGSTORE

  • Configuration value: Logstore name

  • Description: The name of the Logstore. If you append .json to the Logstore name, SLS attempts to parse the log content as JSON.

  • Example: Assume that the Logstore name is test-logstore.

    • If you set the value to test-logstore, the uploaded log content is stored in the content field.

    • If you set the value to test-logstore.json, the uploaded log content is parsed as a JSON log. The keys at the first layer of the JSON data are used as field names, and the corresponding values are used as field values.

SLS_PASSWORD

  • Configuration value: The AccessKey pair with write permissions on SLS. For information about what an AccessKey pair is and how to create one, see Create an AccessKey pair.

  • Description: The value consists of the AccessKey ID and the AccessKey secret, separated by the # symbol.

    • AccessKey ID: the AccessKey ID of an Alibaba Cloud account or a RAM user.

    • AccessKey secret: the AccessKey secret of an Alibaba Cloud account or a RAM user.

  • Example: LTAI****************#yourAccessKeySecret
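The following sketch shows how the four parameter values could be assembled from their parts, under the rules described above: the endpoint is Project name.Endpoint:Port, a .json suffix on the Logstore name requests JSON parsing, and the password joins the AccessKey ID and AccessKey secret with #. The names SlsKafkaSettings and build_settings are hypothetical and chosen for illustration. The caller is assumed to pass an endpoint that matches the chosen network, for example an -intranet endpoint together with internal=True.

```python
from dataclasses import dataclass

INTERNAL_PORT = 10011  # internal network port, as documented above
INTERNET_PORT = 10012  # Internet port, as documented above


@dataclass(frozen=True)
class SlsKafkaSettings:
    endpoint: str  # value for SLS_KAFKA_ENDPOINT
    project: str   # value for SLS_PROJECT
    logstore: str  # value for SLS_LOGSTORE
    password: str  # value for SLS_PASSWORD


def build_settings(project, region_endpoint, logstore,
                   access_key_id, access_key_secret,
                   *, internal=False, parse_json=False):
    """Assemble the SLS_ parameter values from their components.

    Hypothetical helper; it performs string formatting only and does not
    verify that the project, Logstore, or endpoint exist.
    """
    port = INTERNAL_PORT if internal else INTERNET_PORT
    # Appending ".json" asks SLS to parse the log content as JSON.
    topic = logstore + ".json" if parse_json else logstore
    return SlsKafkaSettings(
        endpoint=f"{project}.{region_endpoint}:{port}",
        project=project,
        logstore=topic,
        password=f"{access_key_id}#{access_key_secret}",
    )
```

Keeping the assembly in one place makes it harder to pair the Internet port with an internal endpoint by accident, although the helper itself does not check for that mismatch.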

Note

To use a Kafka consumer group to consume data from SLS in real time, submit a ticket to contact Alibaba Cloud technical support.

Example 1: Use Beats to upload logs

Beats (Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat) can upload logs to SLS through the Kafka protocol.

  • Configuration example

    For the SLS_-prefixed parameters in this example, see Configuration parameters.

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000
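The SLS_-prefixed names in the configuration examples are placeholders, not literal values. As a small sketch of one way to fill them in before deploying a file, the following hypothetical render_config helper substitutes them in a template string in a single pass. It is not a feature of Beats or SLS, and the fragment below is a shortened copy of the Beats example.

```python
import re

PLACEHOLDERS = ("SLS_KAFKA_ENDPOINT", "SLS_PROJECT", "SLS_LOGSTORE", "SLS_PASSWORD")
_PLACEHOLDER_PATTERN = re.compile("|".join(re.escape(name) for name in PLACEHOLDERS))


def render_config(template: str, values: dict) -> str:
    """Replace each SLS_ placeholder found in template with its value.

    Raises KeyError if the template uses a placeholder that has no value,
    so a half-filled configuration is never produced.
    """
    def substitute(match):
        name = match.group(0)
        if name not in values:
            raise KeyError(f"no value supplied for {name}")
        return values[name]

    return _PLACEHOLDER_PATTERN.sub(substitute, template)


BEATS_FRAGMENT = """output.kafka:
  hosts: ["SLS_KAFKA_ENDPOINT"]
  username: "SLS_PROJECT"
  password: "SLS_PASSWORD"
  topic: 'SLS_LOGSTORE'
"""

if __name__ == "__main__":
    print(render_config(BEATS_FRAGMENT, {
        "SLS_KAFKA_ENDPOINT": "aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012",
        "SLS_PROJECT": "aliyun-project-test",
        "SLS_LOGSTORE": "test-logstore",
        "SLS_PASSWORD": "LTAI****************#yourAccessKeySecret",
    }))
```

The same helper can be applied to the other configuration examples in this topic, because they use the same four placeholder names.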

Example 2: Use Collectd to upload logs

Collectd is a daemon that periodically collects system and application performance metrics. It uploads metrics to SLS through the Write Kafka Plugin.

Install the Kafka plugin and its dependencies. For example, on CentOS, run sudo yum install collectd-write_kafka. RPM packages are available at Collectd-write_kafka.

  • Configuration example

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Example 3: Use Telegraf to upload logs

Telegraf is a lightweight Go-based agent for collecting, processing, and aggregating metrics from host systems, third-party APIs, and services.

Modify the Telegraf configuration file to upload logs to SLS through the Kafka protocol.

  • Configuration example

    • This example uses JSON output. Other formats (Graphite, Carbon2) are documented in Telegraf output formats.

      Note

      Configure a valid tls_ca path in Telegraf. Use the root certificate path provided by the server. On Linux, the root CA certificate path is typically /etc/ssl/certs/ca-bundle.crt.

    • For the SLS_-prefixed parameters in this example, see Configuration parameters.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      # tls_cert = "/etc/ssl/certs/ca-certificates.crt"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector.

Install and configure fluent-plugin-kafka to upload logs to SLS.

  • Configuration example

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Example 5: Use Logstash to upload logs

Logstash is an open source, real-time log collection engine that collects logs from different sources.

Note

Kafka protocol log upload requires Logstash 7.10.1 or later.

Logstash includes a built-in Kafka output plugin. Because SLS requires SASL_SSL, you must also configure an SSL certificate and a JAAS file.

  • Configuration example

    • This example uses JSON output. Other formats are documented in Logstash Codec.

      Note

      This example is for connectivity testing only. Remove the stdout output configuration in production environments.

    • For the SLS_-prefixed parameters in this example, see Configuration parameters.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }
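The sasl_jaas_config value combines the project name with the AccessKey ID and AccessKey secret. As a rough sketch, it could be generated as follows so that the # separator and the trailing semicolon are not forgotten. The function name build_jaas_config is hypothetical, and rejecting quote characters is a conservative assumption of this sketch rather than a documented SLS or Logstash rule.

```python
JAAS_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def build_jaas_config(project: str, access_key_id: str, access_key_secret: str) -> str:
    """Build the sasl_jaas_config string used in the Logstash example."""
    password = f"{access_key_id}#{access_key_secret}"
    for label, value in (("project", project), ("password", password)):
        # Assumption: refuse values that would break the single-quoted fields.
        if "'" in value or '"' in value:
            raise ValueError(f"{label} must not contain quote characters")
    return f"{JAAS_MODULE} required username='{project}' password='{password}';"


if __name__ == "__main__":
    print(build_jaas_config("aliyun-project-test", "LTAI****************", "yourAccessKeySecret"))
```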

Example 6: Use Fluent-bit to upload logs

Fluent-bit is a lightweight log and metrics processor that supports uploading logs to SLS through the Kafka output plugin.

  • Configuration example

    For the SLS_-prefixed parameters in this example, see Configuration parameters.

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Example 7: Use Vector to upload logs

Vector is a lightweight, high-performance log processing tool. Configure Vector to write data to SLS in Kafka-compatible mode as follows.

  • Configuration example

    For the SLS_-prefixed parameters in this example, see Configuration parameters.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Example 8: Use a Kafka producer to upload logs

Java

  • Dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • Sample code

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // The configurations.
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // Set this parameter to true if you want the content of the producer to be parsed as a JSON log.
            boolean parseJson = false;
            // An Alibaba Cloud account has permissions on all API operations, which poses high security risks. We recommend creating and using a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
            // The following example stores the AccessKey ID and AccessKey secret in environment variables. Alternatively, store them in a configuration file.
            // Do not hard-code the AccessKey ID and AccessKey secret in source code to avoid credential leaks.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project.
            String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            //Create a producer instance.
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // If needed, use the following method to set the timestamp of the message.
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                         ", partition: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • Dependencies

    pip install confluent-kafka
  • Sample code

    #!/usr/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # Get credentials from environment variables
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # Use port 10012 for the Internet and port 10011 for the internal network.
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Configure Kafka producer
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # Create producer instance
        producer = Producer(conf)
    
        # Send message
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (Optional) Set the timestamp of the record in milliseconds.
                         callback=delivery_report)
    
        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        producer.flush()
    
    if __name__ == '__main__':
        main()
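In the sample above, os.getenv returns None when a variable is not set, so the password would silently become the string None#None and the failure would surface only as an authentication error. One possible guard, sketched here with a hypothetical load_sls_password helper, fails early and names the missing variable.

```python
import os


def load_sls_password(environ=os.environ) -> str:
    """Return "<AccessKey ID>#<AccessKey secret>" from environment variables.

    Raises RuntimeError that names any variable that is unset or empty.
    `environ` is injectable so the check can be exercised with a plain dict.
    """
    required = ("SLS_ACCESS_KEY_ID", "SLS_ACCESS_KEY_SECRET")
    missing = [name for name in required if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variable(s): " + ", ".join(missing))
    return f"{environ['SLS_ACCESS_KEY_ID']}#{environ['SLS_ACCESS_KEY_SECRET']}"
```

The returned string can be passed as the sasl.password value in the producer configuration.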
    

Go

  • Dependencies

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • Sample code

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// Get credentials from environment variables
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network.
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Configure Kafka producer
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// Create producer instance
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Failed to create producer: %v", err)
    	}
    	defer producer.Close()
    
    	// Send messages in batches.
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // Set the time if needed.
    		}, nil)
    
    		if err != nil {
    			log.Printf("Failed to produce message: %v", err)
    		}
    	}
    
    	// Start a goroutine that reports whether each message was delivered.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

Troubleshooting

The following errors can occur during Kafka protocol log upload. The complete reference is available in the error list.

NetworkException

  • Description: A network error occurred.

  • Recommended solution: Wait one second and retry.

TopicAuthorizationException

  • Description: Authentication failed.

  • Recommended solution: The AccessKey pair is invalid, or the account lacks write permissions for the specified project or Logstore. Specify a valid AccessKey pair with write permissions.

UnknownTopicOrPartitionException

  • Description: This error occurs for one of the following reasons:

    • The specified project or Logstore does not exist.

    • The region of the project is different from the region specified in the endpoint.

  • Recommended solution: Verify that the project and Logstore exist. If the error persists, check whether the project region matches the endpoint region.

KafkaStorageException

  • Description: A server-side exception occurred.

  • Recommended solution: Wait one second and retry.
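The recommendations above split the errors into two groups: NetworkException and KafkaStorageException are worth retrying after one second, whereas the other two indicate a configuration problem that a retry does not fix. The following sketch expresses that split. The send callable is a hypothetical stand-in that returns None on success or the error name on failure, and the limit of three attempts is an assumption of this sketch, not an SLS recommendation.

```python
import time

# Errors for which the recommended solution is "wait one second and retry".
RETRYABLE_ERRORS = {"NetworkException", "KafkaStorageException"}


def send_with_retry(send, max_attempts=3, delay_seconds=1.0, sleep=time.sleep):
    """Call send() until it succeeds; return the number of attempts used.

    Raises RuntimeError immediately for non-retryable errors, or after
    max_attempts retryable failures.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    error = None
    for attempt in range(1, max_attempts + 1):
        error = send()
        if error is None:
            return attempt
        if error not in RETRYABLE_ERRORS:
            raise RuntimeError(f"{error}: not retryable, check credentials, project, and Logstore")
        if attempt < max_attempts:
            sleep(delay_seconds)
    raise RuntimeError(f"{error}: still failing after {max_attempts} attempts")
```

In a real producer, send would wrap the client's produce call and map the reported exception to its name. That mapping depends on the client library and is left out here.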