インターネット経由で ApsaraMQ for Kafka インスタンスのメッセージを送受信するには、SASL/PLAIN 認証を使用して SSL エンドポイントに接続します。PLAIN メカニズムは認証情報をクリアテキストで送信するため、転送中の認証情報を保護するために、常に SSL 暗号化 (SASL_SSL) と組み合わせて使用してください。
このガイドのすべての例では、Java SDK を使用します。
プレースホルダー
開始する前に、以下の値を取得してください。すべての設定およびコードサンプルで、これらのプレースホルダーを置き換えてください。
| プレースホルダー | 説明 | 取得場所 |
|---|---|---|
<bootstrap-servers> | ご利用の ApsaraMQ for Kafka インスタンスの SSL エンドポイント | [インスタンス詳細] ページ |
<topic-name> | トピック名 | [トピック] ページ |
<group-id> | コンシューマーグループ ID | [グループ] ページ(ApsaraMQ for Kafka コンソール内) |
<truststore-path> | ご利用のマシン上の SSL ルート証明書への絶対パス | ステップ 2: SSL ルート証明書のダウンロードをご参照ください |
<jaas-conf-path> | JAAS 設定ファイルへの絶対パス | ステップ 3: JAAS 設定ファイルの作成をご参照ください |
<username> | SASL ユーザー名 | [インスタンス詳細] ページ (ACL が無効な場合) またはご利用の SASL ユーザー認証情報 (ACL が有効な場合) |
<password> | SASL パスワード | ユーザー名と同じ |
前提条件
ApsaraMQ for Kafka インスタンス、トピック、およびコンシューマーグループ。詳細については、「ステップ 3: リソースの作成」をご参照ください。
Java 依存関係の追加
次の依存関係を pom.xml ファイルに追加します:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>ApsaraMQ for Kafka インスタンスのバージョンと kafka-clients のメジャーバージョンを一致させてください。インスタンスのバージョンは、ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページで確認できます。
設定ファイルの準備
プロデューサーとコンシューマーのコードを記述する前に、次のファイルを作成します。
| ファイル | 目的 |
|---|---|
log4j.properties | ログ出力設定 |
SSL ルート証明書 (.jks) | ブローカー接続用の TLS 信頼アンカー |
kafka_client_jaas.conf | SASL/PLAIN 認証情報 |
kafka.properties | ブローカーのエンドポイント、トピック、コンシューマーグループ、およびファイルパス |
JavaKafkaConfigurer.java | プロパティをロードし、JAAS パスを設定するヘルパークラス |
ステップ 1: Log4j 設定ファイルの作成
log4j.properties という名前のファイルを作成します:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, STDOUT
log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%nステップ 2: SSL ルート証明書のダウンロード
SSL ルート証明書をダウンロードし、アプリケーションからアクセス可能な場所に保存します。絶対パスを記録しておきます。このパスを <truststore-path> として使用します。
本番環境では、トラストストアファイルへの完全な絶対パスを使用してください。JAR 内にパッケージ化しないでください。
ステップ 3: JAAS 設定ファイルの作成
kafka_client_jaas.conf という名前のファイルを作成します:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<username>"
password="<password>";
};インスタンスでアクセス制御リスト (ACL) 機能が無効になっている場合、デフォルトの Simple Authentication and Security Layer (SASL) ユーザー認証情報は ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページで確認できます。
ACL が有効な場合は、SASL ユーザーが PLAIN タイプで、メッセージを生成および消費する権限を持っていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。
ステップ 4: Kafka プロパティファイルの作成
kafka.properties を作成します:
## SSL エンドポイント (ApsaraMQ for Kafka コンソールから取得)
bootstrap.servers=<bootstrap-servers>
## トピック名 (ApsaraMQ for Kafka コンソールで作成)
topic=<topic-name>
## コンシューマーグループ ID (ApsaraMQ for Kafka コンソールで作成)
group.id=<group-id>
## SSL ルート証明書への絶対パス
ssl.truststore.location=<truststore-path>
## JAAS 設定ファイルへの絶対パス
java.security.auth.login.config=<jaas-conf-path>ステップ 5: 設定ローダーの作成
プロパティファイルをロードし、JAAS 設定パスを設定するために JavaKafkaConfigurer.java を作成します。
import java.util.Properties;
public class JavaKafkaConfigurer {
private static Properties properties;
public static void configureSasl() {
// JAAS 設定パスが -D フラグや他のメソッドですでに設定されている場合はスキップします
if (null == System.getProperty("java.security.auth.login.config")) {
System.setProperty("java.security.auth.login.config",
getKafkaProperties().getProperty("java.security.auth.login.config"));
}
}
public synchronized static Properties getKafkaProperties() {
if (null != properties) {
return properties;
}
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(
KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
} catch (Exception e) {
e.printStackTrace();
}
properties = kafkaProperties;
return kafkaProperties;
}
}SSL および SASL プロパティリファレンス
以下のプロパティは、プロデューサーとコンシューマーの両方の例で共有されます。これらのプロパティは、SSL トランスポートと SASL/PLAIN 認証を設定します。
| プロパティ | 値 | 説明 |
|---|---|---|
security.protocol | SASL_SSL | TLS でトラフィックを暗号化し、SASL で認証します |
sasl.mechanism | PLAIN | PLAIN 認証メカニズムを使用します |
ssl.truststore.location | <truststore-path> | SSL ルート証明書 (.jks ファイル) へのパス |
ssl.truststore.password | KafkaOnsClient | デフォルトのトラストストアパスワード |
ssl.endpoint.identification.algorithm | (空の文字列) | ホスト名検証を無効にします |
メッセージの生成
KafkaProducerDemo.java を作成します:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaProducerDemo {
public static void main(String args[]) {
// JAAS 設定のロード
JavaKafkaConfigurer.configureSasl();
// kafka.properties のロード
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN 認証 ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// ホスト名検証の無効化
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- プロデューサー設定 ---
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
// スレッドセーフなプロデューサーを作成します (通常はプロセスごとに 1 つで十分ですが、スループットを向上させるには最大 5 つ作成します)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// メッセージの送信先トピック
String topic = kafkaProperties.getProperty("topic");
String value = "this is the message's value";
try {
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> kafkaMessage =
new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
System.out.println("error occurred");
e.printStackTrace();
}
}
}KafkaProducerDemo.java をコンパイルして実行し、メッセージを送信します。
メッセージの消費
単一コンシューマー
KafkaConsumerDemo.java を作成します:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerDemo {
public static void main(String args[]) {
// JAAS 設定のロード
JavaKafkaConfigurer.configureSasl();
// kafka.properties のロード
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN 認証 ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// ホスト名検証の無効化
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- コンシューマー設定 ---
// セッションタイムアウト (デフォルト: 30 秒)。この間隔内にハートビートが受信されない場合、
// ブローカーはコンシューマーをグループから削除し、リバランスをトリガーします。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// パーティションごと、およびリクエストごとにフェッチされる最大バイト数。
// インターネット接続の場合は、これらの値を調整して帯域幅を制御します。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
// 1 回のポーリングで返されるレコードの最大数。
// 次のポーリングの前にすべてのレコードを処理できる程度に低く保ちます。
// そうしないと、ブローカーがリバランスをトリガーします。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// コンシューマーインスタンスの作成
KafkaConsumer<String, String> consumer =
new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
// 1 つ以上のトピックをサブスクライブします
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
// ポーリングループ
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// 次のポーリングの前にすべてのレコードを処理します。
// スループットを向上させるには、処理を別のスレッドプールにオフロードします。
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("Consume partition:%d offset:%d",
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}KafkaConsumerDemo.java をコンパイルして実行し、メッセージを消費します。
複数コンシューマー
スループットを向上させるには、同じプロセス内で複数のコンシューマースレッドを実行します。すべてのプロセスにわたるコンシューマーの総数は、サブスクライブしているトピックのパーティション数を超えてはなりません。
KafkaMultiConsumerDemo.java を作成します:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.WakeupException;
public class KafkaMultiConsumerDemo {
public static void main(String args[]) throws InterruptedException {
// JAAS 設定のロード
JavaKafkaConfigurer.configureSasl();
// kafka.properties のロード
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// --- SSL + SASL/PLAIN 認証 ---
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaProperties.getProperty("bootstrap.servers"));
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// ホスト名検証の無効化
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// --- コンシューマー設定 ---
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaProperties.getProperty("group.id"));
// 2 つのコンシューマースレッドを開始
int consumerNum = 2;
Thread[] consumerThreads = new Thread[consumerNum];
for (int i = 0; i < consumerNum; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
consumerThreads[i] = new Thread(kafkaConsumerRunner);
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].start();
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].join();
}
}
static class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
String.format("スレッド:%s パーティション:%d オフセット:%d を消費",
Thread.currentThread().getName(),
record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
} catch (WakeupException e) {
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}KafkaMultiConsumerDemo.java をコンパイルして実行し、複数のスレッドでメッセージを消費します。
結果の確認
プロデューサーとコンシューマーを実行した後、コンソール出力を確認します。
プロデューサー -- 成功した場合の出力は次のようになります:
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@0
Produce ok:send-and-subscribe-to-messages-by-using-an-ssl-endpoint-with-plain-authentication-0@1
...コンシューマー -- 成功した場合の出力は次のようになります:
Consume partition:0 offset:0
Consume partition:0 offset:1
...いずれかのプログラムで例外がスローされた場合は、「ApsaraMQ for Kafka クライアントエラーのトラブルシューティング」をご参照ください。