全部产品
Search
文档中心

ApsaraMQ for Kafka:Mengirim dan berlangganan pesan menggunakan titik akhir SSL dengan autentikasi PLAIN

更新时间:Sep 02, 2025

Topik ini menjelaskan cara menggunakan SDK ApsaraMQ for Kafka untuk mengakses titik akhir Secure Sockets Layer (SSL) dari instance ApsaraMQ for Kafka melalui Internet dan menggunakan mekanisme PLAIN untuk mengirim serta berlangganan pesan. Contoh ini menggunakan SDK Java.

Prasyarat

Instal dependensi Java

Tambahkan dependensi berikut ke file pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Catatan

Disarankan agar versi klien Anda sesuai dengan versi utama instance ApsaraMQ for Kafka. Versi utama instance ApsaraMQ for Kafka dapat dilihat di halaman Instance Details pada konsol ApsaraMQ for Kafka.

Persiapan

  1. Buat file konfigurasi Log4j bernama log4j.properties.

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, STDOUT
    
    log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
    log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
    log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
  2. Unduh sertifikat root SSL.

  3. Buat file konfigurasi JAAS bernama kafka_client_jaas.conf.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    };                       
    Catatan
    • Jika fitur daftar kontrol akses (ACL) dinonaktifkan untuk instance, nama pengguna dan kata sandi pengguna Simple Authentication and Security Layer (SASL) default dapat diperoleh di halaman Instance Details pada konsol ApsaraMQ for Kafka.

    • Jika ACL diaktifkan untuk instance, pastikan bahwa pengguna SASL yang digunakan adalah tipe PLAIN dan memiliki otorisasi untuk mengirim dan mengonsumsi pesan. Untuk informasi lebih lanjut, lihat Memberikan Izin kepada Pengguna SASL.

  4. Buat file konfigurasi ApsaraMQ for Kafka bernama kafka.properties.

    ## Tentukan titik akhir SSL, yang dapat diperoleh di konsol Message Queue for Apache Kafka. 
    bootstrap.servers=xxxx
    ## Tentukan topik, yang dibuat di konsol Message Queue for Apache Kafka. 
    topic=xxxx
    ## Tentukan grup konsumen, yang dibuat di konsol Message Queue for Apache Kafka.Group 
    group.id=xxxx
    ## Sertifikat root SSL. 
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    ## File konfigurasi JAAS. 
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf                       
  5. Buat program bernama JavaKafkaConfigurer.java untuk memuat file konfigurasi.

    import java.util.Properties;
    
    public class JavaKafkaConfigurer {
    
        private static Properties properties;
    
        public static void configureSasl() {
            // Jika Anda telah menggunakan parameter -D atau metode lain untuk menetapkan jalur, jangan tetapkan lagi di bagian ini. 
            if (null == System.getProperty("java.security.auth.login.config")) {
                // Ganti XXX dengan jalur sebenarnya. 
                // Pastikan jalur dapat dibaca oleh sistem file. Jangan kompres file konfigurasi ke dalam paket JAR. 
                System.setProperty("java.security.auth.login.config", getKafkaProperties().getProperty("java.security.auth.login.config"));
            }
        }
    
        public synchronized static Properties getKafkaProperties() {
            if (null != properties) {
                return properties;
            }
            // Dapatkan isi file kafka.properties. 
            Properties kafkaProperties = new Properties();
            try {
                kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
            } catch (Exception e) {
                // Jika file tidak dapat dimuat, keluar dari program. 
                e.printStackTrace();
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }                    

Mengirim pesan

  1. Buat program produser bernama KafkaProducerDemo.java yang berisi kode berikut:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    
    public class KafkaProducerDemo {
    
        public static void main(String args[]) {
            // Tentukan jalur file konfigurasi JAAS. 
            JavaKafkaConfigurer.configureSasl();
    
            // Muat file kafka.properties. 
            Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. 
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. 
            // Jangan kompres file sertifikat ke dalam paket JAR. 
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
            // Kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. 
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. 
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            // Tentukan metode autentikasi SASL. Gunakan nilai default. 
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // Tetapkan metode untuk mendeserialisasi pesan Message Queue for Apache Kafka. 
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // Tetapkan waktu maksimum untuk menunggu permintaan. 
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            // Tetapkan jumlah maksimum percobaan ulang yang diizinkan untuk klien. 
            props.put(ProducerConfig.RETRIES_CONFIG, 5);
            // Tetapkan interval antara dua percobaan ulang berturut-turut untuk klien. 
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
    
            // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. 
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    
            // Buat objek produser thread-safe. Satu objek produser dapat melayani satu proses. 
            // Untuk meningkatkan kinerja, Anda dapat membuat beberapa objek. Kami menyarankan Anda membuat tidak lebih dari lima objek. 
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            // Buat pesan Message Queue for Apache Kafka. 
            String topic = kafkaProperties.getProperty("topic"); // Topik pesan. Masukkan topik yang Anda buat di konsol Message Queue for Apache Kafka. 
            String value = "ini adalah nilai pesan"; // Isi pesan. 
    
            try {
                // Untuk meningkatkan efisiensi, peroleh beberapa objek future sekaligus. Jangan peroleh sejumlah besar objek future sekaligus. 
                List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
                for (int i =0; i < 100; i++) {
                    // Kirim pesan dan peroleh objek future. 
                    ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                    Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                    futures.add(metadataFuture);
    
                }
                producer.flush();
                for (Future<RecordMetadata> future: futures) {
                    // Sinkronkan objek future. 
                    try {
                        RecordMetadata recordMetadata = future.get();
                        System.out.println("Produce ok:" + recordMetadata.toString());
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            } catch (Exception e) {
                // Jika pesan masih gagal dikirim setelah percobaan ulang, lakukan pemecahan masalah kesalahan. 
                System.out.println("terjadi kesalahan");
                e.printStackTrace();
            }
        }
    }  
  2. Kompilasi dan jalankan KafkaProducerDemo.java untuk mengirim pesan.

Mengonsumsi pesan

Anda dapat mengonsumsi pesan menggunakan salah satu metode berikut:

  • Menggunakan satu konsumen untuk mengonsumsi pesan

    1. Buat program konsumen tunggal bernama KafkaConsumerDemo.java yang berisi kode berikut:

      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      
      public class KafkaConsumerDemo {
      
          public static void main(String args[]) {
              // Tentukan jalur file konfigurasi JAAS. 
              JavaKafkaConfigurer.configureSasl();
      
              // Muat file kafka.properties. 
              Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. 
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. 
              // Jangan kompres file sertifikat ke dalam paket JAR. 
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              // Tentukan kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. 
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. 
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              // Tentukan metode autentikasi SASL. Gunakan nilai default. 
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              // Tetapkan interval maksimum antara dua siklus polling berturut-turut. 
              // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan pesan denyut jantung dalam interval tersebut, broker menentukan bahwa konsumen tidak hidup. Dalam hal ini, broker menghapus konsumen dari grup konsumen dan memicu penyeimbangan beban.Group 
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              // Tetapkan ukuran maksimum pesan yang diizinkan untuk operasi poll tunggal. Parameter ini memiliki efek signifikan jika data ditransmisikan melalui Internet. 
              props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
              props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
              // Tetapkan jumlah maksimum pesan yang dapat dipolling sekaligus. 
              // Jangan tetapkan parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semua dikonsumsi sebelum siklus polling berikutnya dimulai, penyeimbangan beban dipicu dan kinerja mungkin menurun. 
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              // Tetapkan metode untuk mendeserialisasi pesan. 
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              // Tetapkan grup konsumen dari konsumen saat ini. Anda harus membuat grup konsumen di konsol Message Queue for Apache Kafka. 
              // Konsumen dalam grup konsumen mengonsumsi pesan dalam mode penyeimbangan beban. 
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. 
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              // Buat objek konsumen. 
              KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
              // Tentukan satu atau beberapa topik yang ingin didaftarkan oleh grup konsumen. 
              // Kami menyarankan Anda mengonfigurasi konsumen dengan nilai GROUP_ID_CONFIG yang sama untuk mendaftarkan topik yang sama. 
              List<String> subscribedTopics =  new ArrayList<String>();
              // Jika Anda ingin mendaftarkan beberapa topik, tambahkan topik-topik tersebut di sini. 
              // Anda harus membuat topik-topik tersebut di konsol Message Queue for Apache Kafka terlebih dahulu. 
              subscribedTopics.add(kafkaProperties.getProperty("topic"));
              consumer.subscribe(subscribedTopics);
      
              // Konsumsi pesan dalam loop. 
              while (true){
                  try {
                      ConsumerRecords<String, String> records = consumer.poll(1000);
                      // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                      // Kami menyarankan Anda membuat pool thread terpisah untuk mengonsumsi pesan dan kemudian secara asinkron mengembalikan hasilnya. 
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(String.format("Konsumsi partisi:%d offset:%d", record.partition(), record.offset()));
                      }
                  } catch (Exception e) {
                      try {
                          Thread.sleep(1000);
                      } catch (Throwable ignore) {
      
                      }
                      e.printStackTrace();
                  }
              }
          }
      }
    2. Kompilasi dan jalankan KafkaConsumerDemo.java untuk mengonsumsi pesan.

  • Menggunakan beberapa konsumen untuk mengonsumsi pesan

    1. Buat program multi-konsumen bernama KafkaMultiConsumerDemo.java yang berisi kode berikut:

      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.atomic.AtomicBoolean;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      import org.apache.kafka.common.errors.WakeupException;
      
      /**
       * Tutorial ini menunjukkan cara menggunakan beberapa konsumen untuk secara bersamaan mengonsumsi pesan dalam satu proses. 
       * Pastikan bahwa jumlah total konsumen di lingkungan tidak melebihi jumlah partisi dari topik yang disubskripsikan oleh konsumen. 
       */
      public class KafkaMultiConsumerDemo {
      
          public static void main(String args[]) throws InterruptedException {
              // Tentukan jalur file konfigurasi JAAS. 
              JavaKafkaConfigurer.configureSasl();
      
              // Muat file kafka.properties. 
              Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. 
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. 
              // Jangan kompres file sertifikat ke dalam paket JAR. 
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              // Tentukan kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. 
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. 
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              // Tentukan metode autentikasi SASL. Gunakan nilai default. 
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              // Tetapkan interval maksimum antara dua siklus polling berturut-turut. 
              // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan pesan denyut jantung dalam interval tersebut, broker menentukan bahwa konsumen tidak hidup. Dalam hal ini, broker menghapus konsumen dari grup konsumen dan memicu penyeimbangan beban.Group 
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              // Tetapkan jumlah maksimum pesan yang dapat dipolling sekaligus. 
              // Jangan tetapkan parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semua dikonsumsi sebelum siklus polling berikutnya dimulai, penyeimbangan beban dipicu dan kinerja mungkin menurun. 
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              // Tetapkan metode untuk mendeserialisasi pesan. 
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              // Tetapkan grup konsumen dari konsumen saat ini. Anda harus membuat grup konsumen di konsol Message Queue for Apache Kafka. 
              // Konsumen dalam grup konsumen mengonsumsi pesan dalam mode penyeimbangan beban. 
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              // Buat objek konsumen. 
      
              // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. 
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              int consumerNum = 2;
              Thread[] consumerThreads = new Thread[consumerNum];
              for (int i = 0; i < consumerNum; i++) {
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
                  List<String> subscribedTopics = new ArrayList<String>();
                  subscribedTopics.add(kafkaProperties.getProperty("topic"));
                  consumer.subscribe(subscribedTopics);
      
                  KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
                  consumerThreads[i] = new Thread(kafkaConsumerRunner);
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].start();
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].join();
              }
          }
      
          static class KafkaConsumerRunner implements Runnable {
              private final AtomicBoolean closed = new AtomicBoolean(false);
              private final KafkaConsumer consumer;
      
              KafkaConsumerRunner(KafkaConsumer consumer) {
                  this.consumer = consumer;
              }
      
              @Override
              public void run() {
                  try {
                      while (!closed.get()) {
                          try {
                              ConsumerRecords<String, String> records = consumer.poll(1000);
                              // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. 
                              for (ConsumerRecord<String, String> record : records) {
                                  System.out.println(String.format("Thread:%s Konsumsi partisi:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                              }
                          } catch (Exception e) {
                              try {
                                  Thread.sleep(1000);
                              } catch (Throwable ignore) {
      
                              }
                              e.printStackTrace();
                          }
                      }
                  } catch (WakeupException e) {
                      // Jika konsumen dimatikan, abaikan pengecualian. 
                      if (!closed.get()) {
                          throw e;
                      }
                  } finally {
                      consumer.close();
                  }
              }
      
              // Implementasikan hook shutdown yang dapat dipanggil oleh thread lain. 
              public void shutdown() {
                  closed.set(true);
                  consumer.wakeup();
              }
          }
      }
    2. Kompilasi dan jalankan KafkaMultiConsumerDemo.java untuk mengonsumsi pesan.