Gunakan alat pengumpulan log seperti SDK Kafka Producer, Beats, Collectd, Fluentd, Logstash, Telegraf, dan Vector untuk mengunggah log ke Simple Log Service (SLS) melalui protokol Kafka. Topik ini menjelaskan cara mengunggah log dari alat-alat tersebut ke SLS menggunakan protokol Kafka.
Batasan
Versi protokol Kafka yang didukung paling awal adalah 2.1.0.
Anda harus menggunakan protokol koneksi SASL_SSL untuk memastikan transmisi log yang aman.
Izin
Anda harus memiliki salah satu izin berikut.
Kebijakan ini memberikan izin untuk mengelola SLS. Untuk informasi selengkapnya tentang cara memberikan izin, lihat Berikan izin kepada RAM user dan Kelola izin Peran RAM.
Kebijakan kustom
Untuk membuat kebijakan kustom, pada tab Script Editor, ganti konten yang ada di kotak konfigurasi dengan skrip berikut. Untuk informasi selengkapnya, lihat Buat kebijakan kustom.
CatatanDalam skrip, ganti
project_namedengan nama proyek Anda yang sebenarnya.{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project_name", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project_name/logstore/*", "Effect": "Allow" } ] }Lampirkan kebijakan kustom yang telah Anda buat ke RAM user. Untuk informasi selengkapnya, lihat Berikan izin kepada RAM user.
Konfigurasi
Saat menggunakan protokol Kafka untuk mengunggah log, Anda harus mengonfigurasi parameter berikut.
Nama konfigurasi | Nilai konfigurasi | Deskripsi | Contoh |
SLS_KAFKA_ENDPOINT | Titik akhir kluster yang digunakan untuk koneksi awal. Formatnya adalah |
| aliyun-project-test adalah nama proyek.
|
SLS_PROJECT | Nama proyek | Nama proyek SLS. | aliyun-project-test |
|
SLS_LOGSTORE |
Nama logstore |
Nama logstore SLS. Jika Anda menambahkan akhiran |
Misalnya, nama logstore adalah
|
SLS_PASSWORD | Rahasia AccessKey yang memiliki izin tulis pada SLS. | Untuk informasi tentang apa itu pasangan AccessKey dan cara membuatnya, lihat Buat pasangan AccessKey. Nilainya terdiri dari ID AccessKey dan rahasia AccessKey, dipisahkan oleh simbol
| LTAI****************#yourAccessKeySecret |
Jika Anda ingin menggunakan kelompok konsumen Kafka untuk mengonsumsi data dari SLS secara waktu nyata, ajukan tiket untuk berkonsultasi dengan dukungan teknis Alibaba Cloud.
Contoh 1: Gunakan Beats untuk mengunggah log
Beats—seperti Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, dan Heartbeat—dapat mengumpulkan log dan mengunggahnya ke SLS menggunakan protokol Kafka.
Contoh konfigurasi
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.output.kafka: # initial brokers for reading cluster metadata hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # message topic selection + partitioning topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
Contoh 2: Gunakan Collectd untuk mengunggah log
Collectd adalah daemon yang secara berkala mengumpulkan metrik kinerja sistem dan aplikasi serta mengunggah metrik tersebut ke SLS menggunakan protokol Kafka. Untuk informasi selengkapnya, lihat Write Kafka Plugin.
Saat mengunggah log yang dikumpulkan oleh Collectd ke SLS, Anda juga harus menginstal plugin Kafka dan dependensinya. Misalnya, di CentOS, jalankan perintah sudo yum install collectd-write_kafka untuk menginstal plugin Kafka. Untuk informasi selengkapnya tentang cara menginstal paket Manajer Paket RPM (RPM), lihat Collectd-write_kafka.
Contoh konfigurasi
Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Command dan Graphite, juga didukung. Untuk informasi selengkapnya, lihat dokumentasi konfigurasi Collectd.
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
Contoh 3: Gunakan Telegraf untuk mengunggah log
Telegraf adalah agen berbasis Go yang menggunakan sedikit memori untuk mengumpulkan, memproses, dan mengagregasi metrik data. Telegraf menyediakan berbagai plugin dan fitur integrasi. Gunakan Telegraf untuk mendapatkan berbagai metrik dari sistem host-nya, mengambil metrik dari API pihak ketiga, serta mendengarkan metrik melalui layanan statsd dan konsumen Kafka.
Sebelum mengunggah log yang dikumpulkan oleh Telegraf ke SLS menggunakan protokol Kafka, Anda harus memodifikasi file konfigurasi.
Contoh konfigurasi
Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Graphite dan Carbon2, juga didukung. Untuk informasi selengkapnya, lihat format output Telegraf.
CatatanAnda harus mengonfigurasi path yang valid untuk tls_ca di Telegraf. Gunakan path sertifikat root yang disediakan oleh server. Di lingkungan Linux, path sertifikat CA root biasanya /etc/ssl/certs/ca-bundle.crt.
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.
# Kafka output plugin configuration [[outputs.kafka]] ## URLs of kafka brokers brokers = ["SLS_KAFKA_ENDPOINT"] ## Kafka topic for producer messages topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
Contoh 4: Gunakan Fluentd untuk mengunggah log
Fluentd adalah pengumpul log open source dan proyek dari Cloud Native Computing Foundation (CNCF) yang dikembangkan di bawah Lisensi Apache 2.
Fluentd mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Anda hanya perlu menginstal dan mengonfigurasi plugin Kafka. Untuk informasi selengkapnya, lihat fluent-plugin-kafka.
Contoh konfigurasi
Dalam contoh ini, format output diatur ke JSON. Puluhan format lain juga didukung. Untuk informasi selengkapnya, lihat Fluentd Formatter.
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
Contoh 5: Gunakan Logstash untuk mengunggah log
Logstash adalah mesin pengumpulan log open source waktu nyata. Gunakan Logstash untuk mengumpulkan log secara dinamis dari berbagai sumber.
Untuk mengunggah log menggunakan protokol Kafka, Anda harus menggunakan Logstash 7.10.1 atau versi yang lebih baru.
Logstash memiliki plugin output Kafka bawaan. Konfigurasikan Logstash untuk mengunggah log ke SLS menggunakan protokol Kafka. Karena SLS menggunakan protokol koneksi SASL_SSL, Anda juga harus mengonfigurasi sertifikat SSL dan file JAAS.
Contoh konfigurasi
Dalam contoh ini, format output diatur ke JSON. Puluhan format lain juga didukung. Untuk informasi selengkapnya, lihat Logstash Codec.
CatatanContoh ini menunjukkan konfigurasi untuk uji konektivitas. Kami menyarankan agar Anda menghapus konfigurasi output stdout di lingkungan produksi.
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.
output { stdout { codec => rubydebug } kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
Contoh 6: Gunakan Fluent-bit untuk mengunggah log
Fluent-bit adalah prosesor dan forwarder log serta metrik yang ringan dan sangat skalabel. Fluent-bit mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Untuk informasi selengkapnya, lihat Kafka output plugin.
Contoh konfigurasi
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
Contoh 7: Konfigurasikan Vector untuk mengunggah log menggunakan protokol Kafka
Vector adalah perangkat lunak pemrosesan log yang ringan dan berkinerja tinggi yang mendukung pelaporan log menggunakan protokol Kafka. Bagian berikut menjelaskan cara mengonfigurasi Vector untuk menulis data ke SLS dalam mode kompatibel Kafka.
Contoh konfigurasi
Untuk informasi selengkapnya tentang parameter yang diawali dengan
SLS_dalam contoh, lihat Konfigurasi.[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
Contoh 8: Gunakan produsen Kafka untuk mengunggah log
Java
Dependensi
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>Kode contoh
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { // The configurations. Properties props = new Properties(); String project = "etl-shanghai-b"; String logstore = "testlog"; // Set this parameter to true if you want the content of the producer to be parsed as a JSON log. boolean parseJson = false; // An Alibaba Cloud account has full permissions on all API operations. This poses a high security threat. We recommend that you create and use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. // This section provides an example on how to save an AccessKey ID and an AccessKey secret to environment variables. You can also save the AccessKey ID and AccessKey secret to a configuration file as needed. // We recommend that you do not save the AccessKey ID and AccessKey secret in the code to prevent security risks. String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project. String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network. String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); //Create a producer instance. KafkaProducer<String,String> producer = new KafkaProducer<>(props); //Send records. for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; // If needed, use the following method to set the timestamp of the message. // long timestamp = System.currentTimeMillis(); // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content); ProducerRecord<String, String> record = new ProducerRecord<>(topic, content); producer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("ERROR: Failed to send message: " + exception.getMessage()); exception.printStackTrace(); } else { System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset() + ", timestamp: " + metadata.timestamp()); } }); } producer.close(); } }
Python
Dependensi
pip install confluent-kafkaKode contoh
#!/bin/env python3 import time import os from confluent_kafka import Producer def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset())) def main(): project = "etl-shanghai-b" logstore = "testlog" parse_json = False # Get credentials from environment variables access_key_id = os.getenv("SLS_ACCESS_KEY_ID") access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET") endpoint = "cn-shanghai.log.aliyuncs.com" port = "10012" # Use port 10012 for the Internet and port 10011 for the internal network. hosts = f"{project}.{endpoint}:{port}" topic = logstore if parse_json: topic = topic + ".json" # Configure Kafka producer conf = { 'bootstrap.servers': hosts, 'security.protocol': 'sasl_ssl', 'sasl.mechanisms': 'PLAIN', 'sasl.username': project, 'sasl.password': f"{access_key_id}#{access_key_secret}", 'enable.idempotence': False, } # Create producer instance producer = Producer(conf) # Send message content = "{\"msg\": \"Hello World\"}" producer.produce(topic=topic, value=content.encode('utf-8'), #timestamp=int(time.time() * 1000), # (Optional) Set the timestamp of the record in milliseconds. callback=delivery_report) # Wait for any outstanding messages to be delivered and delivery report # callbacks to be triggered. producer.flush() if __name__ == '__main__': main()
Golang
Dependensi
go get github.com/confluentinc/confluent-kafka-go/kafkaKode contoh
package main import ( "fmt" "log" "os" // "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { project := "etl-shanghai-b" logstore := "testlog" parseJson := false // Get credentials from environment variables accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID") accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET") endpoint := "cn-shanghai.log.aliyuncs.com" port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network. hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port) topic := logstore if parseJson { topic = topic + ".json" } // Configure Kafka producer config := &kafka.ConfigMap{ "bootstrap.servers": hosts, "security.protocol": "sasl_ssl", "sasl.mechanisms": "PLAIN", "sasl.username": project, "sasl.password": accessKeyID + "#" + accessKeySecret, "enable.idempotence": false, } // Create producer instance producer, err := kafka.NewProducer(config) if err != nil { log.Fatalf("Failed to create producer: %v", err) } defer producer.Close() // Send messages in batches. messages := []string{ "{\"msg\": \"Hello World 1\"}", "{\"msg\": \"Hello World 2\"}", "{\"msg\": \"Hello World 3\"}", } for _, content := range messages { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(content), //Timestamp: time.Now(), // Set the time if needed. }, nil) if err != nil { log.Printf("Failed to produce message: %v", err) } } // Enable a goroutine to listen for whether the producer successfully sends the message. go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) } } } }() producer.Flush(5 * 1000) }
Pesan error
Jika Anda gagal mengunggah log menggunakan protokol Kafka, pesan error akan dikembalikan dalam format pesan error Kafka. Tabel berikut menjelaskan pesan-pesan error tersebut. Untuk informasi selengkapnya tentang pesan error protokol Kafka, lihat daftar error.
Pesan error | Deskripsi | Solusi yang direkomendasikan |
NetworkException | Terjadi error jaringan. | Tunggu 1 detik lalu coba lagi. |
TopicAuthorizationException |
Autentikasi gagal. |
Error ini biasanya menunjukkan bahwa pasangan AccessKey yang diberikan tidak valid atau tidak memiliki izin tulis ke proyek atau logstore yang ditentukan. Pastikan Anda menggunakan pasangan AccessKey yang valid dengan izin tulis yang diperlukan. |
UnknownTopicOrPartitionException |
Error ini dapat terjadi karena salah satu alasan berikut:
|
Pastikan proyek dan logstore yang ditentukan ada. Jika error tetap terjadi, verifikasi bahwa wilayah proyek sesuai dengan wilayah titik akhir. |
KafkaStorageException | Terjadi exception di server. | Tunggu 1 detik lalu coba lagi. |