全部产品
Search
文档中心

ApsaraMQ for Kafka:Kirim dan terima pesan menggunakan titik akhir default

更新时间:Nov 11, 2025

Topik ini menjelaskan cara menggunakan kit pengembangan perangkat lunak (SDK) Java untuk terhubung ke titik akhir default instans ApsaraMQ for Kafka serta mengirim dan menerima pesan dalam virtual private cloud (VPC).

Prasyarat

Instal dependensi Java

  1. Buat proyek Java di IntelliJ IDEA.

  2. Tambahkan dependensi berikut ke file pom.xml:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>
    Catatan

    Kami menyarankan agar versi klien Anda konsisten dengan versi utama instans ApsaraMQ for Kafka Anda. Anda dapat melihat versi utama instans ApsaraMQ for Kafka Anda pada halaman Instance Details di konsol ApsaraMQ for Kafka.

Persiapkan file konfigurasi

  1. (Opsional) Unduh sertifikat root SSL. Jika Anda menggunakan titik akhir SSL untuk terhubung ke instans ApsaraMQ for Kafka, Anda harus menginstal sertifikat tersebut.

  2. Buka halaman aliware-kafka-demos, klik download untuk mengunduh proyek demo ke mesin lokal Anda, lalu ekstrak paket proyek demo tersebut.

  3. Di dalam proyek demo yang telah diekstrak, temukan folder kafka-java-demo dan impor folder tersebut ke IntelliJ IDEA.

  4. (Opsional) Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans ApsaraMQ for Kafka, Anda harus memodifikasi file konfigurasi kafka_client_jaas.conf. Untuk informasi mengenai perbedaan antar titik akhir, lihat Perbandingan antar titik akhir.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    }; 

    Jika instans ApsaraMQ for Kafka Anda adalah Instansi terhubung-VPC, hanya sumber daya yang diterapkan di VPC yang sama yang dapat mengakses instans tersebut. Hal ini menjamin keamanan dan privasi transmisi data. Dalam skenario yang memerlukan keamanan tinggi, Anda dapat mengaktifkan fitur daftar kontrol akses (ACL). Setelah fitur ini diaktifkan, pesan ditransmisikan melalui saluran aman hanya setelah otentikasi identitas SASL berhasil dilakukan. Anda dapat memilih mekanisme PLAIN atau SCRAM untuk otentikasi identitas berdasarkan kebutuhan bisnis Anda terhadap perlindungan keamanan. Untuk informasi selengkapnya, lihat Aktifkan fitur ACL.

    Jika instans ApsaraMQ for Kafka Anda adalah instans yang terhubung ke Internet dan VPC, pesan harus diautentikasi dan dienkripsi saat ditransmisikan melalui Internet. Mekanisme PLAIN dari SASL harus digunakan bersama dengan SSL untuk memastikan bahwa pesan tidak ditransmisikan dalam teks biasa tanpa enkripsi.

    Dalam topik ini, nilai parameter username dan password adalah nama pengguna dan kata sandi SASL dari instans tersebut.

    • Jika Anda mengaktifkan akses Internet tetapi tidak mengaktifkan ACL untuk instans tersebut, Anda dapat memperoleh nama pengguna dan kata sandi pengguna default di bagian Configuration Information pada halaman Instance Details di ApsaraMQ for Kafka console.

    • Jika Anda mengaktifkan ACL untuk instans tersebut, pastikan pengguna SASL yang Anda gunakan bertipe PLAIN dan telah diberikan izin yang diperlukan untuk mengirim dan menerima pesan. Untuk informasi selengkapnya, lihat Berikan izin kepada pengguna SASL.

  5. Modifikasi file konfigurasi kafka.properties.

    ##==============================Parameter umum==============================
    bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
    topic=xxx
    group.id=xxx
    ##=======================Konfigurasikan parameter berikut sesuai nilai aktualnya.========================
    ## Titik akhir SSL.
    ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
    ##ssl.truststore.password harus KafkaOnsClient, tidak dapat diubah
    ssl.truststore.password=KafkaOnsClient
    ##Verifikasi hostname, biarkan kosong, tidak perlu dimodifikasi
    ssl.endpoint.identification.algorithm=
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
    ## Mekanisme PLAIN dari titik akhir SASL.
    java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf
    ## Mekanisme SCRAM dari titik akhir SASL.
    java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf

    Parameter

    Deskripsi

    bootstrap.servers

    Informasi titik akhir. Anda dapat memperoleh titik akhir tersebut di bagian Endpoint Information pada halaman Instance Details di ApsaraMQ for Kafka console.

    topic

    Nama topik pada instans. Anda dapat memperoleh nama topik tersebut pada halaman Topics di ApsaraMQ for Kafka console.

    group.id

    ID kelompok pada instans. Anda dapat memperoleh ID kelompok tersebut pada halaman Groups di ApsaraMQ for Kafka console.

    Catatan

    Jika klien menjalankan producer.go untuk mengirim pesan, parameter ini opsional. Jika klien menjalankan consumer.go untuk Berlangganan pesan, parameter ini wajib diisi.

    ssl.truststore.location

    Jalur tempat sertifikat root SSL disimpan. Anda harus menyimpan file sertifikat SSL yang diunduh pada bagian "Persiapkan file konfigurasi" ke jalur lokal, lalu ganti xxxx dalam contoh kode dengan jalur lokal tersebut. Contoh: /home/ssl/only.4096.client.truststore.jks.

    Penting

    Jika Anda menggunakan titik akhir default atau titik akhir SASL untuk mengakses instans, parameter ini tidak diperlukan. Jika Anda menggunakan titik akhir SSL untuk mengakses instans, parameter ini wajib diisi.

    ssl.truststore.password

    Kata sandi sertifikat server, tetap sebagai: KafkaOnsClient. Tidak dapat diubah.

    ssl.endpoint.identification.algorithm

    Verifikasi Hostname, diatur menjadi kosong. Tidak dapat diubah.

    java.security.auth.login.config

    Jalur tempat file konfigurasi JAAS disimpan. Anda harus menyimpan file kafka_client_jaas.conf dalam proyek demo ke jalur lokal, lalu ganti xxxx dalam contoh kode dengan jalur lokal tersebut. Contoh: /home/ssl/kafka_client_jaas.conf.

    Penting

    Jika Anda menggunakan titik akhir default untuk mengakses instans, parameter ini tidak diperlukan. Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, parameter ini wajib diisi.

Kirim pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaProducerDemo.java untuk mengirim pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
// Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, beri komentar pada baris pertama kode berikut: 
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
* Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut: 
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaProducerDemo {

    public static void main(String args[]) {
          
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSasl();
        */
         
       /*
        * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSaslPlain();
        */
       
       /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Muat file kafka.properties. 
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik pada halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
         
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar empat baris kode berikut. 
        * Jangan kompres file ke dalam paket JAR. 
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        * Kata sandi truststore dalam sertifikat root. Gunakan nilai default. 
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        * Metode autentikasi SASL. Gunakan nilai default. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
        * Protokol akses. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * Mekanisme PLAIN. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
        * Protokol akses. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * Mekanisme SCRAM. 
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        */

        // Metode yang digunakan untuk membuat serial pesan di ApsaraMQ for Kafka. 
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Waktu tunggu maksimum untuk permintaan. 
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Jumlah maksimum percobaan ulang pesan di sisi klien. 
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Interval antara dua percobaan ulang berturut-turut untuk pesan di klien. 
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
         
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        * Atur algoritma verifikasi hostname menjadi nilai kosong. 
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        */

        // Buat objek produsen yang aman untuk thread. Buat satu objek produsen untuk satu proses. 
        // Untuk meningkatkan kinerja, Anda dapat membuat beberapa objek produsen. Kami menyarankan agar Anda membuat maksimal lima objek produsen. 
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Buat pesan ApsaraMQ for Kafka. 
        String topic = kafkaProperties.getProperty("topic"); // Topik tempat pesan tersebut berada. Masukkan topik yang Anda buat di konsol ApsaraMQ for Kafka. 
        String value = "this is the message's value"; // Isi pesan. 

        try {
            // Peroleh beberapa objek future sekaligus. Hal ini membantu meningkatkan efisiensi. Namun, jangan peroleh terlalu banyak objek future sekaligus. 
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i =0; i < 100; i++) {
                // Kirim pesan dan peroleh objek future. 
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                // Peroleh hasil objek future secara sinkron. 
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // Jika pesan tetap gagal dikirim setelah mencapai jumlah maksimum percobaan ulang, lakukan pemecahan masalah. 
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}

Berlangganan pesan

Anda dapat berlangganan pesan dengan salah satu metode berikut.

Gunakan satu konsumen untuk Berlangganan pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaConsumerDemo.java untuk berlangganan pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar tiga baris kode berikut. Jika Anda menggunakan titik akhir SASL untuk mengakses instans, hapus komentar dua baris pertama dari kode berikut: 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaConsumerDemo {

    public static void main(String args[]) {

        // Tentukan jalur file konfigurasi JAAS. 
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSasl();
         */
                        
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSaslPlain();
         */
                        
        /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Muat file kafka.properties.
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik pada halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris pertama kode berikut. 
        // Periode waktu habis sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar enam baris kode berikut. 
         * Tentukan jalur tempat sertifikat root SSL disimpan. Ganti XXX dengan jalur aktual. 
         * Jangan kompres file sertifikat ke dalam paket JAR. 
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * Kata sandi truststore dalam penyimpanan sertifikat root. Gunakan nilai default. 
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * Metode autentikasi SASL. Gunakan nilai default. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         * Tentukan ukuran pesan maksimum yang diizinkan untuk satu operasi tarik. Jika data ditransmisikan melalui Internet, parameter ini dapat sangat memengaruhi kinerja. 
         props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
         props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
         */

        // Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, beri komentar pada baris kode berikut. 
       // Periode waktu habis sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar tiga baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme PLAIN. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, beri komentar pada baris kode berikut. 
       // Periode waktu habis sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis, broker menentukan bahwa konsumen tidak aktif. Kemudian, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar empat baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme SCRAM. 
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // Jumlah maksimum pesan yang dapat dipolling sekaligus. 
        // Jangan atur parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semuanya dikonsumsi sebelum polling berikutnya dimulai, load balancing dipicu dan kinerja dapat menurun. 
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Metode yang digunakan untuk mendeserialisasi pesan.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kelompok konsumen tempat instans konsumen saat ini berada. Masukkan kelompok konsumen yang Anda buat di konsol ApsaraMQ for Kafka. 
        // Instans konsumen yang berada dalam kelompok konsumen yang sama. Instans-instans ini mengonsumsi pesan dalam mode load balancing. 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        
        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        // Atur algoritma verifikasi hostname menjadi nilai kosong. 
        //props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Buat objek pesan, yaitu instans konsumen. 
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        // Tentukan satu atau beberapa topik tempat kelompok konsumen Berlangganan. 
        // Kami menyarankan agar Anda mengonfigurasi konsumen dengan nilai GROUP_ID_CONFIG yang sama untuk Berlangganan topik yang sama. 
        List<String> subscribedTopics =  new ArrayList<String>();
        
        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, beri komentar lima baris pertama dan hapus komentar baris keenam dari kode berikut. 
        // Jika Anda ingin Berlangganan beberapa topik, tambahkan di sini. 
        // Anda harus membuat topik tersebut di konsol ApsaraMQ for Kafka terlebih dahulu. 
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic: topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Konsumsi pesan dalam loop. 
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval waktu habis yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                // Kami menyarankan agar Anda membuat kolam thread terpisah untuk mengonsumsi pesan dan mengembalikan hasil secara asinkron. 
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
          
                e.printStackTrace();
            }
        }
    }
}

Gunakan beberapa konsumen untuk Berlangganan pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaMultiConsumerDemo.java untuk berlangganan pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
// Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, hapus komentar baris pertama kode berikut: 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar tiga baris pertama kode berikut. Jika Anda menggunakan titik akhir SASL untuk mengakses instans, hapus komentar dua baris pertama kode berikut: 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
import org.apache.kafka.common.errors.WakeupException;

/**
 * Tutorial ini menunjukkan cara menggunakan beberapa konsumen untuk mengonsumsi pesan secara bersamaan dalam satu proses. 
 * Pastikan bahwa jumlah total konsumen di lingkungan tidak melebihi jumlah partisi dari topik yang Berlangganan oleh konsumen tersebut. 
 */
public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        
        // Tentukan jalur file konfigurasi JAAS. 
        /* 
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSasl();
         */
                            
        /* 
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSaslPlain(); 
         */
                            
        /* 
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSaslScram();
         */


        // Muat file kafka.properties. 
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik pada halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar empat baris kode berikut. 
         * Jangan kompres file sertifikat ke dalam paket JAR. 
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * Kata sandi truststore dalam penyimpanan sertifikat root. Gunakan nilai default. 
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * Metode autentikasi SASL. Gunakan nilai default. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */
        
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme PLAIN. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */

        /* 
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme SCRAM. 
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         */

        // Interval maksimum antara dua siklus polling berturut-turut. 
        // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Jumlah maksimum pesan yang dapat dipolling sekaligus. 
        // Jangan atur parameter ini ke nilai yang terlalu besar. Jika semua pesan yang dipolling tidak dikonsumsi sebelum siklus polling berikutnya dimulai, load balancing dipicu dan kinerja dapat menurun. 
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Metode yang digunakan untuk mendeserialisasi pesan. 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kelompok konsumen tempat instans konsumen saat ini berada. Masukkan kelompok konsumen yang Anda buat di konsol ApsaraMQ for Kafka. 
        // Instans konsumen yang berada dalam kelompok konsumen yang sama. Instans-instans ini mengonsumsi pesan dalam mode load balancing. 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        /* 
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
         * Buat objek konsumen, yaitu instans konsumen. 
         * Atur algoritma verifikasi hostname menjadi nilai kosong. 
         props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
         */

        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            List<String> subscribedTopics = new ArrayList<String>();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                        }
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {

                        }
                        e.printStackTrace();
                    }
                }
            } catch (WakeupException e) {
                // Jika konsumen dimatikan, abaikan pengecualian tersebut. 
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }
        // Implementasikan hook shutdown yang dapat dipanggil oleh thread lain. 
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}