Topik ini menjelaskan cara menggunakan SDK ApsaraMQ for Kafka untuk mengakses titik akhir Secure Sockets Layer (SSL) dari instance ApsaraMQ for Kafka melalui Internet dan menggunakan mekanisme PLAIN untuk mengirim serta berlangganan pesan. Contoh ini menggunakan SDK Java.
Prasyarat
JDK 1.8 atau yang lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Java SE Downloads.
Maven 2.5 atau yang lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Download Apache Maven.
Instal dependensi Java
Tambahkan dependensi berikut ke file pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>Disarankan agar versi klien Anda sesuai dengan versi utama instance ApsaraMQ for Kafka. Versi utama instance ApsaraMQ for Kafka dapat dilihat di halaman Instance Details pada konsol ApsaraMQ for Kafka.
Persiapan
Buat file konfigurasi Log4j bernama log4j.properties.
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. log4j.rootLogger=INFO, STDOUT log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%nBuat file konfigurasi JAAS bernama kafka_client_jaas.conf.
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx"; };CatatanJika fitur daftar kontrol akses (ACL) dinonaktifkan untuk instance, nama pengguna dan kata sandi pengguna Simple Authentication and Security Layer (SASL) default dapat diperoleh di halaman Instance Details pada konsol ApsaraMQ for Kafka.
Jika ACL diaktifkan untuk instance, pastikan bahwa pengguna SASL yang digunakan adalah tipe PLAIN dan memiliki otorisasi untuk mengirim dan mengonsumsi pesan. Untuk informasi lebih lanjut, lihat Memberikan Izin kepada Pengguna SASL.
Buat file konfigurasi ApsaraMQ for Kafka bernama kafka.properties.
## Tentukan titik akhir SSL, yang dapat diperoleh di konsol Message Queue for Apache Kafka. bootstrap.servers=xxxx ## Tentukan topik, yang dibuat di konsol Message Queue for Apache Kafka. topic=xxxx ## Tentukan grup konsumen, yang dibuat di konsol Message Queue for Apache Kafka.Group group.id=xxxx ## Sertifikat root SSL. ssl.truststore.location=/xxxx/kafka.client.truststore.jks ## File konfigurasi JAAS. java.security.auth.login.config=/xxxx/kafka_client_jaas.confBuat program bernama JavaKafkaConfigurer.java untuk memuat file konfigurasi.
import java.util.Properties; public class JavaKafkaConfigurer { private static Properties properties; public static void configureSasl() { // Jika Anda telah menggunakan parameter -D atau metode lain untuk menetapkan jalur, jangan tetapkan lagi di bagian ini. if (null == System.getProperty("java.security.auth.login.config")) { // Ganti XXX dengan jalur sebenarnya. // Pastikan jalur dapat dibaca oleh sistem file. Jangan kompres file konfigurasi ke dalam paket JAR. System.setProperty("java.security.auth.login.config", getKafkaProperties().getProperty("java.security.auth.login.config")); } } public synchronized static Properties getKafkaProperties() { if (null != properties) { return properties; } // Dapatkan isi file kafka.properties. Properties kafkaProperties = new Properties(); try { kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties")); } catch (Exception e) { // Jika file tidak dapat dimuat, keluar dari program. e.printStackTrace(); } properties = kafkaProperties; return kafkaProperties; } }
Mengirim pesan
Buat program produser bernama KafkaProducerDemo.java yang berisi kode berikut:
import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; public class KafkaProducerDemo { public static void main(String args[]) { // Tentukan jalur file konfigurasi JAAS. JavaKafkaConfigurer.configureSasl(); // Muat file kafka.properties. Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties(); Properties props = new Properties(); // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers")); // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. // Jangan kompres file sertifikat ke dalam paket JAR. props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location")); // Kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); // Tentukan metode autentikasi SASL. Gunakan nilai default. props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // Tetapkan metode untuk mendeserialisasi pesan Message Queue for Apache Kafka. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Tetapkan waktu maksimum untuk menunggu permintaan. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000); // Tetapkan jumlah maksimum percobaan ulang yang diizinkan untuk klien. props.put(ProducerConfig.RETRIES_CONFIG, 5); // Tetapkan interval antara dua percobaan ulang berturut-turut untuk klien. props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000); // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); // Buat objek produser thread-safe. Satu objek produser dapat melayani satu proses. // Untuk meningkatkan kinerja, Anda dapat membuat beberapa objek. Kami menyarankan Anda membuat tidak lebih dari lima objek. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); // Buat pesan Message Queue for Apache Kafka. String topic = kafkaProperties.getProperty("topic"); // Topik pesan. Masukkan topik yang Anda buat di konsol Message Queue for Apache Kafka. String value = "ini adalah nilai pesan"; // Isi pesan. try { // Untuk meningkatkan efisiensi, peroleh beberapa objek future sekaligus. Jangan peroleh sejumlah besar objek future sekaligus. List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128); for (int i =0; i < 100; i++) { // Kirim pesan dan peroleh objek future. ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i); Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage); futures.add(metadataFuture); } producer.flush(); for (Future<RecordMetadata> future: futures) { // Sinkronkan objek future. try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { t.printStackTrace(); } } } catch (Exception e) { // Jika pesan masih gagal dikirim setelah percobaan ulang, lakukan pemecahan masalah kesalahan. System.out.println("terjadi kesalahan"); e.printStackTrace(); } } }Kompilasi dan jalankan KafkaProducerDemo.java untuk mengirim pesan.
Mengonsumsi pesan
Anda dapat mengonsumsi pesan menggunakan salah satu metode berikut:
Menggunakan satu konsumen untuk mengonsumsi pesan
Buat program konsumen tunggal bernama KafkaConsumerDemo.java yang berisi kode berikut:
import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; public class KafkaConsumerDemo { public static void main(String args[]) { // Tentukan jalur file konfigurasi JAAS. JavaKafkaConfigurer.configureSasl(); // Muat file kafka.properties. Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties(); Properties props = new Properties(); // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers")); // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. // Jangan kompres file sertifikat ke dalam paket JAR. props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location")); // Tentukan kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); // Tentukan metode autentikasi SASL. Gunakan nilai default. props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // Tetapkan interval maksimum antara dua siklus polling berturut-turut. // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan pesan denyut jantung dalam interval tersebut, broker menentukan bahwa konsumen tidak hidup. Dalam hal ini, broker menghapus konsumen dari grup konsumen dan memicu penyeimbangan beban.Group props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // Tetapkan ukuran maksimum pesan yang diizinkan untuk operasi poll tunggal. Parameter ini memiliki efek signifikan jika data ditransmisikan melalui Internet. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); // Tetapkan jumlah maksimum pesan yang dapat dipolling sekaligus. // Jangan tetapkan parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semua dikonsumsi sebelum siklus polling berikutnya dimulai, penyeimbangan beban dipicu dan kinerja mungkin menurun. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); // Tetapkan metode untuk mendeserialisasi pesan. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // Tetapkan grup konsumen dari konsumen saat ini. Anda harus membuat grup konsumen di konsol Message Queue for Apache Kafka. // Konsumen dalam grup konsumen mengonsumsi pesan dalam mode penyeimbangan beban. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id")); // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); // Buat objek konsumen. KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props); // Tentukan satu atau beberapa topik yang ingin didaftarkan oleh grup konsumen. // Kami menyarankan Anda mengonfigurasi konsumen dengan nilai GROUP_ID_CONFIG yang sama untuk mendaftarkan topik yang sama. List<String> subscribedTopics = new ArrayList<String>(); // Jika Anda ingin mendaftarkan beberapa topik, tambahkan topik-topik tersebut di sini. // Anda harus membuat topik-topik tersebut di konsol Message Queue for Apache Kafka terlebih dahulu. subscribedTopics.add(kafkaProperties.getProperty("topic")); consumer.subscribe(subscribedTopics); // Konsumsi pesan dalam loop. while (true){ try { ConsumerRecords<String, String> records = consumer.poll(1000); // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. // Kami menyarankan Anda membuat pool thread terpisah untuk mengonsumsi pesan dan kemudian secara asinkron mengembalikan hasilnya. for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("Konsumsi partisi:%d offset:%d", record.partition(), record.offset())); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } } }Kompilasi dan jalankan KafkaConsumerDemo.java untuk mengonsumsi pesan.
Menggunakan beberapa konsumen untuk mengonsumsi pesan
Buat program multi-konsumen bernama KafkaMultiConsumerDemo.java yang berisi kode berikut:
import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.WakeupException; /** * Tutorial ini menunjukkan cara menggunakan beberapa konsumen untuk secara bersamaan mengonsumsi pesan dalam satu proses. * Pastikan bahwa jumlah total konsumen di lingkungan tidak melebihi jumlah partisi dari topik yang disubskripsikan oleh konsumen. */ public class KafkaMultiConsumerDemo { public static void main(String args[]) throws InterruptedException { // Tentukan jalur file konfigurasi JAAS. JavaKafkaConfigurer.configureSasl(); // Muat file kafka.properties. Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties(); Properties props = new Properties(); // Tentukan titik akhir. Dapatkan titik akhir SASL dari instance yang sesuai di konsol Message Queue for Apache Kafka. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers")); // Tentukan jalur sertifikat root SSL. Ganti XXX dengan jalur sebenarnya. // Jangan kompres file sertifikat ke dalam paket JAR. props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location")); // Tentukan kata sandi truststore di penyimpanan sertifikat root. Gunakan nilai default. props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); // Tentukan protokol akses. Tetapkan nilainya menjadi SASL_SSL. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); // Tentukan metode autentikasi SASL. Gunakan nilai default. props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // Tetapkan interval maksimum antara dua siklus polling berturut-turut. // Interval default adalah 30 detik. Jika konsumen tidak mengembalikan pesan denyut jantung dalam interval tersebut, broker menentukan bahwa konsumen tidak hidup. Dalam hal ini, broker menghapus konsumen dari grup konsumen dan memicu penyeimbangan beban.Group props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // Tetapkan jumlah maksimum pesan yang dapat dipolling sekaligus. // Jangan tetapkan parameter ini ke nilai yang terlalu besar. Jika pesan yang dipolling tidak semua dikonsumsi sebelum siklus polling berikutnya dimulai, penyeimbangan beban dipicu dan kinerja mungkin menurun. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); // Tetapkan metode untuk mendeserialisasi pesan. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // Tetapkan grup konsumen dari konsumen saat ini. Anda harus membuat grup konsumen di konsol Message Queue for Apache Kafka. // Konsumen dalam grup konsumen mengonsumsi pesan dalam mode penyeimbangan beban. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id")); // Buat objek konsumen. // Tetapkan algoritma untuk verifikasi nama host menjadi nilai kosong. props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); int consumerNum = 2; Thread[] consumerThreads = new Thread[consumerNum]; for (int i = 0; i < consumerNum; i++) { KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); List<String> subscribedTopics = new ArrayList<String>(); subscribedTopics.add(kafkaProperties.getProperty("topic")); consumer.subscribe(subscribedTopics); KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer); consumerThreads[i] = new Thread(kafkaConsumerRunner); } for (int i = 0; i < consumerNum; i++) { consumerThreads[i].start(); } for (int i = 0; i < consumerNum; i++) { consumerThreads[i].join(); } } static class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; KafkaConsumerRunner(KafkaConsumer consumer) { this.consumer = consumer; } @Override public void run() { try { while (!closed.get()) { try { ConsumerRecords<String, String> records = consumer.poll(1000); // Semua pesan harus dikonsumsi sebelum siklus polling berikutnya dimulai. Durasi total tidak boleh melebihi interval yang ditentukan oleh SESSION_TIMEOUT_MS_CONFIG. for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("Thread:%s Konsumsi partisi:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset())); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } } catch (WakeupException e) { // Jika konsumen dimatikan, abaikan pengecualian. if (!closed.get()) { throw e; } } finally { consumer.close(); } } // Implementasikan hook shutdown yang dapat dipanggil oleh thread lain. public void shutdown() { closed.set(true); consumer.wakeup(); } } }Kompilasi dan jalankan KafkaMultiConsumerDemo.java untuk mengonsumsi pesan.