Log collection tools such as the Kafka producer SDKs, Beats, Collectd, Fluentd, Logstash, Telegraf, and Vector can upload logs to Simple Log Service (SLS) over the Kafka protocol. This topic describes how to configure these tools to upload logs to SLS.
Limits
The earliest supported version of the Kafka protocol is 2.1.0.
You must use the SASL_SSL connection protocol to ensure secure log transmission.
Permissions
You must have one of the following permissions:
System policy: AliyunLogFullAccess. This policy grants permissions to manage SLS. For more information about how to grant permissions, see Grant permissions to a RAM user and Grant permissions to a RAM role.
Custom policies
To create a custom policy, on the Script Editor tab, replace the existing content in the configuration box with the following script. For more information, see Create a custom policy.
Note: In the script, replace project_name with your actual project name.

```json
{
  "Version": "1",
  "Statement": [
    {
      "Action": "log:GetProject",
      "Resource": "acs:log:*:*:project/project_name",
      "Effect": "Allow"
    },
    {
      "Action": [
        "log:GetLogStore",
        "log:ListShards",
        "log:PostLogStoreLogs"
      ],
      "Resource": "acs:log:*:*:project/project_name/logstore/*",
      "Effect": "Allow"
    }
  ]
}
```

Attach the custom policy that you created to a RAM user. For more information, see Grant permissions to a RAM user.
Configurations
When you use the Kafka protocol to upload logs, you must configure the following parameters.
| Configuration name | Configuration value | Description | Example |
| --- | --- | --- | --- |
| SLS_KAFKA_ENDPOINT | The endpoint of the cluster that is used for the initial connection. The format is `project name.endpoint:port`. | Configure this parameter based on the endpoint of the project. Use port 10012 for access over the Internet and port 10011 for access over the internal network. | aliyun-project-test.cn-shanghai.log.aliyuncs.com:10012, where aliyun-project-test is the project name. |
| SLS_PROJECT | Project name | The name of the SLS project. | aliyun-project-test |
| SLS_LOGSTORE | Logstore name | The name of the logstore. If you append `.json` to the logstore name, JSON parsing is automatically enabled. | For example, the logstore name is test-logstore. Set the value to test-logstore to write the raw log to the content field, or to test-logstore.json to parse the log as JSON. |
| SLS_PASSWORD | The AccessKey secret that has write permissions on SLS. | For information about what an AccessKey pair is and how to create one, see Create an AccessKey pair. The value consists of the AccessKey ID and the AccessKey secret, separated by a number sign (#). | LTAI****************#yourAccessKeySecret, where LTAI**************** is the AccessKey ID and yourAccessKeySecret is the AccessKey secret. |
If you want to use a Kafka consumer group to consume data from SLS in real time, submit a ticket to consult Alibaba Cloud technical support.
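The placeholder values in the preceding table map directly onto a standard Kafka producer configuration. The following minimal sketch illustrates this mapping with the confluent-kafka Python client that is also used in Example 8; the project name, logstore name, endpoint, and AccessKey values are placeholders that you must replace with your own.

```python
# Minimal sketch (for illustration only): how the SLS_* placeholders map onto a
# Kafka producer configuration. Values below are placeholders, not real resources.
from confluent_kafka import Producer

project = "aliyun-project-test"                     # SLS_PROJECT
endpoint = "cn-shanghai.log.aliyuncs.com"           # endpoint of the project region
sls_kafka_endpoint = f"{project}.{endpoint}:10012"  # SLS_KAFKA_ENDPOINT (port 10012 for the Internet)
logstore = "test-logstore"                          # SLS_LOGSTORE (append .json to enable JSON parsing)
access_key_id = "LTAI****************"
access_key_secret = "yourAccessKeySecret"

producer = Producer({
    "bootstrap.servers": sls_kafka_endpoint,
    "security.protocol": "sasl_ssl",   # SASL_SSL is required by SLS
    "sasl.mechanisms": "PLAIN",
    "sasl.username": project,          # the SASL username is the project name
    # SLS_PASSWORD: AccessKey ID and AccessKey secret separated by "#"
    "sasl.password": f"{access_key_id}#{access_key_secret}",
})
producer.produce(topic=logstore, value=b'{"msg": "Hello World"}')
producer.flush()
```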
Example 1: Use Beats to upload logs
Beats, such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, can collect logs and upload them to SLS over the Kafka protocol.
Configuration example
For more information about the parameters that start with SLS_ in the example, see Configurations.

```yaml
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["SLS_KAFKA_ENDPOINT"]
  username: "SLS_PROJECT"
  password: "SLS_PASSWORD"
  ssl.certificate_authorities:
  # message topic selection + partitioning
  topic: 'SLS_LOGSTORE'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
Example 2: Use Collectd to upload logs
Collectd is a daemon that periodically collects system and application performance metrics and uploads the metrics to SLS using the Kafka protocol. For more information, see Write Kafka Plugin.
When you upload logs collected by Collectd to SLS, you must also install the Kafka plugin and its dependencies. For example, in CentOS, run the sudo yum install collectd-write_kafka command to install the Kafka plugin. For more information about how to install the RPM Package Manager (RPM) package, see Collectd-write_kafka.
Configuration example
In this example, the output format is set to JSON. Other formats, such as Command and Graphite, are also supported. For more information, see the Collectd configuration document.
For more information about the parameters that start with SLS_ in the example, see Configurations.

```
LoadPlugin write_kafka
<Plugin write_kafka>
  Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "SLS_PROJECT"
  Property "sasl.password" "SLS_PASSWORD"
  Property "broker.address.family" "v4"
  <Topic "SLS_LOGSTORE">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
```
Example 3: Use Telegraf to upload logs
Telegraf is an agent written in Go that has a small memory footprint and collects, processes, and aggregates metrics. Telegraf provides a wide range of plugins and integrations. You can use Telegraf to obtain metrics from the host system, retrieve metrics from third-party APIs, and listen for metrics through statsd and Kafka consumer services.
Before you upload logs collected by Telegraf to SLS using the Kafka protocol, you must modify the configuration file.
Configuration example
In this example, the output format is set to JSON. Other formats, such as Graphite and Carbon2, are also supported. For more information, see Telegraf output formats.
Note: You must configure a valid path for tls_ca in Telegraf. Use the path of the root certificate provided by the server. In a Linux environment, the path of the root CA certificate is typically /etc/ssl/certs/ca-bundle.crt.
For more information about the parameters that start with SLS_ in the example, see Configurations.

```toml
# Kafka output plugin configuration
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["SLS_KAFKA_ENDPOINT"]
  ## Kafka topic for producer messages
  topic = "SLS_LOGSTORE"
  routing_key = "content"
  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ## 0 : No compression
  ## 1 : Gzip compression
  ## 2 : Snappy compression
  ## 3 : LZ4 compression
  compression_codec = 1
  ## Optional TLS Config
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  tls_cert = "/etc/ssl/certs/ca-certificates.crt"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false
  ## Optional SASL Config
  sasl_username = "SLS_PROJECT"
  sasl_password = "SLS_PASSWORD"
  ## Data format to output.
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "json"
```
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector and a Cloud Native Computing Foundation (CNCF) project that is licensed under the Apache 2.0 License.
Fluentd supports a variety of input, processing, and output plugins. Use the Kafka plugin to upload logs to SLS. You only need to install and configure the Kafka plugin. For more information, see fluent-plugin-kafka.
Configuration example
In this example, the output format is set to JSON. Dozens of other formats are also supported. For more information, see Fluentd Formatter.
For more information about the parameters that start with SLS_ in the example, see Configurations.

```
<match **>
  @type kafka2
  brokers SLS_KAFKA_ENDPOINT
  default_topic SLS_LOGSTORE
  default_message_key content
  sasl_over_ssl true
  use_event_time true
  username SLS_PROJECT
  password "SLS_PASSWORD"
  ssl_ca_certs_from_system true
  # ruby-kafka producer options
  max_send_retries 1000
  required_acks 1
  compression_codec gzip
  max_send_limit_bytes 2097152
  <buffer hostlogs>
    flush_interval 10s
  </buffer>
  <format>
    @type json
  </format>
</match>
```
Example 5: Use Logstash to upload logs
Logstash is an open source, real-time log collection engine. Use Logstash to dynamically collect logs from different sources.
To upload logs using the Kafka protocol, you must use Logstash 7.10.1 or later.
Logstash has a built-in Kafka output plugin, so you can configure Logstash to upload logs to SLS using the Kafka protocol. Because SLS uses the SASL_SSL connection protocol, you must also configure an SSL certificate and a JAAS file.
Configuration example
In this example, the output format is set to JSON. Dozens of other formats are also supported. For more information, see Logstash Codec.
Note: This example shows the configurations for a connectivity test. We recommend that you delete the stdout output configuration in a production environment.
For more information about the parameters that start with SLS_ in the example, see Configurations.

```
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "SLS_LOGSTORE"
    bootstrap_servers => "SLS_KAFKA_ENDPOINT"
    security_protocol => "SASL_SSL"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
```
Example 6: Use Fluent-bit to upload logs
Fluent-bit is a lightweight and highly scalable log and metric processor and forwarder. It supports a variety of input, processing, and output plugins. Use the Kafka plugin to upload logs to SLS. For more information, see Kafka output plugin.
Configuration example
For more information about the parameters that start with SLS_ in the example, see Configurations.

```
[Output]
    Name kafka
    Match *
    Brokers SLS_KAFKA_ENDPOINT
    Topics SLS_LOGSTORE
    Format json
    rdkafka.sasl.username SLS_PROJECT
    rdkafka.sasl.password SLS_PASSWORD
    rdkafka.security.protocol SASL_SSL
    rdkafka.sasl.mechanism PLAIN
```
Example 7: Use Vector to upload logs
Vector is a lightweight, high-performance log processing software program that supports log reporting using the Kafka protocol. The following section describes how to configure Vector to write data to SLS in Kafka-compatible mode.
Configuration example
For more information about the parameters that start with SLS_ in the example, see Configurations.

```toml
[sinks.aliyun_sls]
type = "kafka"
inputs = ["test_logs"]
bootstrap_servers = "SLS_KAFKA_ENDPOINT"
compression = "gzip"
healthcheck = true
topic = "SLS_LOGSTORE"
encoding.codec = "json"
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "SLS_PROJECT"
sasl.password = "SLS_PASSWORD"
tls.enabled = true
```
Example 8: Use a Kafka producer to upload logs
Java
Dependencies
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
```

Sample code

```java
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProduceExample {

    public static void main(String[] args) {
        // The configurations.
        Properties props = new Properties();
        String project = "etl-shanghai-b";
        String logstore = "testlog";
        // Set this parameter to true if you want the content of the producer to be parsed as a JSON log.
        boolean parseJson = false;
        // An Alibaba Cloud account has full permissions on all API operations. This poses a high security threat. We recommend that you create and use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
        // This section provides an example on how to save an AccessKey ID and an AccessKey secret to environment variables. You can also save the AccessKey ID and AccessKey secret to a configuration file as needed.
        // We recommend that you do not save the AccessKey ID and AccessKey secret in the code to prevent security risks.
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project.
        String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network.

        String hosts = project + "." + endpoint + ":" + port;
        String topic = logstore;
        if (parseJson) {
            topic = topic + ".json";
        }

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project
                        + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");

        // Create a producer instance.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send records.
        for (int i = 0; i < 1; i++) {
            String content = "{\"msg\": \"Hello World\"}";
            // If needed, use the following method to set the timestamp of the message.
            // long timestamp = System.currentTimeMillis();
            // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                    exception.printStackTrace();
                } else {
                    System.out.println("Message sent successfully to topic: " + metadata.topic()
                            + ", partition: " + metadata.partition()
                            + ", offset: " + metadata.offset()
                            + ", timestamp: " + metadata.timestamp());
                }
            });
        }
        producer.close();
    }
}
```
Python
Dependencies
```
pip install confluent-kafka
```

Sample code

```python
#!/bin/env python3
import time
import os

from confluent_kafka import Producer


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))


def main():
    project = "etl-shanghai-b"
    logstore = "testlog"
    parse_json = False

    # Get credentials from environment variables
    access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
    access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")

    endpoint = "cn-shanghai.log.aliyuncs.com"
    port = "10012"  # Use port 10012 for the Internet and port 10011 for the internal network.
    hosts = f"{project}.{endpoint}:{port}"

    topic = logstore
    if parse_json:
        topic = topic + ".json"

    # Configure Kafka producer
    conf = {
        'bootstrap.servers': hosts,
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': project,
        'sasl.password': f"{access_key_id}#{access_key_secret}",
        'enable.idempotence': False,
    }

    # Create producer instance
    producer = Producer(conf)

    # Send message
    content = "{\"msg\": \"Hello World\"}"
    producer.produce(topic=topic,
                     value=content.encode('utf-8'),
                     # timestamp=int(time.time() * 1000),  # (Optional) Set the timestamp of the record in milliseconds.
                     callback=delivery_report)

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == '__main__':
    main()
```
Golang
Dependencies
```
go get github.com/confluentinc/confluent-kafka-go/kafka
```

Sample code

```go
package main

import (
	"fmt"
	"log"
	"os"
	// "time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	project := "etl-shanghai-b"
	logstore := "testlog"
	parseJson := false

	// Get credentials from environment variables
	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")

	endpoint := "cn-shanghai.log.aliyuncs.com"
	port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network.
	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)

	topic := logstore
	if parseJson {
		topic = topic + ".json"
	}

	// Configure Kafka producer
	config := &kafka.ConfigMap{
		"bootstrap.servers":  hosts,
		"security.protocol":  "sasl_ssl",
		"sasl.mechanisms":    "PLAIN",
		"sasl.username":      project,
		"sasl.password":      accessKeyID + "#" + accessKeySecret,
		"enable.idempotence": false,
	}

	// Create producer instance
	producer, err := kafka.NewProducer(config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Send messages in batches.
	messages := []string{
		"{\"msg\": \"Hello World 1\"}",
		"{\"msg\": \"Hello World 2\"}",
		"{\"msg\": \"Hello World 3\"}",
	}

	for _, content := range messages {
		err := producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(content),
			// Timestamp: time.Now(), // Set the time if needed.
		}, nil)
		if err != nil {
			log.Printf("Failed to produce message: %v", err)
		}
	}

	// Enable a goroutine to listen for whether the producer successfully sends the message.
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
				}
			}
		}
	}()

	producer.Flush(5 * 1000)
}
```
Error messages
If logs fail to upload over the Kafka protocol, an error is returned in the Kafka error message format. The following table describes these errors. For more information about Kafka protocol error messages, see the error list.
| Error message | Description | Recommended solution |
| --- | --- | --- |
| NetworkException | A network error occurred. | Wait for 1 second and then retry. |
| TopicAuthorizationException | Authentication failed. | This error is usually caused by an invalid AccessKey pair or insufficient permissions to write data to the specified project or logstore. Specify a valid AccessKey pair that has write permissions. |
| UnknownTopicOrPartitionException | This error may occur for one of the following reasons: the specified project or logstore does not exist, or the region of the project is different from the region specified in the endpoint. | Make sure that the specified project and logstore exist. If the error persists, check whether the project region is the same as the region specified in the endpoint. |
| KafkaStorageException | An exception occurred on the server. | Wait for 1 second and then retry. |
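For transient errors such as NetworkException and KafkaStorageException, the simplest approach is to let the client retry automatically. The following minimal sketch, which assumes the confluent-kafka Python client from Example 8, shows standard librdkafka retry settings with an approximately 1-second backoff between attempts; the specific values are illustrative and should be tuned for your workload.

```python
# Minimal sketch: rely on client-side retries for transient errors
# (for example, NetworkException or KafkaStorageException), as recommended above.
# The SLS_* placeholders and retry values are illustrative.
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'SLS_KAFKA_ENDPOINT',
    'security.protocol': 'sasl_ssl',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'SLS_PROJECT',
    'sasl.password': 'SLS_PASSWORD',
    'enable.idempotence': False,
    # Retry transient failures automatically instead of surfacing them immediately.
    'message.send.max.retries': 5,   # maximum number of resend attempts per message
    'retry.backoff.ms': 1000,        # wait about 1 second between attempts
    'message.timeout.ms': 30000,     # give up and report an error after 30 seconds
}

producer = Producer(conf)

def on_delivery(err, msg):
    # Called on success (err is None) or after all retries are exhausted (err is set).
    if err is not None:
        print(f'Delivery failed after retries: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

producer.produce('SLS_LOGSTORE', value=b'{"msg": "Hello World"}', callback=on_delivery)
producer.flush()
```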