Untuk memproduksi dan mengonsumsi pesan pada instans ApsaraMQ for Kafka melalui Internet, hubungkan ke titik akhir SSL dengan otentikasi SASL/PLAIN. Mekanisme PLAIN mentransmisikan kredensial dalam bentuk teks biasa, sehingga selalu gunakan bersama enkripsi SSL (SASL_SSL) guna melindungi kredensial selama transit.
Semua contoh dalam panduan ini menggunakan Java SDK.
Placeholder
Kumpulkan nilai-nilai berikut sebelum memulai. Ganti placeholder ini di semua contoh konfigurasi dan kode.
| Placeholder | Deskripsi | Cara menemukannya |
|---|---|---|
<bootstrap-servers> | Titik akhir SSL instans ApsaraMQ for Kafka Anda | Instance Details di Konsol ApsaraMQ for Kafka |
<topic-name> | Nama topik | Topics di Konsol ApsaraMQ for Kafka |
<group-id> | ID kelompok konsumen | Groups di Konsol ApsaraMQ for Kafka |
<truststore-path> | Jalur mutlak ke sertifikat root SSL di mesin Anda | Lihat Langkah 2: Unduh sertifikat root SSL |
<jaas-conf-path> | Jalur mutlak ke file konfigurasi JAAS | Lihat Langkah 3: Buat file konfigurasi JAAS |
<username> | Username SASL | Instance Details (ACL dinonaktifkan) atau kredensial pengguna SASL Anda (ACL diaktifkan) |
<password> | Password SASL | Sama seperti username |
Prasyarat
Instans ApsaraMQ for Kafka, topik, dan kelompok konsumen. Untuk detailnya, lihat Langkah 3: Buat sumber daya.
Tambahkan dependensi Java
Tambahkan dependensi berikut ke file pom.xml Anda:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>Sesuaikan versi utama kafka-clients dengan versi instans ApsaraMQ for Kafka Anda. Temukan versi instans di halaman Instance Details di Konsol ApsaraMQ for Kafka.
Persiapkan file konfigurasi
Buat file-file berikut sebelum menulis kode produsen dan konsumen.
| File | Tujuan |
|---|---|
log4j.properties | Konfigurasi output log |
Sertifikat root SSL (.jks) | Anchor kepercayaan TLS untuk koneksi broker |
kafka_client_jaas.conf | Kredensial SASL/PLAIN |
kafka.properties | Titik akhir broker, topik, kelompok konsumen, dan jalur file |
JavaKafkaConfigurer.java | Kelas helper yang memuat properti dan mengatur jalur JAAS |
Langkah 1: Buat file konfigurasi Log4j
Buat file bernama log4j.properties:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, STDOUT
log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%nLangkah 2: Unduh sertifikat root SSL
Unduh sertifikat root SSL dan simpan ke lokasi yang dapat diakses oleh aplikasi Anda. Catat jalur mutlaknya—Anda akan menggunakannya sebagai <truststore-path>.
Di lingkungan produksi, gunakan jalur mutlak lengkap ke file truststore. Jangan masukkan file tersebut ke dalam JAR.
Langkah 3: Buat file konfigurasi JAAS
Buat file bernama kafka_client_jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<username>"
password="<password>";
};Jika fitur daftar kontrol akses (ACL) dinonaktifkan untuk instans, temukan kredensial pengguna Simple Authentication and Security Layer (SASL) default di halaman Instance Details di Konsol ApsaraMQ for Kafka.
Jika ACL diaktifkan, pastikan pengguna SASL bertipe PLAIN dan memiliki izin untuk memproduksi serta mengonsumsi pesan. Untuk detailnya, lihat Grant permissions to SASL users.
Langkah 4: Buat file properti Kafka
Buat kafka.properties:
## Titik akhir SSL (dari Konsol ApsaraMQ for Kafka)
bootstrap.servers=<bootstrap-servers>
## Nama topik (dibuat di Konsol ApsaraMQ for Kafka)
topic=<topic-name>
## ID kelompok konsumen (dibuat di Konsol ApsaraMQ for Kafka)
group.id=<group-id>
## Jalur mutlak ke sertifikat root SSL
ssl.truststore.location=<truststore-path>
## Jalur mutlak ke file konfigurasi JAAS
java.security.auth.login.config=<jaas-conf-path>Langkah 5: Buat pemuat konfigurasi
Buat JavaKafkaConfigurer.java untuk memuat file properti dan mengatur jalur konfigurasi JAAS.
import java.util.Properties;
public class JavaKafkaConfigurer {
private static Properties properties;
public static void configureSasl() {
// Lewati jika jalur konfigurasi JAAS sudah diatur melalui flag -D atau metode lain
if (null == System.getProperty("java.security.auth.login.config")) {
System.setProperty("java.security.auth.login.config",
getKafkaProperties().getProperty("java.security.auth.login.config"));
}
}
public synchronized static Properties getKafkaProperties() {
if (null != properties) {
return properties;
}
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(
KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
e.printStackTrace();
}
properties = kafkaProperties;
return kafkaProperties;
}
}Referensi properti SSL dan SASL
Properti berikut digunakan bersama oleh contoh produsen dan konsumen. Properti ini mengonfigurasi transport SSL dan otentikasi SASL/PLAIN.
| Properti | Nilai | Deskripsi |
|---|---|---|
security.protocol | SASL_SSL | Enkripsi traffic dengan TLS dan otentikasi dengan SASL |
sasl.mechanism | PLAIN | Gunakan mekanisme otentikasi PLAIN |
ssl.truststore.location | <truststore-path> | Jalur ke sertifikat root SSL (file .jks) |
ssl.truststore.password | KafkaOnsClient | Password truststore default |
ssl.endpoint.identification.algorithm | (string kosong) | Nonaktifkan verifikasi hostname |
Produksi pesan
Buat KafkaProducerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaProducerDemo {
public static void main(String args[]) {
// Muat konfigurasi JAAS
JavaKafkaConfigurer.configureSasl();
// Muat kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- Otentikasi SSL + SASL/PLAIN ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Nonaktifkan verifikasi hostname
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Pengaturan produsen ---
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
// Buat produsen thread-safe (satu per proses biasanya cukup;
// untuk throughput lebih tinggi, buat hingga 5)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// Topik untuk mengirim pesan
String topic = kafkaProperties.getProperty("topic");
String value = "this is the message's value";
try {
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> kafkaMessage =
new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
System.out.println("error occurred");
e.printStackTrace();
}
}
}Kompilasi dan jalankan KafkaProducerDemo.java untuk mengirim pesan.
Konsumsi pesan
Konsumen tunggal
Buat KafkaConsumerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerDemo {
public static void main(String args[]) {
// Muat konfigurasi JAAS
JavaKafkaConfigurer.configureSasl();
// Muat kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- Otentikasi SSL + SASL/PLAIN ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Nonaktifkan verifikasi hostname
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Pengaturan konsumen ---
// Timeout sesi (default: 30 detik). Jika tidak ada heartbeat dalam interval ini,
// broker menghapus konsumen dari grup dan memicu penyeimbangan ulang.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// Byte maksimum yang diambil per partisi dan per permintaan.
// Sesuaikan nilai ini untuk koneksi Internet guna mengontrol bandwidth.
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
// Jumlah maksimum catatan yang dikembalikan per poll.
// Pertahankan nilai ini cukup rendah agar semua catatan diproses sebelum poll berikutnya;
// jika tidak, broker akan memicu penyeimbangan ulang.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// Buat instans konsumen
KafkaConsumer<String, String> consumer =
new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
// Berlangganan satu atau beberapa topik
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
// Loop polling
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// Proses semua catatan sebelum polling berikutnya.
// Untuk throughput lebih baik, alihkan pemrosesan ke kolam thread terpisah.
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Consume partition:%d offset:%d",
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}Kompilasi dan jalankan KafkaConsumerDemo.java untuk mengonsumsi pesan.
Konsumen ganda
Untuk meningkatkan throughput, jalankan beberapa thread konsumen dalam satu proses yang sama. Jumlah total konsumen di seluruh proses tidak boleh melebihi jumlah partisi pada topik yang berlangganan.
Buat KafkaMultiConsumerDemo.java:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;
public class KafkaMultiConsumerDemo {
public static void main(String args[]) throws InterruptedException {
// Muat konfigurasi JAAS
JavaKafkaConfigurer.configureSasl();
// Muat kafka.properties
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- Otentikasi SSL + SASL/PLAIN ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Nonaktifkan verifikasi hostname
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- Pengaturan konsumen ---
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// Mulai dua thread konsumen
int consumerNum = 2;
Thread[] consumerThreads = new Thread[consumerNum];
for (int i = 0; i < consumerNum; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
consumerThreads[i] = new Thread(kafkaConsumerRunner);
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].start();
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].join();
}
}
static class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Thread:%s Consume partition:%d offset:%d",
Thread.currentThread().getName(),
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
} catch (WakeupException e) {
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}Kompilasi dan jalankan KafkaMultiConsumerDemo.java untuk mengonsumsi pesan dengan beberapa thread.
Verifikasi hasil
Setelah menjalankan produsen dan konsumen, periksa output konsol.
Produsen — output sukses terlihat seperti:
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@0
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@1
...Konsumen — output sukses terlihat seperti:
Consume partition:0 offset:0
Consume partition:0 offset:1
...Jika salah satu program melemparkan exception, lihat Troubleshoot ApsaraMQ for Kafka client errors.