DataHub sepenuhnya kompatibel dengan protokol Kafka. Anda dapat menggunakan klien Kafka asli untuk membaca dan menulis data ke DataHub.
Informasi latar belakang
Pemetaan dari Kafka ke DataHub
Jenis Topik
Mode ekspansi topik di Kafka berbeda dari yang ada di DataHub. Untuk menyesuaikan dengan mode ekspansi topik di Kafka, atur parameter ExpandMode menjadi ONLY_EXTEND saat membuat topik di DataHub. Topik dengan parameter ExpandMode diatur ke ONLY_EXTEND tidak mendukung operasi pemisahan atau penggabungan. Anda dapat menambahkan shard, tetapi tidak dapat menghapusnya.
Penamaan Topik
Nama topik di Kafka memetakan nama proyek dan nama topik di DataHub, yang dipisahkan dengan titik (.). Sebagai contoh, topik bernama test_project.test_topic di Kafka memetakan topik bernama test_topic dalam proyek bernama test_project di DataHub. Jika nama topik di Kafka mengandung beberapa titik (.), bagian sebelum titik pertama (.) adalah nama proyek di DataHub, dan sisanya adalah nama topik. Titik lainnya (.) dan tanda hubung (-) diganti dengan garis bawah (_).
Partisi
Setiap shard aktif di DataHub sesuai dengan partisi di Kafka. Jika jumlah shard aktif di DataHub adalah lima, dapat dianggap bahwa Kafka memiliki lima partisi. Saat menulis data, Anda dapat menentukan partisi berdasarkan ID partisi [0,4]. Jika Anda tidak menentukan partisi, klien Kafka akan menentukan partisi tempat data ditulis.
Topik TUPLE
Saat menulis pasangan nilai-kunci dari Kafka ke topik TUPLE di DataHub, skema topik TUPLE harus berisi satu atau dua bidang bertipe STRING. Jika tidak, penulisan data gagal. Jika skema hanya berisi satu bidang, hanya nilai dari pasangan nilai-kunci yang ditulis, sementara kuncinya dibuang. Jika skema berisi dua bidang, nilainya ditulis ke satu bidang dan kuncinya ke bidang lainnya. Jika Anda menulis data biner ke topik TUPLE, data tersebut akan ditampilkan sebagai teks kacau di topik. Kami sarankan Anda menulis data biner ke topik BLOB.
Topik BLOB
Saat menulis pasangan nilai-kunci dari Kafka ke topik BLOB di DataHub, nilainya ditulis ke topik BLOB. Jika kuncinya bukan NULL, kuncinya ditulis ke DataHub sebagai atribut. Kunci atributnya adalah __kafka_key__, dan nilainya adalah kunci data Kafka.
Header
Header di Kafka sesuai dengan atribut di DataHub. Namun, header yang nilainya NULL diabaikan di Kafka. Kami sarankan Anda tidak menggunakan __kafka_key__ sebagai kunci header.
Grup Konsumen
Di DataHub, grup konsumen memetakan ID langganan dan hanya dapat berlangganan satu topik pada satu waktu. Namun, grup Kafka dapat berlangganan beberapa topik pada satu waktu. Untuk meningkatkan kompatibilitas dengan metode langganan di Kafka, DataHub memungkinkan Anda membuat grup di proyek dan mengikat grup dengan topik yang ingin Anda langgapi, lalu gunakan grup tersebut untuk berlangganan beberapa topik di proyek. Di DataHub, grup adalah sekumpulan langganan yang dikelola oleh server. Jika grup terikat dengan topik, Anda dapat melihat langganan yang dibuat otomatis oleh grup pada tab Daftar Langganan halaman detail topik. Jika Anda menghapus langganan, grup tidak dapat berlangganan topik, dan offset konsumsi yang ada menghilang.
Grup dapat berlangganan maksimal 50 topik. Jika Anda perlu berlangganan lebih banyak topik untuk sebuah grup, ajukan tiket.
Parameter Konfigurasi Kafka
C = Konsumen, P = Produsen, S = Streams
Parameter | C/P/S | Nilai valid | Diperlukan | Deskripsi |
bootstrap.servers | * | Untuk informasi lebih lanjut, lihat bagian "Titik akhir Kafka" dari topik ini. | Ya | |
security.protocol | * | SASL_SSL | Ya | Untuk memastikan transmisi data yang aman, Secure Sockets Layer (SSL) digunakan untuk enkripsi saat data ditulis dari klien Kafka ke DataHub. |
sasl.mechanism | * | PLAIN | Ya | Mode autentikasi AccessKey. Atur parameter ini ke PLAIN. |
compression.type | P | LZ4 | Tidak | Menentukan apakah akan mengaktifkan transmisi terkompresi. Hanya algoritma kompresi LZ4 yang didukung. |
group.id | C | project.topic:subId atau project.group | Ya | ID grup konsumen. Atur parameter ini berdasarkan topik yang dilanggan jika Anda menggunakan project.topic:subId. Jika tidak, data tidak dapat dibaca. Kami sarankan Anda menggunakan project.group. |
partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | Tidak | Kebijakan untuk penugasan partisi. Kebijakan default untuk penugasan partisi di Kafka adalah RangeAssignor, yang juga merupakan satu-satunya kebijakan yang didukung oleh DataHub. Jangan ubah parameter ini. |
session.timeout.ms | C/S | [60000, 180000] | Tidak | Periode timeout sesi. Periode timeout sesi default di Kafka adalah 10.000 milidetik. Namun, periode timeout sesi minimum di DataHub adalah 60.000 milidetik. Oleh karena itu, nilai default parameter ini adalah 60000. |
heartbeat.interval.ms | C/S | Kami sarankan Anda mengatur parameter ini menjadi dua pertiga dari periode timeout sesi yang ditentukan. | Tidak | Interval denyut jantung. Interval denyut jantung default di Kafka adalah 3.000 milidetik. Karena nilai default parameter |
application.id | S | project.topic:subId atau project.group | Ya | ID aplikasi. Atur parameter ini berdasarkan topik yang dilanggan jika Anda menggunakan project.topic:subId. Jika tidak, pembacaan data gagal. Kami sarankan Anda menggunakan project.group. |
Tabel di atas menjelaskan parameter yang perlu diperhatikan secara khusus saat menulis data dari klien Kafka ke DataHub. Parameter terkait klien seperti retries,batch.size tidak terpengaruh. Parameter terkait server tidak memengaruhi perilaku server. Sebagai contoh, apa pun nilai parameter acks, DataHub mengembalikan nilai setelah data ditulis.
Titik Akhir Kafka
Wilayah | ID Wilayah | Titik akhir publik | Titik akhir ECS pada jaringan klasik | Titik akhir ECS di VPC |
Tiongkok (Hangzhou) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou.aliyun-inc.com:9093 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
Tiongkok (Shanghai) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai.aliyun-inc.com:9093 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
Tiongkok (Beijing) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing.aliyun-inc.com:9093 | dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
Tiongkok (Shenzhen) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen.aliyun-inc.com:9093 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
Tiongkok (Zhangjiakou) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou.aliyun-inc.com:9093 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
Singapura | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1.aliyun-inc.com:9093 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
Malaysia (Kuala Lumpur) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3.aliyun-inc.com:9093 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
India (Mumbai) (menutup layanan) | ap-south-1 | dh-ap-south-1.aliyuncs.com:9092 | dh-ap-south-1.aliyun-inc.com:9093 | dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
Jerman (Frankfurt) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1.aliyun-inc.com:9093 | dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
Tiongkok Timur 2 Keuangan | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1.aliyun-inc.com:9093 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
Tiongkok (Hong Kong) | cn-hongkong | dh-cn-hongkong.aliyuncs.com:9092 | dh-cn-hongkong.aliyun-inc.com:9093 | dh-cn-hongkong-int-vpc.aliyuncs.com:9094 |
Contoh
Buat Topik Menggunakan Kode
Catatan: Anda tidak dapat membuat topik dengan memanggil operasi API Kafka. Untuk membuat topik, Anda harus memanggil SDK DataHub. Saat membuat topik, atur parameter ExpandMode menjadi ONLY_EXTEND. Versi dependensi Maven harus V2.19.0 atau lebih baru.
Anda harus mengonfigurasi ID AccessKey dan Rahasia AccessKey di proyek Anda. Kami sarankan Anda mengatur variabel lingkungan berikut untuk mengonfigurasi ID AccessKey dan Rahasia AccessKey di file konfigurasi:
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>Pasangan AccessKey akun Alibaba Cloud memiliki izin pada semua operasi API. Menggunakan pasangan AccessKey untuk melakukan operasi adalah operasi berisiko tinggi. Kami sarankan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin.
Kami sarankan Anda tidak menuliskan ID AccessKey dan Rahasia AccessKey secara langsung ke dalam kode proyek Anda. Jika tidak, pasangan AccessKey mungkin bocor dan keamanan semua sumber daya dalam akun Anda terganggu.
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
int shardCount = 1;
int lifeCycle = 7;
try {
datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}Buat Grup Menggunakan Kode
Versi dependensi Maven harus V2.21.6 atau lebih baru.
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.21.6-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
List<String> topicList = new ArrayList<>();
topicList.add("test_project.topic1");
topicList.add("test_project.topic2");
topicList.add("test_project.topic3");
try {
// Buat grup Kafka.
datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");
// Ikat topik yang diinginkan untuk langganan ke grup.
datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}Contoh produser:
Hasilkan File kafka_client_producer_jaas.conf
Buat file kafka_client_producer_jaas.conf dan simpan ke direktori. File tersebut berisi konten berikut:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};Dependensi Maven
Versi klien Kafka harus V0.10.0.0 atau lebih baru. Versi yang direkomendasikan adalah V2.4.0.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>Contoh Kode
public class ProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "lz4");
String KafkaTopicName = "test_project.test_topic";
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
List<Header> headers = new ArrayList<>();
RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// pengiriman sinkron
producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}Hasil Eksekusi
Setelah eksekusi selesai, sampel data untuk memeriksa apakah DataHub berjalan dengan baik.
Contoh konsumen
Untuk informasi tentang cara menghasilkan file kafka_client_producer_jaas.conf dan dependensi Maven, lihat informasi terkait di bagian "Contoh Produser" dari topik ini.
Setelah Anda menambahkan konsumen, tunggu sekitar 10 detik agar alokasi shard selesai. Setelah itu, konsumen dapat mengonsumsi data.
Contoh Kode
(Direkomendasikan) Contoh menggunakan grup Kafka
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {
static {
System.setProperty("java.security.auth.login.config","src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// Atur parameter group.id menjadi project.group.
properties.put("group.id", "test_project.test_kafka_group");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topicList = new ArrayList<>();
topicList.add("test_project.test_topic1");
topicList.add("test_project.test_topic2");
topicList.add("test_project.test_topic3");
// Beberapa topik dapat dilanggan sekaligus jika Anda menggunakan grup Kafka.
kafkaConsumer.subscribe(topicList);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}Contoh menggunakan project.topic.subid
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// Atur parameter group.id menjadi project.topic.subId.
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// Hanya satu topik yang dapat dilanggan jika Anda menggunakan project.topic.subId.
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}Hasil Eksekusi
Setelah eksekusi selesai, Anda dapat melihat data yang dibaca pada klien konsumen.
ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)Catatan: Semua data yang dikembalikan untuk permintaan pembacaan data memiliki nilai yang sama untuk parameter LogAppendTime, yang merupakan nilai terbesar dari cap waktu data.
Tugas Streams contoh
Dependensi Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>Contoh Kode
Kode contoh berikut membaca data masukan di test_project, mengonversi kunci dan nilai menjadi huruf kecil, lalu menuliskannya ke data keluaran.
public class StreamExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(final String[] args) {
final String input = "test_project.input";
final String output = "test_project.output";
final Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("application.id", "test_project.input:1611293595417QH0WL");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("auto.offset.reset", "earliest");
final StreamsBuilder builder = new StreamsBuilder();
TestMapper testMapper = new TestMapper();
builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
.map(testMapper)
.to(output, Produced.with(Serdes.String(), Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
@Override
public KeyValue<String, String> apply(String s, String s2) {
return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
}
}
}Hasil Eksekusi
Setelah Anda memulai tugas Streams, tunggu sekitar 1 menit agar alokasi shard selesai. Setelah itu, Anda dapat melihat jumlah tugas di konsol DataHub. Jumlah tugas sesuai dengan jumlah shard dalam topik masukan. Dalam contoh ini, topik masukan berisi tiga shard.
tugas aktif yang ditetapkan saat ini: [0_0, 0_1, 0_2]
tugas cadangan yang ditetapkan saat ini: []
tugas aktif yang dicabut: []
tugas cadangan yang dicabut: []Setelah alokasi shard selesai, Anda dapat menulis data uji berikut ke topik masukan: (AAAA,BBBB), (CCCC,DDDD), dan (EEEE,FFFF). Kemudian, sampel data keluaran untuk memeriksa apakah penulisan data valid.
Catatan penggunaan
Transaksi dan idempotensi tidak didukung.
Klien Kafka tidak dapat secara otomatis membuat topik DataHub. Sebelum menulis data dari klien Kafka ke DataHub, pastikan bahwa topik DataHub telah dibuat.
Setiap konsumen hanya dapat berlangganan satu topik.
Cap waktu data yang dibaca oleh konsumen adalah nilai parameter LogAppendTime, yang menunjukkan waktu ketika data ditulis ke DataHub. Semua data yang dikembalikan untuk permintaan pembacaan data memiliki cap waktu yang sama, yaitu nilai terbesar dari cap waktu data. Oleh karena itu, saat membaca data, cap waktu yang diperoleh mungkin lebih besar dari cap waktu aktual ketika data ditulis ke DataHub.
Setiap tugas Streams mendukung hanya satu topik masukan dan beberapa topik keluaran.
Tugas Streams bersifat tanpa status.
Versi Kafka yang didukung adalah dari V0.10.0 hingga V2.4.0.
FAQ
1. Mengapa koneksi terputus selama penulisan data?
Selector - [Producer clientId=producer-1] Koneksi dengan dh-cn-shenzhen.aliyuncs.com terputus
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
...Di Kafka, permintaan meta dan permintaan penulisan data tidak menggunakan koneksi yang sama. Saat permintaan meta dikirim untuk pertama kali, koneksi dibuat. Saat permintaan penulisan data dikirim, koneksi ke broker yang dikembalikan untuk permintaan meta dibuat. Setelah itu, semua permintaan berikutnya dikirim melalui koneksi kedua, dan koneksi pertama menjadi idle. Jika koneksi tetap idle melebihi batas waktu tertentu, server secara otomatis menutup koneksi. Oleh karena itu, Anda dapat mengabaikan kesalahan ini jika tidak memengaruhi penulisan data.
Apa yang harus saya lakukan jika klien Kafka saya gagal dimulai?
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 foundJika klien Kafka Anda gagal dimulai, tambahkan kode berikut: properties.put("ssl.endpoint.identification.algorithm", "");.
Mengapa kesalahan DisconnectException muncul selama konsumsi data pada klien konsumen?
[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectExceptionKlien Kafka harus mempertahankan koneksi persisten berbasis TCP ke server. Dalam kebanyakan kasus, kesalahan DisconnectException disebabkan oleh gangguan jaringan. Kesalahan ini tidak memengaruhi konsumsi data pada klien karena logika ulang coba dikonfigurasikan pada klien.