全部产品
Search
文档中心

DataHub:Kompatibilitas dengan Kafka

更新时间:Jun 30, 2025

DataHub sepenuhnya kompatibel dengan protokol Kafka. Anda dapat menggunakan klien Kafka asli untuk membaca dan menulis data ke DataHub.

Informasi latar belakang

Pemetaan dari Kafka ke DataHub

Jenis Topik

Mode ekspansi topik di Kafka berbeda dari yang ada di DataHub. Untuk menyesuaikan dengan mode ekspansi topik di Kafka, atur parameter ExpandMode menjadi ONLY_EXTEND saat membuat topik di DataHub. Topik dengan parameter ExpandMode diatur ke ONLY_EXTEND tidak mendukung operasi pemisahan atau penggabungan. Anda dapat menambahkan shard, tetapi tidak dapat menghapusnya.

Penamaan Topik

Nama topik di Kafka memetakan nama proyek dan nama topik di DataHub, yang dipisahkan dengan titik (.). Sebagai contoh, topik bernama test_project.test_topic di Kafka memetakan topik bernama test_topic dalam proyek bernama test_project di DataHub. Jika nama topik di Kafka mengandung beberapa titik (.), bagian sebelum titik pertama (.) adalah nama proyek di DataHub, dan sisanya adalah nama topik. Titik lainnya (.) dan tanda hubung (-) diganti dengan garis bawah (_).

Partisi

Setiap shard aktif di DataHub sesuai dengan partisi di Kafka. Jika jumlah shard aktif di DataHub adalah lima, dapat dianggap bahwa Kafka memiliki lima partisi. Saat menulis data, Anda dapat menentukan partisi berdasarkan ID partisi [0,4]. Jika Anda tidak menentukan partisi, klien Kafka akan menentukan partisi tempat data ditulis.

Topik TUPLE

Saat menulis pasangan nilai-kunci dari Kafka ke topik TUPLE di DataHub, skema topik TUPLE harus berisi satu atau dua bidang bertipe STRING. Jika tidak, penulisan data gagal. Jika skema hanya berisi satu bidang, hanya nilai dari pasangan nilai-kunci yang ditulis, sementara kuncinya dibuang. Jika skema berisi dua bidang, nilainya ditulis ke satu bidang dan kuncinya ke bidang lainnya. Jika Anda menulis data biner ke topik TUPLE, data tersebut akan ditampilkan sebagai teks kacau di topik. Kami sarankan Anda menulis data biner ke topik BLOB.

Topik BLOB

Saat menulis pasangan nilai-kunci dari Kafka ke topik BLOB di DataHub, nilainya ditulis ke topik BLOB. Jika kuncinya bukan NULL, kuncinya ditulis ke DataHub sebagai atribut. Kunci atributnya adalah __kafka_key__, dan nilainya adalah kunci data Kafka.

Header

Header di Kafka sesuai dengan atribut di DataHub. Namun, header yang nilainya NULL diabaikan di Kafka. Kami sarankan Anda tidak menggunakan __kafka_key__ sebagai kunci header.

Grup Konsumen

Di DataHub, grup konsumen memetakan ID langganan dan hanya dapat berlangganan satu topik pada satu waktu. Namun, grup Kafka dapat berlangganan beberapa topik pada satu waktu. Untuk meningkatkan kompatibilitas dengan metode langganan di Kafka, DataHub memungkinkan Anda membuat grup di proyek dan mengikat grup dengan topik yang ingin Anda langgapi, lalu gunakan grup tersebut untuk berlangganan beberapa topik di proyek. Di DataHub, grup adalah sekumpulan langganan yang dikelola oleh server. Jika grup terikat dengan topik, Anda dapat melihat langganan yang dibuat otomatis oleh grup pada tab Daftar Langganan halaman detail topik. Jika Anda menghapus langganan, grup tidak dapat berlangganan topik, dan offset konsumsi yang ada menghilang.

Grup dapat berlangganan maksimal 50 topik. Jika Anda perlu berlangganan lebih banyak topik untuk sebuah grup, ajukan tiket.

Parameter Konfigurasi Kafka

C = Konsumen, P = Produsen, S = Streams

Parameter

C/P/S

Nilai valid

Diperlukan

Deskripsi

bootstrap.servers

*

Untuk informasi lebih lanjut, lihat bagian "Titik akhir Kafka" dari topik ini.

Ya

security.protocol

*

SASL_SSL

Ya

Untuk memastikan transmisi data yang aman, Secure Sockets Layer (SSL) digunakan untuk enkripsi saat data ditulis dari klien Kafka ke DataHub.

sasl.mechanism

*

PLAIN

Ya

Mode autentikasi AccessKey. Atur parameter ini ke PLAIN.

compression.type

P

LZ4

Tidak

Menentukan apakah akan mengaktifkan transmisi terkompresi. Hanya algoritma kompresi LZ4 yang didukung.

group.id

C

project.topic:subId

atau

project.group

Ya

ID grup konsumen. Atur parameter ini berdasarkan topik yang dilanggan jika Anda menggunakan project.topic:subId. Jika tidak, data tidak dapat dibaca. Kami sarankan Anda menggunakan project.group.

partition.assignment.strategy

C

org.apache.kafka.clients.consumer.RangeAssignor

Tidak

Kebijakan untuk penugasan partisi. Kebijakan default untuk penugasan partisi di Kafka adalah RangeAssignor, yang juga merupakan satu-satunya kebijakan yang didukung oleh DataHub. Jangan ubah parameter ini.

session.timeout.ms

C/S

[60000, 180000]

Tidak

Periode timeout sesi. Periode timeout sesi default di Kafka adalah 10.000 milidetik. Namun, periode timeout sesi minimum di DataHub adalah 60.000 milidetik. Oleh karena itu, nilai default parameter ini adalah 60000.

heartbeat.interval.ms

C/S

Kami sarankan Anda mengatur parameter ini menjadi dua pertiga dari periode timeout sesi yang ditentukan.

Tidak

Interval denyut jantung. Interval denyut jantung default di Kafka adalah 3.000 milidetik. Karena nilai default parameter session.timeout.ms adalah 60000, kami sarankan Anda mengatur parameter heartbeat.interval.ms menjadi 40000. Jika tidak, permintaan denyut jantung akan terlalu sering.

application.id

S

project.topic:subId

atau

project.group

Ya

ID aplikasi. Atur parameter ini berdasarkan topik yang dilanggan jika Anda menggunakan project.topic:subId. Jika tidak, pembacaan data gagal. Kami sarankan Anda menggunakan project.group.

Tabel di atas menjelaskan parameter yang perlu diperhatikan secara khusus saat menulis data dari klien Kafka ke DataHub. Parameter terkait klien seperti retries,batch.size tidak terpengaruh. Parameter terkait server tidak memengaruhi perilaku server. Sebagai contoh, apa pun nilai parameter acks, DataHub mengembalikan nilai setelah data ditulis.

Titik Akhir Kafka

Wilayah

ID Wilayah

Titik akhir publik

Titik akhir ECS pada jaringan klasik

Titik akhir ECS di VPC

Tiongkok (Hangzhou)

cn-hangzhou

dh-cn-hangzhou.aliyuncs.com:9092

dh-cn-hangzhou.aliyun-inc.com:9093

dh-cn-hangzhou-int-vpc.aliyuncs.com:9094

Tiongkok (Shanghai)

cn-shanghai

dh-cn-shanghai.aliyuncs.com:9092

dh-cn-shanghai.aliyun-inc.com:9093

dh-cn-shanghai-int-vpc.aliyuncs.com:9094

Tiongkok (Beijing)

cn-beijing

dh-cn-beijing.aliyuncs.com:9092

dh-cn-beijing.aliyun-inc.com:9093

dh-cn-beijing-int-vpc.aliyuncs.com:9094

Tiongkok (Shenzhen)

cn-shenzhen

dh-cn-shenzhen.aliyuncs.com:9092

dh-cn-shenzhen.aliyun-inc.com:9093

dh-cn-shenzhen-int-vpc.aliyuncs.com:9094

Tiongkok (Zhangjiakou)

cn-zhangjiakou

dh-cn-zhangjiakou.aliyuncs.com:9092

dh-cn-zhangjiakou.aliyun-inc.com:9093

dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094

Singapura

ap-southeast-1

dh-ap-southeast-1.aliyuncs.com:9092

dh-ap-southeast-1.aliyun-inc.com:9093

dh-ap-southeast-1-int-vpc.aliyuncs.com:9094

Malaysia (Kuala Lumpur)

ap-southeast-3

dh-ap-southeast-3.aliyuncs.com:9092

dh-ap-southeast-3.aliyun-inc.com:9093

dh-ap-southeast-3-int-vpc.aliyuncs.com:9094

India (Mumbai) (menutup layanan)

ap-south-1

dh-ap-south-1.aliyuncs.com:9092

dh-ap-south-1.aliyun-inc.com:9093

dh-ap-south-1-int-vpc.aliyuncs.com:9094

Jerman (Frankfurt)

eu-central-1

dh-eu-central-1.aliyuncs.com:9092

dh-eu-central-1.aliyun-inc.com:9093

dh-eu-central-1-int-vpc.aliyuncs.com:9094

Tiongkok Timur 2 Keuangan

cn-shanghai-finance-1

dh-cn-shanghai-finance-1.aliyuncs.com:9092

dh-cn-shanghai-finance-1.aliyun-inc.com:9093

dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094

Tiongkok (Hong Kong)

cn-hongkong

dh-cn-hongkong.aliyuncs.com:9092

dh-cn-hongkong.aliyun-inc.com:9093

dh-cn-hongkong-int-vpc.aliyuncs.com:9094

Contoh

Buat Topik Menggunakan Kode

Catatan: Anda tidak dapat membuat topik dengan memanggil operasi API Kafka. Untuk membuat topik, Anda harus memanggil SDK DataHub. Saat membuat topik, atur parameter ExpandMode menjadi ONLY_EXTEND. Versi dependensi Maven harus V2.19.0 atau lebih baru.

Anda harus mengonfigurasi ID AccessKey dan Rahasia AccessKey di proyek Anda. Kami sarankan Anda mengatur variabel lingkungan berikut untuk mengonfigurasi ID AccessKey dan Rahasia AccessKey di file konfigurasi:

datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
Penting

Pasangan AccessKey akun Alibaba Cloud memiliki izin pada semua operasi API. Menggunakan pasangan AccessKey untuk melakukan operasi adalah operasi berisiko tinggi. Kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin.

Kami sarankan Anda tidak menuliskan ID AccessKey dan Rahasia AccessKey secara langsung ke dalam kode proyek Anda. Jika tidak, pasangan AccessKey mungkin bocor dan keamanan semua sumber daya dalam akun Anda terganggu.

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.19.0-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                new AliyunAccount(accessId, accessKey)))
                .build();

        int shardCount = 1;
        int lifeCycle = 7;

        try {
            datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

Buat Grup Menggunakan Kode

Versi dependensi Maven harus V2.21.6 atau lebih baru.

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.21.6-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                new AliyunAccount(accessId, accessKey)))
                .build();

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.topic1");
        topicList.add("test_project.topic2");
        topicList.add("test_project.topic3");

        try {
            // Buat grup Kafka.
            datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");

            // Ikat topik yang diinginkan untuk langganan ke grup.
            datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

Contoh produser:

Hasilkan File kafka_client_producer_jaas.conf

Buat file kafka_client_producer_jaas.conf dan simpan ke direktori. File tersebut berisi konten berikut:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

Dependensi Maven

Versi klien Kafka harus V0.10.0.0 atau lebih baru. Versi yang direkomendasikan adalah V2.4.0.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

Contoh Kode

public class ProducerExample {
    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");

        String KafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        try {
            List<Header> headers = new ArrayList<>();
            RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
            RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
            headers.add(header1);
            headers.add(header2);

            ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);

            // pengiriman sinkron
            producer.send(record).get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

Hasil Eksekusi

Setelah eksekusi selesai, sampel data untuk memeriksa apakah DataHub berjalan dengan baik.

Contoh konsumen

Untuk informasi tentang cara menghasilkan file kafka_client_producer_jaas.conf dan dependensi Maven, lihat informasi terkait di bagian "Contoh Produser" dari topik ini.

Setelah Anda menambahkan konsumen, tunggu sekitar 10 detik agar alokasi shard selesai. Setelah itu, konsumen dapat mengonsumsi data.

Contoh Kode

(Direkomendasikan) Contoh menggunakan grup Kafka

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {

    static {
        System.setProperty("java.security.auth.login.config","src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // Atur parameter group.id menjadi project.group.
        properties.put("group.id", "test_project.test_kafka_group");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.test_topic1");
        topicList.add("test_project.test_topic2");
        topicList.add("test_project.test_topic3");
        // Beberapa topik dapat dilanggan sekaligus jika Anda menggunakan grup Kafka.
        kafkaConsumer.subscribe(topicList);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

Contoh menggunakan project.topic.subid

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // Atur parameter group.id menjadi project.topic.subId.
        properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Hanya satu topik yang dapat dilanggan jika Anda menggunakan project.topic.subId.
        kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

Hasil Eksekusi

Setelah eksekusi selesai, Anda dapat melihat data yang dibaca pada klien konsumen.

ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)

Catatan: Semua data yang dikembalikan untuk permintaan pembacaan data memiliki nilai yang sama untuk parameter LogAppendTime, yang merupakan nilai terbesar dari cap waktu data.

Tugas Streams contoh

Dependensi Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

Contoh Kode

Kode contoh berikut membaca data masukan di test_project, mengonversi kunci dan nilai menjadi huruf kecil, lalu menuliskannya ke data keluaran.

public class StreamExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("application.id", "test_project.input:1611293595417QH0WL");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");

        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {

        @Override
        public KeyValue<String, String> apply(String s, String s2) {
            return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
        }
    }
}

Hasil Eksekusi

Setelah Anda memulai tugas Streams, tunggu sekitar 1 menit agar alokasi shard selesai. Setelah itu, Anda dapat melihat jumlah tugas di konsol DataHub. Jumlah tugas sesuai dengan jumlah shard dalam topik masukan. Dalam contoh ini, topik masukan berisi tiga shard.

tugas aktif yang ditetapkan saat ini: [0_0, 0_1, 0_2]
    tugas cadangan yang ditetapkan saat ini: []
    tugas aktif yang dicabut: []
    tugas cadangan yang dicabut: []

Setelah alokasi shard selesai, Anda dapat menulis data uji berikut ke topik masukan: (AAAA,BBBB), (CCCC,DDDD), dan (EEEE,FFFF). Kemudian, sampel data keluaran untuk memeriksa apakah penulisan data valid.

Catatan penggunaan

  • Transaksi dan idempotensi tidak didukung.

  • Klien Kafka tidak dapat secara otomatis membuat topik DataHub. Sebelum menulis data dari klien Kafka ke DataHub, pastikan bahwa topik DataHub telah dibuat.

  • Setiap konsumen hanya dapat berlangganan satu topik.

  • Cap waktu data yang dibaca oleh konsumen adalah nilai parameter LogAppendTime, yang menunjukkan waktu ketika data ditulis ke DataHub. Semua data yang dikembalikan untuk permintaan pembacaan data memiliki cap waktu yang sama, yaitu nilai terbesar dari cap waktu data. Oleh karena itu, saat membaca data, cap waktu yang diperoleh mungkin lebih besar dari cap waktu aktual ketika data ditulis ke DataHub.

  • Setiap tugas Streams mendukung hanya satu topik masukan dan beberapa topik keluaran.

  • Tugas Streams bersifat tanpa status.

  • Versi Kafka yang didukung adalah dari V0.10.0 hingga V2.4.0.

FAQ

1. Mengapa koneksi terputus selama penulisan data?

Selector - [Producer clientId=producer-1] Koneksi dengan dh-cn-shenzhen.aliyuncs.com terputus
java.io.EOFException
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
    ...

Di Kafka, permintaan meta dan permintaan penulisan data tidak menggunakan koneksi yang sama. Saat permintaan meta dikirim untuk pertama kali, koneksi dibuat. Saat permintaan penulisan data dikirim, koneksi ke broker yang dikembalikan untuk permintaan meta dibuat. Setelah itu, semua permintaan berikutnya dikirim melalui koneksi kedua, dan koneksi pertama menjadi idle. Jika koneksi tetap idle melebihi batas waktu tertentu, server secara otomatis menutup koneksi. Oleh karena itu, Anda dapat mengabaikan kesalahan ini jika tidak memengaruhi penulisan data.

Apa yang harus saya lakukan jika klien Kafka saya gagal dimulai?

Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 found

Jika klien Kafka Anda gagal dimulai, tambahkan kode berikut: properties.put("ssl.endpoint.identification.algorithm", "");.

Mengapa kesalahan DisconnectException muncul selama konsumsi data pada klien konsumen?

[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException

Klien Kafka harus mempertahankan koneksi persisten berbasis TCP ke server. Dalam kebanyakan kasus, kesalahan DisconnectException disebabkan oleh gangguan jaringan. Kesalahan ini tidak memengaruhi konsumsi data pada klien karena logika ulang coba dikonfigurasikan pada klien.