SLS supports log ingestion through the Kafka protocol from collectors such as Kafka Producer SDK, Beats, Collectd, Fluentd, Logstash, Telegraf, Fluent-bit, and Vector.
Limits
- The earliest supported version of the Kafka protocol is 2.1.0.
- The SASL_SSL connection protocol is required for secure log transmission.
Permissions
Your account must have one of the following permissions.
- This policy grants permissions to manage SLS. For authorization instructions, see Manage RAM user permissions and Manage permissions for a RAM role.
- Custom policies
  - Create a custom policy. On the Script Editor tab, replace the existing content with the following script. For more information, see Create a custom policy.
    Note: In the script, replace project_name with your actual project name.

    {
      "Version": "1",
      "Statement": [
        {
          "Action": "log:GetProject",
          "Resource": "acs:log:*:*:project/project_name",
          "Effect": "Allow"
        },
        {
          "Action": [
            "log:GetLogStore",
            "log:ListShards",
            "log:PostLogStoreLogs"
          ],
          "Resource": "acs:log:*:*:project/project_name/logstore/*",
          "Effect": "Allow"
        }
      ]
    }

  - Attach the custom policy to a RAM user. For more information, see Manage RAM user permissions.
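If the same policy is needed for several projects, the script can be generated instead of edited by hand. The following is a minimal sketch, not an official tool: the function name build_sls_write_policy is hypothetical, and the output mirrors the policy script above with project_name substituted.

```python
import json


def build_sls_write_policy(project_name: str) -> dict:
    """Return the custom policy shown above, filled in for one project.

    build_sls_write_policy is a hypothetical helper name used for illustration.
    """
    if not project_name or "/" in project_name or "*" in project_name:
        raise ValueError("project_name must be a single, concrete project name")
    project_resource = f"acs:log:*:*:project/{project_name}"
    return {
        "Version": "1",
        "Statement": [
            {
                # Lets the client resolve the project.
                "Action": "log:GetProject",
                "Resource": project_resource,
                "Effect": "Allow",
            },
            {
                # Lets the client look up Logstores and shards and write logs.
                "Action": [
                    "log:GetLogStore",
                    "log:ListShards",
                    "log:PostLogStoreLogs",
                ],
                "Resource": f"{project_resource}/logstore/*",
                "Effect": "Allow",
            },
        ],
    }


if __name__ == "__main__":
    # aliyun-project-test is the example project name used in this topic.
    print(json.dumps(build_sls_write_policy("aliyun-project-test"), indent=2))
```

Paste the printed JSON into the Script Editor tab in place of the existing content.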
Configuration parameters
Configure the following parameters for Kafka protocol log upload.
| Configuration name | Configuration value | Description | Example |
| SLS_KAFKA_ENDPOINT | The initial connection endpoint in the format Project name.Endpoint:Port. | Use port 10012 for the Internet and port 10011 for the internal network. | aliyun-project-test is the project name. |
| SLS_PROJECT | Project name | The name of the SLS project. | aliyun-project-test |
| SLS_LOGSTORE | Logstore name | The Logstore name. If you append .json to the Logstore name, the log content is parsed as JSON. | For example, if the Logstore name is testlog, specify testlog.json to parse the content as JSON. |
| SLS_PASSWORD | The AccessKey secret with write permissions on SLS. | For information about what an AccessKey pair is and how to create one, see Create an AccessKey pair. The value consists of the AccessKey ID and the AccessKey secret, separated by the # character. | LTAI****************#yourAccessKeySecret |
To use a Kafka consumer group to consume data from SLS in real time, submit a ticket to contact Alibaba Cloud technical support.
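The four configuration values can be derived from a handful of inputs. The sketch below follows the conventions used by the producer samples later in this topic (project.endpoint:port, the .json topic suffix, and AccessKey ID and secret joined by #). The function name build_kafka_settings and its parameter names are hypothetical.

```python
def build_kafka_settings(project, region_endpoint, logstore,
                         access_key_id, access_key_secret,
                         internal_network=False, parse_json=False):
    """Derive the SLS_* configuration values from their parts.

    build_kafka_settings is a hypothetical helper, not part of any SDK.
    """
    # Port 10012 is for the Internet, port 10011 for the internal network.
    port = 10011 if internal_network else 10012
    # Appending ".json" asks SLS to parse the message content as JSON.
    topic = f"{logstore}.json" if parse_json else logstore
    return {
        "SLS_KAFKA_ENDPOINT": f"{project}.{region_endpoint}:{port}",
        "SLS_PROJECT": project,
        "SLS_LOGSTORE": topic,
        "SLS_PASSWORD": f"{access_key_id}#{access_key_secret}",
    }


if __name__ == "__main__":
    import os

    settings = build_kafka_settings(
        project="etl-shanghai-b",
        region_endpoint="cn-shanghai.log.aliyuncs.com",
        logstore="testlog",
        # Read credentials from the environment; never hard-code them.
        access_key_id=os.getenv("SLS_ACCESS_KEY_ID", ""),
        access_key_secret=os.getenv("SLS_ACCESS_KEY_SECRET", ""),
    )
    # Print everything except the password to avoid leaking credentials.
    for name, value in settings.items():
        if name != "SLS_PASSWORD":
            print(f"{name}={value}")
```

The returned values can then be substituted for the SLS_-prefixed placeholders in the collector configurations that follow.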
Example 1: Use Beats to upload logs
Beats (MetricBeat, PacketBeat, Winlogbeat, Auditbeat, Filebeat, Heartbeat) can upload logs to SLS through the Kafka protocol.
- Configuration example
  For the SLS_-prefixed parameters in this example, see Configuration parameters.

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false

      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000
Example 2: Use Collectd to upload logs
Collectd is a daemon that periodically collects system and application performance metrics. It uploads metrics to SLS through the Write Kafka Plugin.
Install the Kafka plugin and its dependencies. For example, on CentOS, run sudo yum install collectd-write_kafka. RPM packages are available at Collectd-write_kafka.
- Configuration example
  - This example uses JSON output. Other formats (Command, Graphite) are documented in the Collectd configuration document.
  - For the SLS_-prefixed parameters in this example, see Configuration parameters.

    LoadPlugin write_kafka

    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>
Example 3: Use Telegraf to upload logs
Telegraf is a lightweight Go-based agent for collecting, processing, and aggregating metrics from host systems, third-party APIs, and services.
Modify the Telegraf configuration file to upload logs to SLS through the Kafka protocol.
- Configuration example
  - This example uses JSON output. Other formats (Graphite, Carbon2) are documented in Telegraf output formats.
    Note: Configure a valid tls_ca path in Telegraf. Use the root certificate path provided by the server. On Linux, the root CA certificate path is typically /etc/ssl/certs/ca-bundle.crt.
  - For the SLS_-prefixed parameters in this example, see Configuration parameters.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"
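The root CA bundle location differs between Linux distributions, so it can help to confirm which file exists before setting tls_ca. The following sketch checks the two paths that appear in this example and then falls back to the default CA file reported by Python's ssl module. The helper name find_ca_bundle is hypothetical, and the candidate list is an assumption that may need extending for your system.

```python
import os
import ssl

# Candidate locations; extend this tuple for your distribution if needed.
CANDIDATE_CA_PATHS = (
    "/etc/ssl/certs/ca-bundle.crt",        # path named in the note above
    "/etc/ssl/certs/ca-certificates.crt",  # path that appears in the sample config
)


def find_ca_bundle(candidates=CANDIDATE_CA_PATHS):
    """Return the first readable CA bundle path, or None if none is found.

    find_ca_bundle is a hypothetical helper used for illustration.
    """
    paths = list(candidates)
    # cafile is None when OpenSSL's default CA file does not exist.
    default_cafile = ssl.get_default_verify_paths().cafile
    if default_cafile:
        paths.append(default_cafile)
    for path in paths:
        if os.path.isfile(path) and os.access(path, os.R_OK):
            return path
    return None


if __name__ == "__main__":
    bundle = find_ca_bundle()
    if bundle is None:
        print("No CA bundle found; install your distribution's CA certificates package.")
    else:
        print(f'tls_ca = "{bundle}"')
```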
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector.
Install and configure fluent-plugin-kafka to upload logs to SLS.
- Configuration example
  - This example uses JSON output. Other formats are documented in Fluentd Formatter.
  - For the SLS_-prefixed parameters in this example, see Configuration parameters.

    <match **>
      @type kafka2
      brokers SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      max_send_limit_bytes 2097152
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>
Example 5: Use Logstash to upload logs
Logstash is an open source, real-time log collection engine that collects logs from different sources.
Kafka protocol log upload requires Logstash 7.10.1 or later.
Logstash includes a built-in Kafka output plugin. Because SLS requires SASL_SSL, you must also configure an SSL certificate and a JAAS file.
- Configuration example
  - This example uses JSON output. Other formats are documented in Logstash Codec.
    Note: This example is for connectivity testing only. Remove the stdout output configuration in production environments.
  - For the SLS_-prefixed parameters in this example, see Configuration parameters.

    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }
Example 6: Use Fluent-bit to upload logs
Fluent-bit is a lightweight log and metrics processor that supports uploading logs to SLS through the Kafka output plugin.
- Configuration example
  For the SLS_-prefixed parameters in this example, see Configuration parameters.

    [Output]
        Name                      kafka
        Match                     *
        Brokers                   SLS_KAFKA_ENDPOINT
        Topics                    SLS_LOGSTORE
        Format                    json
        rdkafka.sasl.username     SLS_PROJECT
        rdkafka.sasl.password     SLS_PASSWORD
        rdkafka.security.protocol SASL_SSL
        rdkafka.sasl.mechanism    PLAIN
Example 7: Use Vector to upload logs
Vector is a lightweight, high-performance log processing tool. Configure Vector to write data to SLS in Kafka-compatible mode as follows.
- Configuration example
  For the SLS_-prefixed parameters in this example, see Configuration parameters.

    [sinks.aliyun_sls]
    type = "kafka"
    inputs = ["test_logs"]
    bootstrap_servers = "SLS_KAFKA_ENDPOINT"
    compression = "gzip"
    healthcheck = true
    topic = "SLS_LOGSTORE"
    encoding.codec = "json"
    sasl.enabled = true
    sasl.mechanism = "PLAIN"
    sasl.username = "SLS_PROJECT"
    sasl.password = "SLS_PASSWORD"
    tls.enabled = true
Example 8: Use a Kafka producer to upload logs
Java
- Dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>

- Sample code

    package org.example;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class KafkaProduceExample {

        public static void main(String[] args) {
            // The configurations.
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // Set this parameter to true if you want the content of the producer to be parsed as a JSON log.
            boolean parseJson = false;
            // An Alibaba Cloud account has permissions on all API operations, which poses high security risks.
            // We recommend creating and using a RAM user to call API operations or perform routine O&M.
            // To create a RAM user, log on to the RAM console.
            // The following example stores the AccessKey ID and AccessKey secret in environment variables.
            // Alternatively, store them in a configuration file.
            // Do not hard-code the AccessKey ID and AccessKey secret in source code to avoid credential leaks.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project.
            String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network.

            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if (parseJson) {
                topic = topic + ".json";
            }

            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");

            // Create a producer instance.
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Send records.
            for (int i = 0; i < 1; i++) {
                String content = "{\"msg\": \"Hello World\"}";
                // If needed, use the following method to set the timestamp of the message.
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);

                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                ", partition: " + metadata.partition() +
                                ", offset: " + metadata.offset() +
                                ", timestamp: " + metadata.timestamp());
                    }
                });
            }

            producer.close();
        }
    }
Python
- Dependencies

    pip install confluent-kafka

- Sample code

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer


    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))


    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False

        # Get credentials from environment variables
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012"  # Use port 10012 for the Internet and port 10011 for the internal network.

        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"

        # Configure Kafka producer
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }

        # Create producer instance
        producer = Producer(conf)

        # Send message
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (Optional) Set the timestamp of the record in milliseconds.
                         callback=delivery_report)

        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        producer.flush()


    if __name__ == '__main__':
        main()
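The Python sample above writes its JSON payload as a hand-escaped string. When the log fields come from application data, serializing them with json.dumps avoids quoting mistakes. The following is a small sketch under that assumption. build_record is a hypothetical helper that only prepares keyword arguments (value and, optionally, timestamp in milliseconds) for the produce() call used in the sample.

```python
import json
import time


def build_record(fields, timestamp_ms=None):
    """Return keyword arguments for Producer.produce() from a dict of log fields.

    build_record is a hypothetical helper; it does not talk to Kafka itself.
    """
    # Serialize to JSON and encode as UTF-8 bytes, as the sample above does.
    kwargs = {"value": json.dumps(fields, ensure_ascii=False).encode("utf-8")}
    if timestamp_ms is not None:
        # The record timestamp is expressed in milliseconds since the epoch.
        kwargs["timestamp"] = int(timestamp_ms)
    return kwargs


if __name__ == "__main__":
    record = build_record({"msg": "Hello World"}, timestamp_ms=time.time() * 1000)
    print(record["value"].decode("utf-8"))
```

With the producer from the sample, a call might look like producer.produce(topic=topic, callback=delivery_report, **build_record({"msg": "Hello World"})). To have SLS parse the fields, the topic needs the .json suffix, as in the sample's parse_json branch.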
Golang
- Dependencies

    go get github.com/confluentinc/confluent-kafka-go/kafka

- Sample code

    package main

    import (
        "fmt"
        "log"
        "os"
        // "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        project := "etl-shanghai-b"
        logstore := "testlog"
        parseJson := false

        // Get credentials from environment variables
        accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
        accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
        endpoint := "cn-shanghai.log.aliyuncs.com"
        port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network.

        hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
        topic := logstore
        if parseJson {
            topic = topic + ".json"
        }

        // Configure Kafka producer
        config := &kafka.ConfigMap{
            "bootstrap.servers":  hosts,
            "security.protocol":  "sasl_ssl",
            "sasl.mechanisms":    "PLAIN",
            "sasl.username":      project,
            "sasl.password":      accessKeyID + "#" + accessKeySecret,
            "enable.idempotence": false,
        }

        // Create producer instance
        producer, err := kafka.NewProducer(config)
        if err != nil {
            log.Fatalf("Failed to create producer: %v", err)
        }
        defer producer.Close()

        // Send messages in batches.
        messages := []string{
            "{\"msg\": \"Hello World 1\"}",
            "{\"msg\": \"Hello World 2\"}",
            "{\"msg\": \"Hello World 3\"}",
        }

        for _, content := range messages {
            err := producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(content),
                //Timestamp: time.Now(), // Set the time if needed.
            }, nil)

            if err != nil {
                log.Printf("Failed to produce message: %v", err)
            }
        }

        // Enable a goroutine to listen for whether the producer successfully sends the message.
        go func() {
            for e := range producer.Events() {
                switch ev := e.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
                    } else {
                        fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                            *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
                    }
                }
            }
        }()

        producer.Flush(5 * 1000)
    }
Troubleshooting
The following table lists errors for Kafka protocol log upload. The complete reference is available in the error list.
| Error message | Description | Recommended solution |
| NetworkException | A network error occurred. | Wait one second and retry. |
| TopicAuthorizationException | Authentication failed. | The AccessKey pair is invalid, or the account lacks write permissions for the specified project or Logstore. Specify a valid AccessKey pair with write permissions. |
| UnknownTopicOrPartitionException | This error occurs for one of the following reasons: the project or Logstore does not exist, or the project region does not match the endpoint region. | Verify that the project and Logstore exist. If the error persists, check whether the project region matches the endpoint region. |
| KafkaStorageException | A server-side exception occurred. | Wait one second and retry. |
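The "wait one second and retry" guidance can be wrapped in a small helper. The following sketch retries only the two errors the table marks as transient and re-raises everything else, such as authorization or unknown-topic errors. Matching on the exception class name is an assumption made for illustration. Real clients surface these errors differently (for example, as error codes), so adapt the check to the client library in use. The name send_with_retry is hypothetical.

```python
import time

# Errors the table above recommends retrying after a one-second wait.
RETRYABLE_ERRORS = {"NetworkException", "KafkaStorageException"}


def send_with_retry(send, max_attempts=3, delay_seconds=1.0, sleep=time.sleep):
    """Call send() and retry transient failures after a fixed delay.

    send is any zero-argument callable that performs the upload.
    Matching on the exception class name is an illustrative assumption.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except Exception as exc:
            retryable = type(exc).__name__ in RETRYABLE_ERRORS
            if not retryable or attempt == max_attempts:
                # Configuration errors and exhausted retries go to the caller.
                raise
            sleep(delay_seconds)
```

Errors such as TopicAuthorizationException and UnknownTopicOrPartitionException are deliberately not retried, because they indicate a configuration problem that a retry cannot fix.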