全部产品
Search
文档中心

ApsaraMQ for Kafka:Hubungkan ke ApsaraMQ for Kafka melalui SSL dengan otentikasi PLAIN

更新时间:Mar 12, 2026

Untuk memproduksi dan mengonsumsi pesan pada instans ApsaraMQ for Kafka melalui Internet, hubungkan ke titik akhir SSL dengan otentikasi SASL/PLAIN. Mekanisme PLAIN mentransmisikan kredensial dalam bentuk teks biasa, sehingga selalu gunakan bersama enkripsi SSL (SASL_SSL) guna melindungi kredensial selama transit.

Semua contoh dalam panduan ini menggunakan Java SDK.

Placeholder

Kumpulkan nilai-nilai berikut sebelum memulai. Ganti placeholder ini di semua contoh konfigurasi dan kode.

PlaceholderDeskripsiCara menemukannya
<bootstrap-servers>Titik akhir SSL instans ApsaraMQ for Kafka AndaInstance Details di Konsol ApsaraMQ for Kafka
<topic-name>Nama topikTopics di Konsol ApsaraMQ for Kafka
<group-id>ID kelompok konsumenGroups di Konsol ApsaraMQ for Kafka
<truststore-path>Jalur mutlak ke sertifikat root SSL di mesin AndaLihat Langkah 2: Unduh sertifikat root SSL
<jaas-conf-path>Jalur mutlak ke file konfigurasi JAASLihat Langkah 3: Buat file konfigurasi JAAS
<username>Username SASLInstance Details (ACL dinonaktifkan) atau kredensial pengguna SASL Anda (ACL diaktifkan)
<password>Password SASLSama seperti username

Prasyarat

Tambahkan dependensi Java

Tambahkan dependensi berikut ke file pom.xml Anda:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Catatan

Sesuaikan versi utama kafka-clients dengan versi instans ApsaraMQ for Kafka Anda. Temukan versi instans di halaman Instance Details di Konsol ApsaraMQ for Kafka.

Persiapkan file konfigurasi

Buat file-file berikut sebelum menulis kode produsen dan konsumen.

FileTujuan
log4j.propertiesKonfigurasi output log
Sertifikat root SSL (.jks)Anchor kepercayaan TLS untuk koneksi broker
kafka_client_jaas.confKredensial SASL/PLAIN
kafka.propertiesTitik akhir broker, topik, kelompok konsumen, dan jalur file
JavaKafkaConfigurer.javaKelas helper yang memuat properti dan mengatur jalur JAAS

Langkah 1: Buat file konfigurasi Log4j

Buat file bernama log4j.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, STDOUT

log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n

Langkah 2: Unduh sertifikat root SSL

Unduh sertifikat root SSL dan simpan ke lokasi yang dapat diakses oleh aplikasi Anda. Catat jalur mutlaknya—Anda akan menggunakannya sebagai <truststore-path>.

Peringatan

Di lingkungan produksi, gunakan jalur mutlak lengkap ke file truststore. Jangan masukkan file tersebut ke dalam JAR.

Langkah 3: Buat file konfigurasi JAAS

Buat file bernama kafka_client_jaas.conf:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<username>"
  password="<password>";
};
Catatan
  • Jika fitur daftar kontrol akses (ACL) dinonaktifkan untuk instans, temukan kredensial pengguna Simple Authentication and Security Layer (SASL) default di halaman Instance Details di Konsol ApsaraMQ for Kafka.

  • Jika ACL diaktifkan, pastikan pengguna SASL bertipe PLAIN dan memiliki izin untuk memproduksi serta mengonsumsi pesan. Untuk detailnya, lihat Grant permissions to SASL users.

Langkah 4: Buat file properti Kafka

Buat kafka.properties:

## Titik akhir SSL (dari Konsol ApsaraMQ for Kafka)
bootstrap.servers=<bootstrap-servers>
## Nama topik (dibuat di Konsol ApsaraMQ for Kafka)
topic=<topic-name>
## ID kelompok konsumen (dibuat di Konsol ApsaraMQ for Kafka)
group.id=<group-id>
## Jalur mutlak ke sertifikat root SSL
ssl.truststore.location=<truststore-path>
## Jalur mutlak ke file konfigurasi JAAS
java.security.auth.login.config=<jaas-conf-path>

Langkah 5: Buat pemuat konfigurasi

Buat JavaKafkaConfigurer.java untuk memuat file properti dan mengatur jalur konfigurasi JAAS.

import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    public static void configureSasl() {
        // Lewati jika jalur konfigurasi JAAS sudah diatur melalui flag -D atau metode lain
        if (null == System.getProperty("java.security.auth.login.config")) {
            System.setProperty("java.security.auth.login.config",
                getKafkaProperties().getProperty("java.security.auth.login.config"));
        }
    }

    public synchronized static Properties getKafkaProperties() {
        if (null != properties) {
            return properties;
        }
        Properties kafkaProperties = new Properties();
        try {
            kafkaProperties.load(
                KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        properties = kafkaProperties;
        return kafkaProperties;
    }
}

Referensi properti SSL dan SASL

Properti berikut digunakan bersama oleh contoh produsen dan konsumen. Properti ini mengonfigurasi transport SSL dan otentikasi SASL/PLAIN.

PropertiNilaiDeskripsi
security.protocolSASL_SSLEnkripsi traffic dengan TLS dan otentikasi dengan SASL
sasl.mechanismPLAINGunakan mekanisme otentikasi PLAIN
ssl.truststore.location<truststore-path>Jalur ke sertifikat root SSL (file .jks)
ssl.truststore.passwordKafkaOnsClientPassword truststore default
ssl.endpoint.identification.algorithm(string kosong)Nonaktifkan verifikasi hostname

Produksi pesan

Buat KafkaProducerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaProducerDemo {

    public static void main(String args[]) {
        // Muat konfigurasi JAAS
        JavaKafkaConfigurer.configureSasl();

        // Muat kafka.properties
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- Otentikasi SSL + SASL/PLAIN ---
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Nonaktifkan verifikasi hostname
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Pengaturan produsen ---
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        // Buat produsen thread-safe (satu per proses biasanya cukup;
        // untuk throughput lebih tinggi, buat hingga 5)
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Topik untuk mengirim pesan
        String topic = kafkaProperties.getProperty("topic");
        String value = "this is the message's value";

        try {
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> kafkaMessage =
                    new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}

Kompilasi dan jalankan KafkaProducerDemo.java untuk mengirim pesan.

Konsumsi pesan

Konsumen tunggal

Buat KafkaConsumerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaConsumerDemo {

    public static void main(String args[]) {
        // Muat konfigurasi JAAS
        JavaKafkaConfigurer.configureSasl();

        // Muat kafka.properties
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- Otentikasi SSL + SASL/PLAIN ---
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Nonaktifkan verifikasi hostname
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Pengaturan konsumen ---
        // Timeout sesi (default: 30 detik). Jika tidak ada heartbeat dalam interval ini,
        // broker menghapus konsumen dari grup dan memicu penyeimbangan ulang.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Byte maksimum yang diambil per partisi dan per permintaan.
        // Sesuaikan nilai ini untuk koneksi Internet guna mengontrol bandwidth.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        // Jumlah maksimum catatan yang dikembalikan per poll.
        // Pertahankan nilai ini cukup rendah agar semua catatan diproses sebelum poll berikutnya;
        // jika tidak, broker akan memicu penyeimbangan ulang.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaProperties.getProperty("group.id"));

        // Buat instans konsumen
        KafkaConsumer<String, String> consumer =
            new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);

        // Berlangganan satu atau beberapa topik
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Loop polling
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Proses semua catatan sebelum polling berikutnya.
                // Untuk throughput lebih baik, alihkan pemrosesan ke kolam thread terpisah.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(
                        String.format("Consume partition:%d offset:%d",
                            record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

Kompilasi dan jalankan KafkaConsumerDemo.java untuk mengonsumsi pesan.

Konsumen ganda

Untuk meningkatkan throughput, jalankan beberapa thread konsumen dalam satu proses yang sama. Jumlah total konsumen di seluruh proses tidak boleh melebihi jumlah partisi pada topik yang berlangganan.

Buat KafkaMultiConsumerDemo.java:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        // Muat konfigurasi JAAS
        JavaKafkaConfigurer.configureSasl();

        // Muat kafka.properties
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        // --- Otentikasi SSL + SASL/PLAIN ---
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getProperty("bootstrap.servers"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            kafkaProperties.getProperty("ssl.truststore.location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Nonaktifkan verifikasi hostname
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // --- Pengaturan konsumen ---
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaProperties.getProperty("group.id"));

        // Mulai dua thread konsumen
        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            List<String> subscribedTopics = new ArrayList<String>();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(
                                String.format("Thread:%s Consume partition:%d offset:%d",
                                    Thread.currentThread().getName(),
                                    record.partition(), record.offset()));
                        }
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {
                        }
                        e.printStackTrace();
                    }
                }
            } catch (WakeupException e) {
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

Kompilasi dan jalankan KafkaMultiConsumerDemo.java untuk mengonsumsi pesan dengan beberapa thread.

Verifikasi hasil

Setelah menjalankan produsen dan konsumen, periksa output konsol.

Produsen — output sukses terlihat seperti:

Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@0
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@1
...

Konsumen — output sukses terlihat seperti:

Consume partition:0 offset:0
Consume partition:0 offset:1
...

Jika salah satu program melemparkan exception, lihat Troubleshoot ApsaraMQ for Kafka client errors.