All Products
Search
Document Center

Simple Log Service:Gunakan protokol Kafka untuk mengunggah log

Last Updated:Mar 26, 2026

Gunakan alat pengumpulan log seperti SDK Kafka Producer, Beats, Collectd, Fluentd, Logstash, Telegraf, dan Vector untuk mengunggah log ke Simple Log Service (SLS) melalui protokol Kafka. Topik ini menjelaskan cara mengunggah log dari alat-alat tersebut ke SLS menggunakan protokol Kafka.

Batasan

  • Versi protokol Kafka yang didukung paling awal adalah 2.1.0.

  • Anda harus menggunakan protokol koneksi SASL_SSL untuk memastikan transmisi log yang aman.

Izin

Anda harus memiliki salah satu izin berikut.

  • AliyunLogFullAccess

    Kebijakan ini memberikan izin untuk mengelola SLS. Untuk informasi selengkapnya tentang cara memberikan izin, lihat Berikan izin kepada RAM user dan Kelola izin Peran RAM.

  • Kebijakan kustom

    1. Untuk membuat kebijakan kustom, pada tab Script Editor, ganti konten yang ada di kotak konfigurasi dengan skrip berikut. Untuk informasi selengkapnya, lihat Buat kebijakan kustom.

      Catatan

      Dalam skrip, ganti project_name dengan nama proyek Anda yang sebenarnya.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Lampirkan kebijakan kustom yang telah Anda buat ke RAM user. Untuk informasi selengkapnya, lihat Berikan izin kepada RAM user.

Konfigurasi

Saat menggunakan protokol Kafka untuk mengunggah log, Anda harus mengonfigurasi parameter berikut.

Nama konfigurasi

Nilai konfigurasi

Deskripsi

Contoh

SLS_KAFKA_ENDPOINT

Titik akhir kluster yang digunakan untuk koneksi awal. Formatnya adalah Nama Proyek.Titik akhir:Port. Anda harus mengonfigurasi parameter ini berdasarkan titik akhir proyek. Untuk informasi selengkapnya, lihat Titik akhir.

  • Jaringan internal: Nomor port adalah 10011. Contoh: Nama Proyek.cn-hangzhou-intranet.log.aliyuncs.com:10011.

  • Internet: Nomor port adalah 10012. Contoh: Nama Proyek.cn-hangzhou.log.aliyuncs.com:10012.

aliyun-project-test adalah nama proyek. cn-hangzhou-xxx.aliyuncs.com adalah titik akhir. 10011 dan 10012 adalah nomor port untuk jaringan internal dan Internet.

  • Jaringan internal: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011.

  • Internet: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012.

SLS_PROJECT

Nama proyek

Nama proyek SLS.

aliyun-project-test

SLS_LOGSTORE

Nama logstore

Nama logstore SLS. Jika Anda menambahkan akhiran .json ke nama logstore, SLS akan mencoba mengurai log sebagai log JSON.

Misalnya, nama logstore adalah test-logstore.

  • Jika Anda mengatur nilainya menjadi test-logstore, konten log yang diunggah disimpan di bidang content.

  • Jika Anda mengatur nilainya menjadi test-logstore.json, konten log yang diunggah diurai sebagai log JSON. Kunci pada lapisan pertama data JSON digunakan sebagai nama bidang, dan nilai yang sesuai digunakan sebagai nilai bidang.

SLS_PASSWORD

Rahasia AccessKey yang memiliki izin tulis pada SLS.

Untuk informasi tentang apa itu pasangan AccessKey dan cara membuatnya, lihat Buat pasangan AccessKey.

Nilainya terdiri dari ID AccessKey dan rahasia AccessKey, dipisahkan oleh simbol #.

LTAI****************#yourAccessKeySecret

Catatan

Jika Anda ingin menggunakan kelompok konsumen Kafka untuk mengonsumsi data dari SLS secara waktu nyata, ajukan tiket untuk berkonsultasi dengan dukungan teknis Alibaba Cloud.

Contoh 1: Gunakan Beats untuk mengunggah log

Beats—seperti Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, dan Heartbeat—dapat mengumpulkan log dan mengunggahnya ke SLS menggunakan protokol Kafka.

  • Contoh konfigurasi

    Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

Contoh 2: Gunakan Collectd untuk mengunggah log

Collectd adalah daemon yang secara berkala mengumpulkan metrik kinerja sistem dan aplikasi serta mengunggah metrik tersebut ke SLS menggunakan protokol Kafka. Untuk informasi selengkapnya, lihat Write Kafka Plugin.

Saat mengunggah log yang dikumpulkan oleh Collectd ke SLS, Anda juga harus menginstal plugin Kafka dan dependensinya. Misalnya, di CentOS, jalankan perintah sudo yum install collectd-write_kafka untuk menginstal plugin Kafka. Untuk informasi selengkapnya tentang cara menginstal paket Manajer Paket RPM (RPM), lihat Collectd-write_kafka.

  • Contoh konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Command dan Graphite, juga didukung. Untuk informasi selengkapnya, lihat dokumentasi konfigurasi Collectd.

    • Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Contoh 3: Gunakan Telegraf untuk mengunggah log

Telegraf adalah agen berbasis Go yang menggunakan sedikit memori untuk mengumpulkan, memproses, dan mengagregasi metrik data. Telegraf menyediakan berbagai plugin dan fitur integrasi. Gunakan Telegraf untuk mendapatkan berbagai metrik dari sistem host-nya, mengambil metrik dari API pihak ketiga, serta mendengarkan metrik melalui layanan statsd dan konsumen Kafka.

Sebelum mengunggah log yang dikumpulkan oleh Telegraf ke SLS menggunakan protokol Kafka, Anda harus memodifikasi file konfigurasi.

  • Contoh konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Graphite dan Carbon2, juga didukung. Untuk informasi selengkapnya, lihat format output Telegraf.

      Catatan

      Anda harus mengonfigurasi path yang valid untuk tls_ca di Telegraf. Gunakan path sertifikat root yang disediakan oleh server. Di lingkungan Linux, path sertifikat CA root biasanya /etc/ssl/certs/ca-bundle.crt.

    • Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Contoh 4: Gunakan Fluentd untuk mengunggah log

Fluentd adalah pengumpul log open source dan proyek dari Cloud Native Computing Foundation (CNCF) yang dikembangkan di bawah Lisensi Apache 2.

Fluentd mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Anda hanya perlu menginstal dan mengonfigurasi plugin Kafka. Untuk informasi selengkapnya, lihat fluent-plugin-kafka.

  • Contoh konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Puluhan format lain juga didukung. Untuk informasi selengkapnya, lihat Fluentd Formatter.

    • Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Contoh 5: Gunakan Logstash untuk mengunggah log

Logstash adalah mesin pengumpulan log open source waktu nyata. Gunakan Logstash untuk mengumpulkan log secara dinamis dari berbagai sumber.

Catatan

Untuk mengunggah log menggunakan protokol Kafka, Anda harus menggunakan Logstash 7.10.1 atau versi yang lebih baru.

Logstash memiliki plugin output Kafka bawaan. Konfigurasikan Logstash untuk mengunggah log ke SLS menggunakan protokol Kafka. Karena SLS menggunakan protokol koneksi SASL_SSL, Anda juga harus mengonfigurasi sertifikat SSL dan file JAAS.

  • Contoh konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Puluhan format lain juga didukung. Untuk informasi selengkapnya, lihat Logstash Codec.

      Catatan

      Contoh ini menunjukkan konfigurasi untuk uji konektivitas. Kami menyarankan agar Anda menghapus konfigurasi output stdout di lingkungan produksi.

    • Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

Contoh 6: Gunakan Fluent-bit untuk mengunggah log

Fluent-bit adalah prosesor dan forwarder log serta metrik yang ringan dan sangat skalabel. Fluent-bit mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Untuk informasi selengkapnya, lihat Kafka output plugin.

  • Contoh konfigurasi

    Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Contoh 7: Konfigurasikan Vector untuk mengunggah log menggunakan protokol Kafka

Vector adalah perangkat lunak pemrosesan log yang ringan dan berkinerja tinggi yang mendukung pelaporan log menggunakan protokol Kafka. Bagian berikut menjelaskan cara mengonfigurasi Vector untuk menulis data ke SLS dalam mode kompatibel Kafka.

  • Contoh konfigurasi

    Untuk informasi selengkapnya tentang parameter yang diawali dengan SLS_ dalam contoh, lihat Konfigurasi.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Contoh 8: Gunakan produsen Kafka untuk mengunggah log

Java

  • Dependensi

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • Kode contoh

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // The configurations.
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // Set this parameter to true if you want the content of the producer to be parsed as a JSON log.
            boolean parseJson = false;
            // An Alibaba Cloud account has full permissions on all API operations. This poses a high security threat. We recommend that you create and use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
            // This section provides an example on how to save an AccessKey ID and an AccessKey secret to environment variables. You can also save the AccessKey ID and AccessKey secret to a configuration file as needed.
            // We recommend that you do not save the AccessKey ID and AccessKey secret in the code to prevent security risks.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // Configure this parameter based on the endpoint of the project.
            String port = "10012"; // Use port 10012 for the Internet and port 10011 for the internal network.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            //Create a producer instance.
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // If needed, use the following method to set the timestamp of the message.
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Failed to send message: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent successfully to topic: " + metadata.topic() +
                                         ", partition: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • Dependensi

    pip install confluent-kafka
  • Kode contoh

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}] at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # Get credentials from environment variables
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # Use port 10012 for the Internet and port 10011 for the internal network.
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Configure Kafka producer
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # Create producer instance
        producer = Producer(conf)
    
        # Send message
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (Optional) Set the timestamp of the record in milliseconds.
                         callback=delivery_report)
    
        # Wait for any outstanding messages to be delivered and delivery report
        # callbacks to be triggered.
        producer.flush()
    
    if __name__ == '__main__':
        main()
    

Golang

  • Dependensi

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • Kode contoh

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// Get credentials from environment variables
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // Use port 10012 for the Internet and port 10011 for the internal network.
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Configure Kafka producer
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// Create producer instance
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Failed to create producer: %v", err)
    	}
    	defer producer.Close()
    
    	// Send messages in batches.
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // Set the time if needed.
    		}, nil)
    
    		if err != nil {
    			log.Printf("Failed to produce message: %v", err)
    		}
    	}
    
    	// Enable a goroutine to listen for whether the producer successfully sends the message.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

Pesan error

Jika Anda gagal mengunggah log menggunakan protokol Kafka, pesan error akan dikembalikan dalam format pesan error Kafka. Tabel berikut menjelaskan pesan-pesan error tersebut. Untuk informasi selengkapnya tentang pesan error protokol Kafka, lihat daftar error.

Pesan error

Deskripsi

Solusi yang direkomendasikan

NetworkException

Terjadi error jaringan.

Tunggu 1 detik lalu coba lagi.

TopicAuthorizationException

Autentikasi gagal.

Error ini biasanya menunjukkan bahwa pasangan AccessKey yang diberikan tidak valid atau tidak memiliki izin tulis ke proyek atau logstore yang ditentukan. Pastikan Anda menggunakan pasangan AccessKey yang valid dengan izin tulis yang diperlukan.

UnknownTopicOrPartitionException

Error ini dapat terjadi karena salah satu alasan berikut:

  • Proyek atau logstore yang ditentukan tidak ada.

  • Wilayah proyek berbeda dengan wilayah yang ditentukan dalam titik akhir.

Pastikan proyek dan logstore yang ditentukan ada. Jika error tetap terjadi, verifikasi bahwa wilayah proyek sesuai dengan wilayah titik akhir.

KafkaStorageException

Terjadi exception di server.

Tunggu 1 detik lalu coba lagi.