全部产品
Search
文档中心

Simple Log Service:Gunakan protokol Kafka untuk mengunggah log

更新时间:Oct 30, 2025

Gunakan alat pengumpulan log seperti Kafka Producer SDK, Beats, Collectd, Fluentd, Logstash, Telegraf, dan Vector untuk mengunggah log ke Simple Log Service (SLS) menggunakan protokol Kafka. Topik ini menjelaskan cara menggunakan protokol Kafka untuk mengunggah log dari alat-alat tersebut ke SLS.

Batasan

  • Versi paling awal dari protokol Kafka yang didukung adalah 2.1.0.

  • Anda harus menggunakan protokol koneksi SASL_SSL untuk memastikan transmisi log yang aman.

Izin

Anda harus memiliki salah satu izin berikut:

  • AliyunLogFullAccess

    Kebijakan ini memberikan izin untuk mengelola SLS. Untuk informasi lebih lanjut tentang cara memberikan izin, lihat Berikan Izin kepada Pengguna RAM dan Berikan Izin kepada Peran RAM.

  • Kebijakan Kustom

    1. Untuk membuat kebijakan kustom, pada tab Script Editor, ganti konten yang ada di kotak konfigurasi dengan skrip berikut. Untuk informasi lebih lanjut, lihat Buat Kebijakan Kustom.

      Catatan

      Di dalam skrip, ganti project_name dengan nama proyek Anda yang sebenarnya.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project_name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project_name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Lampirkan kebijakan kustom yang Anda buat ke Pengguna RAM. Untuk informasi lebih lanjut, lihat Berikan Izin kepada Pengguna RAM.

Konfigurasi

Saat menggunakan protokol Kafka untuk mengunggah log, Anda harus mengonfigurasi parameter berikut:

Nama Konfigurasi

Nilai Konfigurasi

Deskripsi

Contoh

SLS_KAFKA_ENDPOINT

Titik akhir kluster yang digunakan untuk koneksi awal. Formatnya adalah Nama Proyek.Endpoint:Port. Anda harus mengonfigurasi parameter ini berdasarkan titik akhir proyek. Untuk informasi lebih lanjut, lihat Titik Akhir.

  • Jaringan internal: Nomor port adalah 10011. Contoh: Nama Proyek.cn-hangzhou-intranet.log.aliyuncs.com:10011.

  • Internet: Nomor port adalah 10012. Contoh: Nama Proyek.cn-hangzhou.log.aliyuncs.com:10012.

aliyun-project-test adalah nama proyek. cn-hangzhou-xxx.aliyuncs.com adalah titik akhir. 10011 dan 10012 adalah nomor port untuk jaringan internal dan Internet.

  • Jaringan internal: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011.

  • Internet: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012.

SLS_PROJECT

Nama Proyek

Nama proyek SLS.

aliyun-project-test

SLS_LOGSTORE

Nama Logstore

Nama logstore. Jika Anda menambahkan .json ke nama logstore, SLS mencoba mengurai log sebagai log JSON.

Sebagai contoh, nama logstore adalah test-logstore.

  • Jika Anda mengatur nilainya menjadi test-logstore, isi log yang diunggah disimpan di bidang content.

  • Jika Anda mengatur nilainya menjadi test-logstore.json, isi log yang diunggah diurai sebagai log JSON. Kunci-kunci di lapisan pertama data JSON digunakan sebagai nama bidang, dan nilai-nilai yang sesuai digunakan sebagai nilai bidang.

SLS_PASSWORD

Rahasia AccessKey yang memiliki izin tulis pada SLS.

Untuk informasi tentang apa itu Pasangan Kunci Akses dan cara pembuatannya, lihat Buat Pasangan Kunci Akses.

Nilai tersebut terdiri dari ID AccessKey dan rahasia AccessKey, dipisahkan oleh simbol #.

  • ID AccessKey: ID AccessKey dari akun Alibaba Cloud atau Pengguna RAM.

  • Rahasia AccessKey: Rahasia AccessKey dari akun Alibaba Cloud atau Pengguna RAM.

LTAI****************#yourAccessKeySecret

Catatan

Jika ingin menggunakan kelompok konsumen Kafka untuk mengonsumsi data dari SLS secara real-time, ajukan tiket untuk berkonsultasi dengan dukungan teknis Alibaba Cloud.

Contoh 1: Gunakan Beats untuk mengunggah log

Beats, seperti MetricBeat, PacketBeat, Winlogbeat, Auditbeat, Filebeat, dan Heartbeat, dapat mengumpulkan log dan mengunggahnya ke SLS menggunakan protokol Kafka.

  • Contoh Konfigurasi

    Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    output.kafka:
      # broker awal untuk membaca metadata kluster
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # pemilihan topik pesan + partisi
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

Contoh 2: Gunakan Collectd untuk mengunggah log

Collectd adalah daemon yang secara berkala mengumpulkan metrik kinerja sistem dan aplikasi serta mengunggah metrik tersebut ke SLS menggunakan protokol Kafka. Untuk informasi lebih lanjut, lihat Plugin Write Kafka.

Saat mengunggah log yang dikumpulkan oleh Collectd ke SLS, Anda juga harus menginstal plugin Kafka dan dependensinya. Sebagai contoh, di CentOS, jalankan perintah sudo yum install collectd-write_kafka untuk menginstal plugin Kafka. Untuk informasi lebih lanjut tentang cara menginstal paket RPM Package Manager (RPM), lihat Collectd-write_kafka.

  • Contoh Konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Command dan Graphite, juga didukung. Untuk informasi lebih lanjut, lihat Dokumen Konfigurasi Collectd.

    • Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Contoh 3: Gunakan Telegraf untuk mengunggah log

Telegraf adalah program agen berbasis Go yang menggunakan sedikit memori untuk mengumpulkan, memproses, dan menggabungkan metrik data. Telegraf menyediakan berbagai macam plugin dan fitur integrasi. Gunakan Telegraf untuk mendapatkan berbagai metrik dari sistem host-nya, mengambil metrik dari API pihak ketiga, dan mendengarkan metrik menggunakan layanan statsd dan konsumen Kafka.

Sebelum mengunggah log yang dikumpulkan oleh Telegraf ke SLS menggunakan protokol Kafka, Anda harus memodifikasi file konfigurasi.

  • Contoh Konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Format lain, seperti Graphite dan Carbon2, juga didukung. Untuk informasi lebih lanjut, lihat Format Output Telegraf.

      Catatan

      Anda harus mengonfigurasi jalur yang valid untuk tls_ca di Telegraf. Gunakan jalur sertifikat root yang disediakan oleh server. Di lingkungan Linux, jalur sertifikat CA root biasanya adalah /etc/ssl/certs/ca-bundle.crt.

    • Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    # Konfigurasi plugin keluaran Kafka
    [[outputs.kafka]]
      ## URL broker kafka
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Topik Kafka untuk pesan produser
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec mewakili berbagai codec kompresi yang diakui oleh
      ## Kafka dalam pesan.
      ## 0 : Tanpa kompresi
      ## 1 : Kompresi Gzip
      ## 2 : Kompresi Snappy
      ## 3 : Kompresi LZ4
      compression_codec = 1
      ## Konfigurasi TLS opsional tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## Gunakan TLS tetapi lewati verifikasi rantai & host
      # insecure_skip_verify = false
      ## Konfigurasi SASL Opsional
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Format data untuk keluaran.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Contoh 4: Gunakan Fluentd untuk mengunggah log

Fluentd adalah kolektor log open source yang dikembangkan di bawah Lisensi Apache 2 dan merupakan proyek dari Asosiasi Komputasi Cloud Native (CNCF).

Fluentd mendukung berbagai plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke Simple Log Service (SLS). Anda hanya perlu menginstal dan mengonfigurasi plugin Kafka. Untuk informasi lebih lanjut, lihat fluent-plugin-kafka.

  • Contoh Konfigurasi

    • Pada contoh ini, format output diatur ke JSON. Berbagai format lain juga didukung. Untuk informasi lebih lanjut, lihat Formatter Fluentd.

    • Untuk detail lebih lanjut mengenai parameter yang diawali dengan SLS_ dalam contoh ini, lihat Konfigurasi.

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # opsi produksi ruby-kafka
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Contoh 5: Gunakan Logstash untuk mengunggah log

Logstash adalah mesin pengumpulan log waktu nyata open source. Gunakan Logstash untuk mengumpulkan log secara dinamis dari berbagai sumber.

Catatan

Untuk mengunggah log menggunakan protokol Kafka, Anda harus menggunakan Logstash versi 7.10.1 atau yang lebih baru.

Logstash dilengkapi dengan plugin output Kafka bawaan. Konfigurasikan Logstash untuk mengunggah log ke SLS menggunakan protokol Kafka. Karena SLS menggunakan protokol koneksi SASL_SSL, Anda juga perlu mengonfigurasi sertifikat SSL dan file JAAS.

  • Contoh Konfigurasi

    • Dalam contoh ini, format output diatur ke JSON. Puluhan format lainnya juga didukung. Untuk informasi lebih lanjut, lihat Kodek Logstash.

      Catatan

      Contoh ini menunjukkan konfigurasi untuk uji coba konektivitas. Kami sarankan Anda menghapus konfigurasi output stdout di lingkungan produksi.

    • Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

Contoh 6: Gunakan Fluent-bit untuk mengunggah log

Fluent-bit adalah prosesor dan pengirim log serta metrik yang ringan dan sangat skalabel. Ini mendukung berbagai macam plugin input, pemrosesan, dan output. Gunakan plugin Kafka untuk mengunggah log ke SLS. Untuk informasi lebih lanjut, lihat Plugin Output Kafka.

  • Contoh Konfigurasi

    Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Contoh 7: Konfigurasikan Vector untuk mengunggah log menggunakan protokol Kafka

Vector adalah perangkat lunak pemrosesan log berperforma tinggi dan ringan yang mendukung pelaporan log menggunakan protokol Kafka. Bagian berikut menjelaskan cara mengonfigurasi Vector untuk menulis data ke SLS dalam mode kompatibel Kafka.

  • Contoh Konfigurasi

    Untuk informasi lebih lanjut tentang parameter yang dimulai dengan SLS_ dalam contoh, lihat Konfigurasi.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Contoh 8: Gunakan produsen Kafka untuk mengunggah log

Java

  • Dependensi

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • Contoh Kode

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // Konfigurasi.
            Properties props = new Properties();
            String project = "etl-shanghai-b";
            String logstore = "testlog";
            // Atur parameter ini ke true jika Anda ingin konten dari produsen diurai sebagai log JSON.
            boolean parseJson = false;
            // Akun Alibaba Cloud memiliki izin penuh pada semua operasi API. Ini menimbulkan ancaman keamanan yang tinggi. Kami sarankan Anda membuat dan menggunakan Pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. Untuk membuat Pengguna RAM, masuk ke konsol RAM.
            // Bagian ini memberikan contoh cara menyimpan ID AccessKey dan Rahasia AccessKey ke variabel lingkungan. Anda juga dapat menyimpan ID AccessKey dan Rahasia AccessKey ke file konfigurasi sesuai kebutuhan.
            // Kami sarankan Anda tidak menyimpan ID AccessKey dan Rahasia AccessKey di dalam kode untuk mencegah risiko keamanan.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-shanghai.log.aliyuncs.com"; // Konfigurasikan parameter ini berdasarkan titik akhir proyek.
            String port = "10012"; // Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    
            // Buat instance produsen.
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // Kirim rekaman.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                // Jika diperlukan, gunakan metode berikut untuk mengatur timestamp pesan.
                // long timestamp = System.currentTimeMillis();
                // ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, null, content);
    
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("ERROR: Gagal mengirim pesan: " + exception.getMessage());
                        exception.printStackTrace();
                    } else {
                        System.out.println("Pesan berhasil dikirim ke topik: " + metadata.topic() +
                                         ", partisi: " + metadata.partition() +
                                         ", offset: " + metadata.offset() +
                                         ", timestamp: " + metadata.timestamp());
                    }
                });
            }
    
            producer.close();
        }
    }
    

Python

  • Dependensi

    pip install confluent-kafka
  • Contoh kode

    #!/bin/env python3
    import time
    import os
    from confluent_kafka import Producer
    
    def delivery_report(err, msg):
        """ Dipanggil sekali untuk setiap pesan yang diproduksi untuk menunjukkan hasil pengiriman.
            Dipicu oleh poll() atau flush(). """
        if err is not None:
            print('Pengiriman pesan gagal: {}'.format(err))
        else:
            print('Pesan berhasil dikirim ke {} [{}] pada offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    
    def main():
        project = "etl-shanghai-b"
        logstore = "testlog"
        parse_json = False
    
        # Dapatkan kredensial dari variabel lingkungan
        access_key_id = os.getenv("SLS_ACCESS_KEY_ID")
        access_key_secret = os.getenv("SLS_ACCESS_KEY_SECRET")
        endpoint = "cn-shanghai.log.aliyuncs.com"
        port = "10012" # Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal.
    
        hosts = f"{project}.{endpoint}:{port}"
        topic = logstore
        if parse_json:
            topic = topic + ".json"
    
        # Konfigurasikan produsen Kafka
        conf = {
            'bootstrap.servers': hosts,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': project,
            'sasl.password': f"{access_key_id}#{access_key_secret}",
            'enable.idempotence': False,
        }
    
        # Buat instance produsen
        producer = Producer(conf)
    
        # Kirim pesan
        content = "{\"msg\": \"Hello World\"}"
        producer.produce(topic=topic,
                         value=content.encode('utf-8'),
                         #timestamp=int(time.time() * 1000),  # (Opsional) Setel timestamp rekaman dalam milidetik.
                         callback=delivery_report)
    
        # Tunggu pesan yang belum selesai dikirim dan laporan pengiriman dipicu.
        producer.flush()
    
    if __name__ == '__main__':
        main()
    

Golang

  • Dependensi

    go get github.com/confluentinc/confluent-kafka-go/kafka
  • Contoh kode

    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    //	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    	project := "etl-shanghai-b"
    	logstore := "testlog"
    	parseJson := false
    
    	// Dapatkan kredensial dari variabel lingkungan
    	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
    	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
    	endpoint := "cn-shanghai.log.aliyuncs.com"
    	port := "10012" // Gunakan port 10012 untuk Internet dan port 10011 untuk jaringan internal.
    
    	hosts := fmt.Sprintf("%s.%s:%s", project, endpoint, port)
    	topic := logstore
    	if parseJson {
    		topic = topic + ".json"
    	}
    
    	// Konfigurasikan produsen Kafka
    	config := &kafka.ConfigMap{
    		"bootstrap.servers":  hosts,
    		"security.protocol":  "sasl_ssl",
    		"sasl.mechanisms":    "PLAIN",
    		"sasl.username":      project,
    		"sasl.password":      accessKeyID + "#" + accessKeySecret,
    		"enable.idempotence": false,
    	}
    
    	// Buat instance produsen
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Fatalf("Gagal membuat produsen: %v", err)
    	}
    	defer producer.Close()
    
    	// Kirim pesan dalam batch.
    	messages := []string{
    		"{\"msg\": \"Hello World 1\"}",
    		"{\"msg\": \"Hello World 2\"}",
    		"{\"msg\": \"Hello World 3\"}",
    	}
    
    	for _, content := range messages {
    		err := producer.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Value:          []byte(content),
    			//Timestamp:      time.Now(), // Setel waktu jika diperlukan.
    		}, nil)
    
    		if err != nil {
    			log.Printf("Gagal memproduksi pesan: %v", err)
    		}
    	}
    
    	// Aktifkan goroutine untuk mendengarkan apakah produsen berhasil mengirim pesan.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Pengiriman gagal: %v\n", ev.TopicPartition.Error)
    				} else {
    					fmt.Printf("Pesan terkirim ke topik %s [%d] pada offset %v\n",
    						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
    				}
    			}
    		}
    	}()
    
    	producer.Flush(5 * 1000)
    }

Pesan kesalahan

Jika gagal mengunggah log menggunakan protokol Kafka, pesan kesalahan akan dikembalikan dalam format pesan kesalahan Kafka. Tabel berikut menjelaskan pesan kesalahan. Untuk informasi lebih lanjut tentang pesan kesalahan protokol Kafka, lihat Daftar Kesalahan.

Pesan Kesalahan

Deskripsi

Solusi yang Direkomendasikan

NetworkException

Kesalahan jaringan terjadi.

Tunggu selama 1 detik, lalu coba lagi.

TopicAuthorizationException

Otentikasi gagal.

Kesalahan ini biasanya disebabkan oleh pasangan AccessKey yang tidak valid atau izin yang tidak cukup untuk menulis data ke proyek atau logstore tertentu. Tentukan pasangan AccessKey yang valid dengan izin tulis.

UnknownTopicOrPartitionException

Kesalahan ini dapat terjadi karena salah satu alasan berikut:

  • Proyek atau logstore yang ditentukan tidak ada.

  • Wilayah proyek berbeda dengan wilayah yang ditentukan dalam titik akhir.

Pastikan bahwa proyek dan logstore yang ditentukan ada. Jika kesalahan tetap ada, periksa apakah wilayah proyek sama dengan wilayah yang ditentukan dalam titik akhir.

KafkaStorageException

Penyimpangan terjadi di server.

Tunggu selama 1 detik, lalu coba lagi.