Gunakan alat pengumpulan log seperti Kafka Producer SDK, Beats, Collectd, Fluentd, Logstash, Telegraf, dan Vector untuk mengunggah log ke Simple Log Service (SLS) menggunakan protokol Kafka. Topik ini menjelaskan cara menggunakan protokol Kafka untuk mengunggah log dari alat-alat tersebut ke SLS.
Batasan
Versi paling awal dari protokol Kafka yang didukung adalah 2.1.0.
Anda harus menggunakan protokol koneksi SASL_SSL untuk memastikan transmisi log yang aman.
Izin
Anda harus memiliki salah satu izin berikut:
Kebijakan ini memberikan izin untuk mengelola SLS. Untuk informasi lebih lanjut tentang cara memberikan izin, lihat Berikan Izin kepada Pengguna RAM dan Berikan Izin kepada Peran RAM.
Kebijakan Kustom
Untuk membuat kebijakan kustom, pada tab Script Editor, ganti konten yang ada di kotak konfigurasi dengan skrip berikut. Untuk informasi lebih lanjut, lihat Buat Kebijakan Kustom.
CatatanDi dalam skrip, ganti
project_namedengan nama proyek Anda yang sebenarnya.{ "Version": "1", "Statement": [ { "Action": "log:GetProject", "Resource": "acs:log:*:*:project/project_name", "Effect": "Allow" }, { "Action": [ "log:GetLogStore", "log:ListShards", "log:PostLogStoreLogs" ], "Resource": "acs:log:*:*:project/project_name/logstore/*", "Effect": "Allow" } ] }Lampirkan kebijakan kustom yang Anda buat ke Pengguna RAM. Untuk informasi lebih lanjut, lihat Berikan Izin kepada Pengguna RAM.
Konfigurasi
Saat menggunakan protokol Kafka untuk mengunggah log, Anda harus mengonfigurasi parameter berikut:
Nama Konfigurasi | Nilai Konfigurasi | Deskripsi | Contoh |
SLS_KAFKA_ENDPOINT | Titik akhir kluster yang digunakan untuk koneksi awal. Formatnya adalah |
| aliyun-project-test adalah nama proyek.
|
SLS_PROJECT | Nama Proyek | Nama proyek SLS. | aliyun-project-test |
SLS_LOGSTORE | Nama Logstore | Nama logstore. Jika Anda menambahkan | Sebagai contoh, nama logstore adalah
|
SLS_PASSWORD | Rahasia AccessKey yang memiliki izin tulis pada SLS. | Untuk informasi tentang apa itu Pasangan Kunci Akses dan cara pembuatannya, lihat Buat Pasangan Kunci Akses. Nilai tersebut terdiri dari ID AccessKey dan rahasia AccessKey, dipisahkan oleh simbol
| LTAI****************#yourAccessKeySecret |
Jika ingin menggunakan kelompok konsumen Kafka untuk mengonsumsi data dari SLS secara real-time, ajukan tiket untuk berkonsultasi dengan dukungan teknis Alibaba Cloud.
Contoh 1: Gunakan Beats untuk mengunggah log
Beats, seperti MetricBeat, PacketBeat, Winlogbeat, Auditbeat, Filebeat, dan Heartbeat, dapat mengumpulkan log dan mengunggahnya ke SLS menggunakan protokol Kafka.
Contoh Konfigurasi
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.output.kafka: # broker awal untuk membaca metadata kluster hosts: ["SLS_KAFKA_ENDPOINT"] username: "SLS_PROJECT" password: "SLS_PASSWORD" ssl.certificate_authorities: # pemilihan topik pesan + partisi topic: 'SLS_LOGSTORE' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
Contoh 2: Gunakan Collectd untuk mengunggah log
Collectd adalah daemon yang secara berkala mengumpulkan metrik kinerja sistem dan aplikasi serta mengunggah metrik tersebut ke SLS menggunakan protokol Kafka. Untuk informasi lebih lanjut, lihat Plugin Write Kafka.
Saat mengunggah log yang dikumpulkan oleh Collectd ke SLS, Anda juga harus menginstal plugin Kafka dan dependensinya. Sebagai contoh, di CentOS, jalankan perintah sudo yum install collectd-write_kafka untuk menginstal plugin Kafka. Untuk informasi lebih lanjut tentang cara menginstal paket RPM Package Manager (RPM), lihat Collectd-write_kafka.
Contoh Konfigurasi
Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Command dan Graphite, juga didukung. Untuk informasi lebih lanjut, lihat Dokumen Konfigurasi Collectd.
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.
LoadPlugin write_kafka <Plugin write_kafka> Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "SLS_PROJECT" Property "sasl.password" "SLS_PASSWORD" Property "broker.address.family" "v4" <Topic "SLS_LOGSTORE"> Format JSON Key "content" </Topic> </Plugin>
Contoh 3: Gunakan Telegraf untuk mengunggah log
Telegraf adalah program agen berbasis Go yang menggunakan sedikit memori untuk mengumpulkan, memproses, dan menggabungkan metrik data. Telegraf menyediakan berbagai macam plugin dan fitur integrasi. Gunakan Telegraf untuk mendapatkan berbagai metrik dari sistem host-nya, mengambil metrik dari API pihak ketiga, dan mendengarkan metrik menggunakan layanan statsd dan konsumen Kafka.
Sebelum mengunggah log yang dikumpulkan oleh Telegraf ke SLS menggunakan protokol Kafka, Anda harus memodifikasi file konfigurasi.
Contoh Konfigurasi
Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Graphite dan Carbon2, juga didukung. Untuk informasi lebih lanjut, lihat Format Output Telegraf.
CatatanAnda harus mengonfigurasi jalur yang valid untuk tls_ca di Telegraf. Gunakan jalur sertifikat root yang disediakan oleh server. Di lingkungan Linux, jalur sertifikat CA root biasanya adalah /etc/ssl/certs/ca-bundle.crt.
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.
# Konfigurasi plugin keluaran Kafka [[outputs.kafka]] ## URL broker kafka brokers = ["SLS_KAFKA_ENDPOINT"] ## Topik Kafka untuk pesan produser topic = "SLS_LOGSTORE" routing_key = "content" ## CompressionCodec mewakili berbagai codec kompresi yang diakui oleh ## Kafka dalam pesan. ## 0 : Tanpa kompresi ## 1 : Kompresi Gzip ## 2 : Kompresi Snappy ## 3 : Kompresi LZ4 compression_codec = 1 ## Konfigurasi TLS opsional tls_ca = "/etc/ssl/certs/ca-bundle.crt" tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem" ## Gunakan TLS tetapi lewati verifikasi rantai & host # insecure_skip_verify = false ## Konfigurasi SASL Opsional sasl_username = "SLS_PROJECT" sasl_password = "SLS_PASSWORD" ## Format data untuk keluaran. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
Contoh 4: Gunakan Fluentd untuk mengunggah log
Fluentd adalah kolektor log open source yang dikembangkan di bawah Lisensi Apache 2 dan merupakan proyek dari Asosiasi Komputasi Cloud Native (CNCF).
Fluentd mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke Simple Log Service (SLS). Anda hanya perlu menginstal dan mengonfigurasi plugin Kafka. Untuk informasi lebih lanjut, lihat fluent-plugin-kafka.
Contoh Konfigurasi
Pada contoh ini, format output diatur ke JSON. Berbagai format lain juga didukung. Untuk informasi lebih lanjut, lihat Formatter Fluentd.
Untuk detail lebih lanjut mengenai parameter yang diawali dengan
SLS_dalam contoh ini, lihat Konfigurasi.
<match **> @type kafka2 brokers SLS_KAFKA_ENDPOINT default_topic SLS_LOGSTORE default_message_key content sasl_over_ssl true use_event_time true username SLS_PROJECT password "SLS_PASSWORD" ssl_ca_certs_from_system true # opsi produksi ruby-kafka max_send_retries 1000 required_acks 1 compression_codec gzip use_event_time true max_send_limit_bytes 2097152 <buffer hostlogs> flush_interval 10s </buffer> <format> @type json </format> </match>
Contoh 5: Gunakan Logstash untuk mengunggah log
Logstash adalah mesin pengumpulan log waktu nyata open source. Gunakan Logstash untuk mengumpulkan log secara dinamis dari berbagai sumber.
Untuk mengunggah log menggunakan protokol Kafka, Anda harus menggunakan Logstash versi 7.10.1 atau yang lebih baru.
Logstash dilengkapi dengan plugin output Kafka bawaan. Konfigurasikan Logstash untuk mengunggah log ke SLS menggunakan protokol Kafka. Karena SLS menggunakan protokol koneksi SASL_SSL, Anda juga perlu mengonfigurasi sertifikat SSL dan file JAAS.
Contoh Konfigurasi
Dalam contoh ini, format output diatur ke JSON. Puluhan format lainnya juga didukung. Untuk informasi lebih lanjut, lihat Kodek Logstash.
CatatanContoh ini menunjukkan konfigurasi untuk uji coba konektivitas. Kami sarankan Anda menghapus konfigurasi output stdout di lingkungan produksi.
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.
output { stdout { codec => rubydebug } kafka { topic_id => "SLS_LOGSTORE" bootstrap_servers => "SLS_KAFKA_ENDPOINT" security_protocol => "SASL_SSL" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
Contoh 6: Gunakan Fluent-bit untuk mengunggah log
Fluent-bit adalah prosesor dan pengirim log serta metrik yang ringan dan sangat skalabel. Ini mendukung berbagai macam plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Untuk informasi lebih lanjut, lihat Plugin Output Kafka.
Contoh Konfigurasi
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.[Output] Name kafka Match * Brokers SLS_KAFKA_ENDPOINT Topics SLS_LOGSTORE Format json rdkafka.sasl.username SLS_PROJECT rdkafka.sasl.password SLS_PASSWORD rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
Contoh 7: Konfigurasikan Vector untuk mengunggah log menggunakan protokol Kafka
Vector adalah perangkat lunak pemrosesan log berperforma tinggi dan ringan yang mendukung pelaporan log menggunakan protokol Kafka. Bagian berikut menjelaskan cara mengonfigurasi Vector untuk menulis data ke SLS dalam mode kompatibel Kafka.
Contoh Konfigurasi
Untuk informasi lebih lanjut tentang parameter yang dimulai dengan
SLS_dalam contoh, lihat Konfigurasi.[sinks.aliyun_sls] type = "kafka" inputs = ["test_logs"] bootstrap_servers = "SLS_KAFKA_ENDPOINT" compression = "gzip" healthcheck = true topic = "SLS_LOGSTORE" encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "SLS_PROJECT" sasl.password = "SLS_PASSWORD" tls.enabled = true
Contoh 8: Gunakan produsen Kafka untuk mengunggah log
Java
Dependensi
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>Contoh Kode
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { // Konfigurasi. Properties props = new Properties(); String project = "etl-shanghai-b"; String logstore = "testlog"; // Atur parameter ini ke true jika Anda ingin konten dari produsen diurai sebagai log JSON. boolean parseJson = false; // Akun Alibaba Cloud memiliki izin penuh pada semua operasi API. Ini menimbulkan ancaman keamanan yang tinggi. Kami sarankan Anda membuat dan menggunakan Pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. Untuk membuat Pengguna RAM, masuk ke konsol RAM. // Bagian ini memberikan contoh cara menyimpan ID AccessKey dan Rahasia AccessKey ke variabel lingkungan. Anda juga dapat menyimpan ID AccessKey dan Rahasia AccessKey ke file konfigurasi sesuai kebutuhan. // Kami sarankan Anda tidak menyimpan ID AccessKey dan Rahasia AccessKey di dalam kode untuk mencegah risiko keamanan. String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-shanghai.log.aliyuncs.com"; // Konfigurasikan parameter ini berdasarkan titik akhir proyek. String port = "10012"; // Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal. String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // Buat instance produsen. KafkaProducer<String,String> producer = new KafkaProducer<>(props); // Kirim rekaman. for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; // Jika diperlukan, gunakan metode berikut untuk mengatur timestamp pesan. // long timestamp = System.currentTimeMillis(); // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content); ProducerRecord<String, String> record = new ProducerRecord<>(topic, content); producer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("ERROR: Gagal mengirim pesan: " + exception.getMessage()); exception.printStackTrace(); } else { System.out.println("Pesan berhasil dikirim ke topik: " + metadata.topic() + ", partisi: " + metadata.partition() + ", offset: " + metadata.offset() + ", timestamp: " + metadata.timestamp()); } }); } producer.close(); } }
Python
Dependensi
pip install confluent-kafkaContoh kode
#!/bin/env python3 import time import os from confluent_kafka import Producer def delivery_report(err, msg): """ Dipanggil sekali untuk setiap pesan yang diproduksi untuk menunjukkan hasil pengiriman. Dipicu oleh poll() atau flush(). """ if err is not None: print('Pengiriman pesan gagal: {}'.format(err)) else: print('Pesan berhasil dikirim ke {} [{}] pada offset {}'.format(msg.topic(), msg.partition(), msg.offset())) def main(): project = "etl-shanghai-b" logstore = "testlog" parse_json = False # Dapatkan kredensial dari variabel lingkungan access_key_id = os.getenv("SLS_ACCESS_KEY_ID") access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET") endpoint = "cn-shanghai.log.aliyuncs.com" port = "10012" # Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal. hosts = f"{project}.{endpoint}:{port}" topic = logstore if parse_json: topic = topic + ".json" # Konfigurasikan produsen Kafka conf = { 'bootstrap.servers': hosts, 'security.protocol': 'sasl_ssl', 'sasl.mechanisms': 'PLAIN', 'sasl.username': project, 'sasl.password': f"{access_key_id}#{access_key_secret}", 'enable.idempotence': False, } # Buat instance produsen producer = Producer(conf) # Kirim pesan content = "{\"msg\": \"Hello World\"}" producer.produce(topic=topic, value=content.encode('utf-8'), #timestamp=int(time.time() * 1000), # (Opsional) Setel timestamp rekaman dalam milidetik. callback=delivery_report) # Tunggu pesan yang belum selesai dikirim dan laporan pengiriman dipicu. producer.flush() if __name__ == '__main__': main()
Golang
Dependensi
go get github.com/confluentinc/confluent-kafka-go/kafkaContoh kode
package main import ( "fmt" "log" "os" // "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { project := "etl-shanghai-b" logstore := "testlog" parseJson := false // Dapatkan kredensial dari variabel lingkungan accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID") accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET") endpoint := "cn-shanghai.log.aliyuncs.com" port := "10012" // Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal. hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port) topic := logstore if parseJson { topic = topic + ".json" } // Konfigurasikan produsen Kafka config := &kafka.ConfigMap{ "bootstrap.servers": hosts, "security.protocol": "sasl_ssl", "sasl.mechanisms": "PLAIN", "sasl.username": project, "sasl.password": accessKeyID + "#" + accessKeySecret, "enable.idempotence": false, } // Buat instance produsen producer, err := kafka.NewProducer(config) if err != nil { log.Fatalf("Gagal membuat produsen: %v", err) } defer producer.Close() // Kirim pesan dalam batch. messages := []string{ "{\"msg\": \"Hello World 1\"}", "{\"msg\": \"Hello World 2\"}", "{\"msg\": \"Hello World 3\"}", } for _, content := range messages { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(content), //Timestamp: time.Now(), // Setel waktu jika diperlukan. }, nil) if err != nil { log.Printf("Gagal memproduksi pesan: %v", err) } } // Aktifkan goroutine untuk mendengarkan apakah produsen berhasil mengirim pesan. go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Pengiriman gagal: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Pesan terkirim ke topik %s [%d] pada offset %v\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset) } } } }() producer.Flush(5 * 1000) }
Pesan kesalahan
Jika gagal mengunggah log menggunakan protokol Kafka, pesan kesalahan akan dikembalikan dalam format pesan kesalahan Kafka. Tabel berikut menjelaskan pesan kesalahan. Untuk informasi lebih lanjut tentang pesan kesalahan protokol Kafka, lihat Daftar Kesalahan.
Pesan Kesalahan | Deskripsi | Solusi yang Direkomendasikan |
NetworkException | Kesalahan jaringan terjadi. | Tunggu selama 1 detik, lalu coba lagi. |
TopicAuthorizationException | Otentikasi gagal. | Kesalahan ini biasanya disebabkan oleh pasangan AccessKey yang tidak valid atau izin yang tidak cukup untuk menulis data ke proyek atau logstore tertentu. Tentukan pasangan AccessKey yang valid dengan izin tulis. |
UnknownTopicOrPartitionException | Kesalahan ini dapat terjadi karena salah satu alasan berikut:
| Pastikan bahwa proyek dan logstore yang ditentukan ada. Jika kesalahan tetap ada, periksa apakah wilayah proyek sama dengan wilayah yang ditentukan dalam titik akhir. |
KafkaStorageException | Penyimpangan terjadi di server. | Tunggu selama 1 detik, lalu coba lagi. |