全部产品
Search
文档中心

ApsaraMQ for Kafka:Gunakan titik akhir instans untuk mengirim dan menerima pesan

更新时间:Nov 11, 2025

Anda dapat menghubungkan aplikasi ke instans ApsaraMQ for Kafka untuk mengirim dan menerima pesan melalui titik akhir instans tersebut. ApsaraMQ for Kafka menyediakan titik akhir default, titik akhir Security Sockets Layer (SSL), dan titik akhir Simple Authentication and Secure Layer (SASL) guna memenuhi berbagai kebutuhan koneksi dan keamanan Anda.

Prasyarat

  • JDK 1.8 atau versi yang lebih baru telah diinstal. Untuk informasi selengkapnya, lihat Java Downloads.

  • Maven 2.5 atau versi yang lebih baru telah diinstal. Untuk informasi selengkapnya, lihat Downloading Apache Maven.

  • Kompilator telah diinstal.

    Dalam topik ini, IntelliJ IDEA Ultimate digunakan.

  • Instans ApsaraMQ for Kafka telah dibeli dan dideploy.

    • Instansi terhubung-VPC: Secara default, hanya titik akhir default yang ditampilkan. Instans jenis ini hanya dapat diakses dari dalam VPC.

    • Instansi terhubung Internet dan VPC: Secara default, titik akhir default dan titik akhir SSL ditampilkan. Instans jenis ini dapat diakses melalui Internet maupun dari dalam VPC.

    Catatan

Instal dependensi Java

Kode contoh berikut menunjukkan dependensi yang diperlukan saat menggunakan SDK untuk Java guna terhubung ke instans ApsaraMQ for Kafka. Dependensi tersebut telah didefinisikan dalam file pom.xml pada folder kafka-java-demo, sehingga Anda tidak perlu menginstalnya secara manual.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Catatan

Kami menyarankan agar versi klien Anda konsisten dengan versi utama instans ApsaraMQ for Kafka Anda. Anda dapat melihat versi utama instans ApsaraMQ for Kafka Anda di halaman Instance Details di konsol ApsaraMQ for Kafka.

Persiapkan file konfigurasi

  1. (Opsional) Unduh sertifikat root SSL. Jika Anda menggunakan titik akhir SSL untuk terhubung ke instans ApsaraMQ for Kafka, Anda harus menginstal sertifikat tersebut.

  2. Buka halaman aliware-kafka-demos, klik download untuk mengunduh proyek demo ke mesin lokal Anda, lalu ekstrak paket proyek tersebut.

  3. Di dalam proyek demo yang telah diekstrak, temukan folder kafka-java-demo dan impor folder tersebut ke IntelliJ IDEA.

  4. (Opsional) Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans ApsaraMQ for Kafka, Anda harus memodifikasi file konfigurasi kafka_client_jaas.conf. Untuk informasi mengenai perbedaan antar titik akhir, lihat Perbandingan antar titik akhir.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    }; 

    Jika instans ApsaraMQ for Kafka Anda merupakan instansi terhubung-VPC, hanya sumber daya yang dideploy di VPC yang sama yang dapat mengakses instans tersebut. Hal ini menjamin keamanan dan privasi transmisi data. Dalam skenario yang memerlukan keamanan tinggi, Anda dapat mengaktifkan fitur daftar kontrol akses (ACL). Setelah diaktifkan, pesan hanya akan ditransmisikan melalui saluran aman setelah otentikasi identitas SASL berhasil dilakukan. Anda dapat memilih mekanisme PLAIN atau SCRAM untuk otentikasi identitas berdasarkan kebutuhan bisnis Anda terhadap perlindungan keamanan. Untuk informasi selengkapnya, lihat Aktifkan fitur ACL.

    Jika instans ApsaraMQ for Kafka Anda merupakan instansi terhubung Internet dan VPC, pesan harus diautentikasi dan dienkripsi saat ditransmisikan melalui Internet. Mekanisme PLAIN dari SASL harus digunakan bersamaan dengan SSL untuk memastikan bahwa pesan tidak ditransmisikan dalam teks biasa tanpa enkripsi.

    Dalam topik ini, nilai parameter username dan password adalah nama pengguna dan kata sandi SASL dari instans tersebut.

    • Jika Anda mengaktifkan akses Internet tetapi tidak mengaktifkan ACL untuk instans tersebut, Anda dapat memperoleh nama pengguna dan kata sandi pengguna default di bagian Configuration Information pada halaman Instance Details di ApsaraMQ for Kafka console.

    • Jika Anda mengaktifkan ACL untuk instans tersebut, pastikan pengguna SASL yang Anda gunakan bertipe PLAIN dan telah diberikan izin yang diperlukan untuk mengirim dan menerima pesan. Untuk informasi selengkapnya, lihat Berikan izin kepada pengguna SASL.

  5. Modifikasi file konfigurasi kafka.properties.

    ##==============================Parameter umum==============================
    bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
    topic=xxx
    group.id=xxx
    ##=======================Konfigurasikan parameter berikut sesuai nilai aktualnya.========================
    ## Titik akhir SSL.
    ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
    ##ssl.truststore.password harus KafkaOnsClient, tidak dapat diubah
    ssl.truststore.password=KafkaOnsClient
    ##Verifikasi hostname, biarkan kosong, tidak perlu diubah
    ssl.endpoint.identification.algorithm=
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
    ## Mekanisme PLAIN dari titik akhir SASL.
    java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf
    ## Mekanisme SCRAM dari titik akhir SASL.
    java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf

    Parameter

    Deskripsi

    bootstrap.servers

    Informasi titik akhir. Anda dapat memperoleh titik akhir tersebut di bagian Endpoint Information pada halaman Instance Details di ApsaraMQ for Kafka console.

    topic

    Nama topik pada instans. Anda dapat memperoleh nama topik tersebut di halaman Topics di ApsaraMQ for Kafka console.

    group.id

    ID kelompok pada instans. Anda dapat memperoleh ID kelompok tersebut di halaman Groups di ApsaraMQ for Kafka console.

    Catatan

    Jika klien menjalankan producer.go untuk mengirim pesan, parameter ini bersifat opsional. Jika klien menjalankan consumer.go untuk berlangganan pesan, parameter ini wajib diisi.

    ssl.truststore.location

    Jalur tempat penyimpanan sertifikat root SSL. Anda harus menyimpan file sertifikat SSL yang telah diunduh pada bagian "Persiapkan file konfigurasi" ke jalur lokal, lalu ganti xxxx dalam kode contoh dengan jalur lokal tersebut. Contoh: /home/ssl/only.4096.client.truststore.jks.

    Penting

    Jika Anda menggunakan titik akhir default atau titik akhir SASL untuk mengakses instans, parameter ini tidak diperlukan. Jika Anda menggunakan titik akhir SSL untuk mengakses instans, parameter ini wajib diisi.

    ssl.truststore.password

    Kata sandi sertifikat server, tetap sebagai: KafkaOnsClient. Tidak dapat diubah.

    ssl.endpoint.identification.algorithm

    Verifikasi Hostname, diatur menjadi kosong. Tidak dapat diubah.

    java.security.auth.login.config

    Jalur tempat penyimpanan file konfigurasi JAAS. Anda harus menyimpan file kafka_client_jaas.conf dalam proyek demo ke jalur lokal, lalu ganti xxxx dalam kode contoh dengan jalur lokal tersebut. Contoh: /home/ssl/kafka_client_jaas.conf.

    Penting

    Jika Anda menggunakan titik akhir default untuk mengakses instans, parameter ini tidak diperlukan. Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, parameter ini wajib diisi.

Kirim pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaProducerDemo.java untuk mengirim pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
// Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, komentari baris pertama kode berikut: 
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
* Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut: 
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaProducerDemo {

    public static void main(String args[]) {
          
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSasl();
        */
         
       /*
        * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSaslPlain();
        */
       
       /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut. 
        Tentukan jalur file konfigurasi JAAS. 
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Muat file kafka.properties. 
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik di halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
         
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar empat baris kode berikut. 
        * Jangan kompres file ke dalam paket JAR. 
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        * Kata sandi truststore dalam sertifikat root. Gunakan nilai default. 
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        * Metode autentikasi SASL. Gunakan nilai default. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
        * Protokol akses. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * Mekanisme PLAIN. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
        * Protokol akses. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * Mekanisme SCRAM. 
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        */

        // Metode yang digunakan untuk serialisasi pesan di ApsaraMQ for Kafka. 
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Waktu tunggu maksimum untuk permintaan. 
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Jumlah maksimum percobaan ulang pesan di sisi klien. 
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Interval antara dua percobaan ulang berturut-turut untuk pesan di sisi klien. 
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
         
       /*
        * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        * Atur algoritma verifikasi hostname menjadi nilai kosong. 
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        */

        // Buat objek producer yang aman untuk thread. Buat satu objek producer untuk satu proses. 
        // Untuk meningkatkan kinerja, Anda dapat membuat beberapa objek producer. Kami menyarankan agar Anda membuat maksimal lima objek producer. 
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Buat pesan ApsaraMQ for Kafka. 
        String topic = kafkaProperties.getProperty("topic"); // Topik tempat pesan tersebut berada. Masukkan topik yang telah Anda buat di konsol ApsaraMQ for Kafka. 
        String value = "this is the message's value"; // Isi pesan. 

        try {
            // Peroleh beberapa objek future sekaligus. Hal ini membantu meningkatkan efisiensi. Namun, jangan peroleh terlalu banyak objek future sekaligus. 
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i =0; i < 100; i++) {
                // Kirim pesan dan peroleh objek future. 
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                // Peroleh hasil objek future secara sinkron. 
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // Jika pesan tetap gagal dikirim setelah mencapai jumlah maksimum percobaan ulang, lakukan pemecahan masalah. 
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}

Berlangganan pesan

Anda dapat berlangganan pesan dengan salah satu metode berikut.

Gunakan satu konsumen untuk berlangganan pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaConsumerDemo.java untuk berlangganan pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar tiga baris kode berikut. Jika Anda menggunakan titik akhir SASL untuk mengakses instans, hapus komentar dua baris pertama kode berikut: 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaConsumerDemo {

    public static void main(String args[]) {

        // Tentukan jalur file konfigurasi JAAS. 
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSasl();
         */
                        
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSaslPlain();
         */
                        
        /*
        * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Muat file kafka.properties.
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik di halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris pertama kode berikut. 
        // Periode timeout sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis waktu, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar enam baris kode berikut. 
         * Tentukan jalur tempat penyimpanan sertifikat root SSL. Ganti XXX dengan jalur aktual. 
         * Jangan kompres file sertifikat ke dalam paket JAR. 
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * Kata sandi truststore dalam penyimpanan sertifikat root. Gunakan nilai default. 
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * Metode autentikasi SASL. Gunakan nilai default. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         * Tentukan ukuran pesan maksimum yang diizinkan untuk satu operasi tarik. Jika data ditransmisikan melalui Internet, parameter ini dapat sangat memengaruhi kinerja. 
         props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
         props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
         */

        // Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, komentari baris kode berikut. 
       // Periode timeout sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis waktu, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar tiga baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme PLAIN. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, komentari baris kode berikut. 
       // Periode timeout sesi. Jika konsumen tidak mengembalikan heartbeat sebelum sesi habis waktu, broker menentukan bahwa konsumen tidak aktif. Kemudian, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. Nilai default adalah 30 detik.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar empat baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme SCRAM. 
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         * Interval maksimum antara dua siklus polling berturut-turut. 
         * Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // Jumlah maksimum pesan yang dapat dipolling sekaligus. 
        // Jangan atur parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semuanya dikonsumsi sebelum polling berikutnya dimulai, load balancing dipicu dan kinerja dapat menurun. 
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Metode yang digunakan untuk deserialisasi pesan.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kelompok konsumen tempat instans konsumen saat ini berada. Masukkan kelompok konsumen yang telah Anda buat di konsol ApsaraMQ for Kafka. 
        // Instans konsumen yang berada dalam kelompok konsumen yang sama. Instans-instans ini mengonsumsi pesan dalam mode load balancing. 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        
        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut. 
        // Atur algoritma verifikasi hostname menjadi nilai kosong. 
        //props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Buat objek pesan, yaitu instans konsumen. 
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        // Tentukan satu atau beberapa topik tempat kelompok konsumen berlangganan. 
        // Kami menyarankan agar Anda mengonfigurasi konsumen dengan nilai GROUP_ID_CONFIG yang sama untuk berlangganan topik yang sama. 
        List<String> subscribedTopics =  new ArrayList<String>();
        
        // Jika Anda menggunakan titik akhir SSL untuk mengakses instans, komentari lima baris pertama dan hapus komentar baris keenam kode berikut. 
        // Jika Anda ingin berlangganan beberapa topik, tambahkan di sini. 
        // Anda harus membuat topik tersebut di konsol ApsaraMQ for Kafka terlebih dahulu. 
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic: topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Konsumsi pesan dalam loop. 
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval timeout yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                // Kami menyarankan agar Anda membuat kolam thread terpisah untuk mengonsumsi pesan dan mengembalikan hasilnya secara asinkron. 
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
          
                e.printStackTrace();
            }
        }
    }
}

Gunakan beberapa konsumen untuk berlangganan pesan

Kode contoh berikut menunjukkan cara mengompilasi dan menjalankan KafkaMultiConsumerDemo.java untuk berlangganan pesan:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
// Jika Anda menggunakan titik akhir SSL atau titik akhir SASL untuk mengakses instans, hapus komentar baris pertama kode berikut: 

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar tiga baris pertama kode berikut. Jika Anda menggunakan titik akhir SASL untuk mengakses instans, hapus komentar dua baris pertama kode berikut: 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
import org.apache.kafka.common.errors.WakeupException;

/**
 * Tutorial ini menunjukkan cara menggunakan beberapa konsumen untuk mengonsumsi pesan secara simultan dalam satu proses. 
 * Pastikan bahwa jumlah total konsumen di lingkungan tidak melebihi jumlah partisi dari topik yang berlangganan oleh konsumen tersebut. 
 */
public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        
        // Tentukan jalur file konfigurasi JAAS. 
        /* 
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSasl();
         */
                            
        /* 
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSaslPlain(); 
         */
                            
        /* 
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar baris kode berikut: 
         JavaKafkaConfigurer.configureSaslScram();
         */


        // Muat file kafka.properties. 
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Tentukan titik akhir. Anda dapat memperoleh titik akhir yang digunakan untuk mengakses topik di halaman Instance Details di konsol ApsaraMQ for Kafka. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        
        /*
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar empat baris kode berikut. 
         * Jangan kompres file sertifikat ke dalam paket JAR. 
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * Kata sandi truststore dalam penyimpanan sertifikat root. Gunakan nilai default. 
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * Protokol akses. Atur parameter ini menjadi SASL_SSL. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * Metode autentikasi SASL. Gunakan nilai default. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */
        
        /*
         * Jika Anda menggunakan mekanisme PLAIN dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme PLAIN. 
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */

        /* 
         * Jika Anda menggunakan mekanisme SCRAM dari titik akhir SASL untuk mengakses instans, hapus komentar dua baris kode berikut. 
         * Protokol akses. 
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * Mekanisme SCRAM. 
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         */

        // Interval maksimum antara dua siklus polling berturut-turut. 
        // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan heartbeat dalam interval tersebut, broker menentukan bahwa konsumen tidak aktif. Dalam kasus ini, broker menghapus konsumen dari kelompok konsumen dan memicu rebalance. 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Jumlah maksimum pesan yang dapat dipolling sekaligus. 
        // Jangan atur parameter ini ke nilai yang terlalu besar. Jika semua pesan yang dipolling tidak dikonsumsi sebelum siklus polling berikutnya dimulai, load balancing dipicu dan kinerja dapat menurun. 
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Metode yang digunakan untuk deserialisasi pesan. 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kelompok konsumen tempat instans konsumen saat ini berada. Masukkan kelompok konsumen yang telah Anda buat di konsol ApsaraMQ for Kafka. 
        // Instans konsumen yang berada dalam kelompok konsumen yang sama. Instans-instans ini mengonsumsi pesan dalam mode load balancing. 
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        /* 
         * Jika Anda menggunakan titik akhir SSL untuk mengakses instans, hapus komentar baris kode berikut: 
         * Buat objek konsumen, yaitu instans konsumen. 
         * Atur algoritma verifikasi hostname menjadi nilai kosong. 
         props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
         */

        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            List<String> subscribedTopics = new ArrayList<String>();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                        }
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {

                        }
                        e.printStackTrace();
                    }
                }
            } catch (WakeupException e) {
                // Jika konsumen dimatikan, abaikan pengecualian tersebut. 
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }
        // Implementasikan hook shutdown yang dapat dipanggil oleh thread lain. 
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

FAQ

Bagaimana cara mengonfigurasi sertifikat SASL_SSL di ApsaraMQ for Kafka?

Anda dapat mengonfigurasi sertifikat SASL_SSL di ApsaraMQ for Kafka dengan langkah-langkah berikut: Unduh sertifikat SSL melalui tautan pada Langkah 1 bagian "Persiapkan file konfigurasi", simpan ke jalur lokal, lalu konfigurasikan parameter ssl.truststore.location dalam file konfigurasi kafka.properties pada proyek demo.

Apakah saya dapat mengikat sertifikat SSL saya sendiri saat menggunakan SDK untuk Java untuk mengakses titik akhir instans ApsaraMQ for Kafka guna mengirim dan menerima pesan?

Tidak, Anda tidak dapat mengikat sertifikat SSL Anda sendiri saat menggunakan SDK untuk Java untuk mengakses titik akhir instans ApsaraMQ for Kafka guna mengirim dan menerima pesan. Disarankan agar Anda menggunakan sertifikat SSL yang disediakan oleh ApsaraMQ for Kafka.

Referensi