アプリケーションを ApsaraMQ for Kafka インスタンスに接続して、インスタンスのエンドポイントを使用してメッセージを送受信できます。ApsaraMQ for Kafka は、接続とセキュリティの要件を満たすために、インスタンスにデフォルトのエンドポイント、Security Sockets Layer (SSL) エンドポイント、および Simple Authentication and Secure Layer (SASL) エンドポイントを提供します。
前提条件
JDK 1.8 以降がインストールされていること。詳細については、「Java Downloads」をご参照ください。
Maven 2.5 以降がインストールされていること。詳細については、「Downloading Apache Maven」をご参照ください。
コンパイラがインストールされていること。
このトピックでは、IntelliJ IDEA Ultimate を使用します。
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。
Virtual Private Cloud (VPC) 接続インスタンス: デフォルトでは、デフォルトのエンドポイントのみが表示されます。このタイプのインスタンスには VPC 内でのみアクセスできます。
インターネットおよび VPC 接続インスタンス: デフォルトでは、デフォルトのエンドポイントと SSL エンドポイントが表示されます。このタイプのインスタンスには、インターネット経由または VPC 内でアクセスできます。
説明インスタンス構成のアップグレードまたはダウングレードによって、VPC 接続インスタンスとインターネットおよび VPC 接続インスタンスを切り替えることができます。詳細については、「インスタンス構成のアップグレードとダウングレード」をご参照ください。
デフォルトでは、インスタンスに対して SASL エンドポイントは有効になっていません。そのため、SASL エンドポイントは表示されません。SASL エンドポイントを使用する場合は、手動で有効にする必要があります。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。
エンドポイントが使用されるシナリオの詳細については、「エンドポイント間の比較」をご参照ください。
Java の依存関係をインストールする
次のサンプルコードは、SDK for Java を使用して ApsaraMQ for Kafka インスタンスに接続する際に必要な依存関係の例を示しています。依存関係は kafka-java-demo フォルダの pom.xml ファイルに組み込まれています。依存関係を手動でインストールする必要はありません。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>クライアントのバージョンは、ApsaraMQ for Kafka インスタンスのメジャーバージョンと一致させることを推奨します。ApsaraMQ for Kafka インスタンスのメジャーバージョンは、ApsaraMQ for Kafka コンソールの インスタンスの詳細 ページで確認できます。
構成ファイルの準備
(オプション) SSL ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。
aliware-kafka-demos ページに移動し、
をクリックしてデモプロジェクトをオンプレミスマシンにダウンロードし、デモプロジェクトのパッケージを解凍します。解凍したデモプロジェクトで、kafka-java-demo フォルダを見つけて、そのフォルダを IntelliJ IDEA にインポートします。
(オプション) SSL エンドポイントまたは SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスにアクセスする場合は、kafka_client_jaas.conf 構成ファイルを変更する必要があります。エンドポイント間の違いの詳細については、「エンドポイント間の比較」をご参照ください。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx"; };ApsaraMQ for Kafka インスタンスが VPC 接続インスタンスの場合、同じ VPC にデプロイされたリソースのみがインスタンスにアクセスできます。これにより、データ伝送のセキュリティとプライバシーが確保されます。高いセキュリティが要求されるシナリオでは、アクセス制御リスト (ACL) 機能を有効にできます。この機能を有効にすると、SASL の身分認証が成功した後にのみ、メッセージがセキュアチャネルで送信されます。セキュリティ保護に関するビジネス要件に基づいて、身分認証に PLAIN または SCRAM メカニズムを選択できます。詳細については、「ACL 機能を有効にする」をご参照ください。
ApsaraMQ for Kafka インスタンスがインターネットおよび VPC 接続インスタンスの場合、インターネット経由で送信されるメッセージは認証および暗号化される必要があります。SASL の PLAIN メカニズムは SSL と一緒に使用して、メッセージが暗号化されずにプレーンテキストで送信されないようにする必要があります。
このトピックでは、username および password パラメーターの値は、インスタンスの SASL ユーザー名とパスワードです。
インスタンスでインターネットアクセスを有効にしているが ACL を有効にしていない場合、ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページの 設定情報 セクションで、デフォルトユーザーのユーザー名とパスワードを取得できます。
インスタンスで ACL を有効にする場合は、使用する SASL ユーザーが PLAIN タイプであり、メッセージの送受信に必要な権限が付与されていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。
kafka.properties 構成ファイルを変更します。
##==============================共通パラメーター============================== bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx topic=xxx group.id=xxx ##=======================次のパラメーターを実際の値に設定します。======================== ## SSL エンドポイント。 ssl.truststore.location=/xxxx/only.4096.client.truststore.jks ##ssl.truststore.password は KafkaOnsClient である必要があり、変更できません ssl.truststore.password=KafkaOnsClient ##ホスト名の検証、空のままにしてください、変更する必要はありません ssl.endpoint.identification.algorithm= java.security.auth.login.config=/xxxx/kafka_client_jaas.conf ## SASL エンドポイントの PLAIN メカニズム。 java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf ## SASL エンドポイントの SCRAM メカニズム。 java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.confパラメーター
説明
bootstrap.servers
エンドポイント情報。ApsaraMQ for Kafka コンソールの インスタンスの詳細 ページの アクセスポイント情報 セクションでエンドポイントを取得できます。
topic
インスタンス上の Topic の名前です。Topic 名は、ApsaraMQ for Kafka コンソールの トピック管理 ページで取得できます。
group.id
インスタンス上のグループの ID です。グループ ID は、ApsaraMQ for Kafka コンソールのGroup の管理 ページで取得できます。
説明クライアントが producer.go を実行してメッセージを送信する場合、このパラメーターはオプションです。クライアントが consumer.go を実行してメッセージをサブスクライブする場合、このパラメーターは必須です。
ssl.truststore.location
SSL ルート証明書が保存されるパス。「構成ファイルの準備」セクションでダウンロードした SSL 証明書ファイルをローカルパスに保存し、サンプルコードの xxxx をそのローカルパスに置き換える必要があります。例: /home/ssl/only.4096.client.truststore.jks。
重要デフォルトのエンドポイントまたは SASL エンドポイントを使用してインスタンスにアクセスする場合、このパラメーターは不要です。SSL エンドポイントを使用してインスタンスにアクセスする場合、このパラメーターは必須です。
ssl.truststore.password
サーバー証明書のパスワード。KafkaOnsClient に固定されています。変更できません。
ssl.endpoint.identification.algorithm
ホスト名の検証。空に設定します。変更できません。
java.security.auth.login.config
JAAS 構成ファイルが保存されるパス。デモプロジェクトの kafka_client_jaas.conf ファイルをローカルパスに保存し、サンプルコードの xxxx をそのローカルパスに置き換える必要があります。例: /home/ssl/kafka_client_jaas.conf。
重要デフォルトのエンドポイントを使用してインスタンスにアクセスする場合、このパラメーターは不要です。SSL エンドポイントまたは SASL エンドポイントを使用してインスタンスにアクセスする場合、このパラメーターは必須です。
メッセージを送信する
次のサンプルコードは、KafkaProducerDemo.java をコンパイルして実行し、メッセージを送信する方法の例を示しています。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
// SSL エンドポイントまたは SASL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の行をコメントアウトします。
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
* SSL エンドポイントまたは SASL エンドポイントを使用してインスタンスにアクセスする場合は、次の 2 行のコードのコメントを解除します。
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
public class KafkaProducerDemo {
public static void main(String args[]) {
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JAAS 構成ファイルのパスを指定します。
JavaKafkaConfigurer.configureSasl();
*/
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JAAS 構成ファイルのパスを指定します。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JAAS 構成ファイルのパスを指定します。
JavaKafkaConfigurer.configureSaslScram();
*/
// kafka.properties ファイルをロードします。
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// エンドポイントを指定します。Topic へのアクセスに使用するエンドポイントは、ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページで取得できます。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の 4 行のコードのコメントを解除します。
* ファイルを JAR パッケージに圧縮しないでください。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* ルート証明書のトラストストアのパスワード。デフォルト値を使用します。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* アクセスプロトコル。このパラメーターを SASL_SSL に設定します。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL 認証方式。デフォルト値を使用します。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の 2 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* PLAIN メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の 2 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
*/
// ApsaraMQ for Kafka でメッセージをシリアル化するために使用されるメソッド。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// リクエストの最大待機時間。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
// クライアントでのメッセージの最大リトライ回数。
props.put(ProducerConfig.RETRIES_CONFIG, 5);
// クライアントでのメッセージの 2 回の連続したリトライの間隔。
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
* ホスト名検証のアルゴリズムを空の値に設定します。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
*/
// スレッドセーフなプロデューサーオブジェクトを構築します。1 つのプロセスに対して 1 つのプロデューサーオブジェクトを構築します。
// パフォーマンスを向上させるために、複数のプロデューサーオブジェクトを構築できます。最大 5 つのプロデューサーオブジェクトを構築することを推奨します。
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// ApsaraMQ for Kafka メッセージを構築します。
String topic = kafkaProperties.getProperty("topic"); // メッセージが属する Topic。ApsaraMQ for Kafka コンソールで作成した Topic を入力します。
String value = "this is the message's value"; // メッセージの内容。
try {
// 複数の future オブジェクトを同時に取得します。これにより効率が向上します。ただし、一度に多数の future オブジェクトを取得しないでください。
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i =0; i < 100; i++) {
// メッセージを送信し、future オブジェクトを取得します。
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future: futures) {
// 同期的に future オブジェクトの結果を取得します。
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
// 最大リトライ回数に達してもメッセージの送信に失敗する場合は、エラーのトラブルシューティングを行います。
System.out.println("error occurred");
e.printStackTrace();
}
}
}メッセージをサブスクライブする
次のいずれかの方法でメッセージをサブスクライブできます。
単一のコンシューマーを使用してメッセージをサブスクライブする
次のサンプルコードは、KafkaConsumerDemo.java をコンパイルして実行し、メッセージをサブスクライブする方法の例を示しています。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の 3 行のコードのコメントを解除します。SASL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の 2 行のコメントを解除します。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
public class KafkaConsumerDemo {
public static void main(String args[]) {
// JAAS 構成ファイルのパスを指定します。
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSasl();
*/
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSaslScram();
*/
// kafka.properties ファイルをロードします。
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// エンドポイントを指定します。Topic へのアクセスに使用するエンドポイントは、ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページで取得できます。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
// SSL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の行のコメントを解除します。
// セッションのタイムアウト期間。コンシューマーがセッションタイムアウト前にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。デフォルト値は 30 秒です。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の 6 行のコードのコメントを解除します。
* SSL ルート証明書が保存されるパスを指定します。XXX を実際のパスに置き換えます。
* 証明書ファイルを JAR パッケージに圧縮しないでください。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* ルート証明書ストアのトラストストアのパスワード。デフォルト値を使用します。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* アクセスプロトコル。このパラメーターを SASL_SSL に設定します。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL 認証方式。デフォルト値を使用します。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
* 2 つの連続するポーリングサイクル間の最大間隔。
* デフォルトの間隔は 30 秒です。コンシューマーがこの間隔内にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
* 1 回のプル操作で許可される最大メッセージサイズを指定します。データがインターネット経由で送信される場合、このパラメーターはパフォーマンスに大きな影響を与える可能性があります。
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
*/
// SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードをコメントアウトします。
// セッションのタイムアウト期間。コンシューマーがセッションタイムアウト前にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。デフォルト値は 30 秒です。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の 3 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* PLAIN メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
* 2 つの連続するポーリングサイクル間の最大間隔。
* デフォルトの間隔は 30 秒です。コンシューマーがこの間隔内にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
*/
// SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードをコメントアウトします。
// セッションのタイムアウト期間。コンシューマーがセッションタイムアウト前にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。その後、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。デフォルト値は 30 秒です。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の 4 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
* 2 つの連続するポーリングサイクル間の最大間隔。
* デフォルトの間隔は 30 秒です。コンシューマーがこの間隔内にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
*/
// 一度にポーリングできるメッセージの最大数。
// このパラメーターを過度に大きな値に設定しないでください。ポーリングされたメッセージが次のポーリングが開始される前にすべて消費されない場合、ロードバランシングがトリガーされ、パフォーマンスが低下する可能性があります。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
// メッセージを逆シリアル化するために使用されるメソッド。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 現在のコンシューマーインスタンスが属する使用者グループ。ApsaraMQ for Kafka コンソールで作成した使用者グループを入力します。
// 同じ使用者グループに属するコンシューマーインスタンス。これらのインスタンスは、ロードバランシングモードでメッセージを消費します。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
// SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
// ホスト名検証のアルゴリズムを空の値に設定します。
//props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// メッセージオブジェクトを構築します。これはコンシューマーインスタンスです。
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
// 使用者グループがサブスクライブする 1 つ以上の Topic を指定します。
// 同じ GROUP_ID_CONFIG 値を持つコンシューマーが同じ Topic をサブスクライブするように設定することを推奨します。
List<String> subscribedTopics = new ArrayList<String>();
// SSL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の 5 行をコメントアウトし、6 行目のコメントを解除します。
// 複数の Topic をサブスクライブする場合は、ここに追加します。
// 事前に ApsaraMQ for Kafka コンソールで Topic を作成する必要があります。
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
subscribedTopics.add(topic.trim());
}
//subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
// ループでメッセージを消費します。
while (true){
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// すべてのメッセージは、次のポーリングサイクルが開始される前に消費される必要があります。合計時間は、SESSION_TIMEOUT_MS_CONFIG で指定されたタイムアウト間隔を超えることはできません。
// 別のスレッドプールを作成してメッセージを消費し、非同期的に結果を返すことを推奨します。
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}複数のコンシューマーを使用してメッセージをサブスクライブする
次のサンプルコードは、KafkaMultiConsumerDemo.java をコンパイルして実行し、メッセージをサブスクライブする方法の例を示しています。
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
// SSL エンドポイントまたは SASL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の行をコメントアウトします。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の 3 行のコメントを解除します。SASL エンドポイントを使用してインスタンスにアクセスする場合は、次のコードの最初の 2 行のコメントを解除します。
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
import org.apache.kafka.common.errors.WakeupException;
/**
* このチュートリアルでは、1 つのプロセスで複数のコンシューマーを使用して同時にメッセージを消費する方法を示します。
* 環境内のコンシューマーの総数が、コンシューマーがサブスクライブする Topic のパーティション数を超えないようにしてください。
*/
public class KafkaMultiConsumerDemo {
public static void main(String args[]) throws InterruptedException {
// JAAS 構成ファイルのパスを指定します。
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSasl();
*/
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSaslPlain();
*/
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
JavaKafkaConfigurer.configureSaslScram();
*/
// kafka.properties ファイルをロードします。
Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
Properties props = new Properties();
// エンドポイントを指定します。Topic へのアクセスに使用するエンドポイントは、ApsaraMQ for Kafka コンソールの [インスタンス詳細] ページで取得できます。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の 4 行のコードのコメントを解除します。
* 証明書ファイルを JAR パッケージに圧縮しないでください。
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
* ルート証明書ストアのトラストストアのパスワード。デフォルト値を使用します。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
* アクセスプロトコル。このパラメーターを SASL_SSL に設定します。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
* SASL 認証方式。デフォルト値を使用します。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* SASL エンドポイントの PLAIN メカニズムを使用してインスタンスにアクセスする場合は、次の 2 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* PLAIN メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
*/
/*
* SASL エンドポイントの SCRAM メカニズムを使用してインスタンスにアクセスする場合は、次の 2 行のコードのコメントを解除します。
* アクセスプロトコル。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
* SCRAM メカニズム。
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
*/
// 2 つの連続するポーリングサイクル間の最大間隔。
// デフォルトの間隔は 30 秒です。コンシューマーがこの間隔内にハートビートを返さない場合、ブローカーはコンシューマーが生存していないと判断します。この場合、ブローカーはコンシューマーを使用者グループから削除し、リバランスをトリガーします。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// 同時にポーリングできるメッセージの最大数。
// このパラメーターを過度に大きな値に設定しないでください。ポーリングされたすべてのメッセージが次のポーリングサイクルが開始される前に消費されない場合、ロードバランシングがトリガーされ、パフォーマンスが低下する可能性があります。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
// メッセージを逆シリアル化するために使用されるメソッド。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 現在のコンシューマーインスタンスが属する使用者グループ。ApsaraMQ for Kafka コンソールで作成した使用者グループを入力します。
// 同じ使用者グループに属するコンシューマーインスタンス。これらのインスタンスは、ロードバランシングモードでメッセージを消費します。
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
/*
* SSL エンドポイントを使用してインスタンスにアクセスする場合は、次の行のコードのコメントを解除します。
* コンシューマーオブジェクトを構築します。これはコンシューマーインスタンスです。
* ホスト名検証のアルゴリズムを空の値に設定します。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
*/
int consumerNum = 2;
Thread[] consumerThreads = new Thread[consumerNum];
for (int i = 0; i < consumerNum; i++) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
subscribedTopics.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopics);
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
consumerThreads[i] = new Thread(kafkaConsumerRunner);
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].start();
}
for (int i = 0; i < consumerNum; i++) {
consumerThreads[i].join();
}
}
static class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (!closed.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// すべてのメッセージは、SESSION_TIMEOUT_MS_CONFIG で指定された間隔を超えない合計時間で、次のポーリングサイクルが開始される前に消費される必要があります。
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
} catch (WakeupException e) {
// コンシューマーがシャットダウンされた場合、例外を無視します。
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
// 別のスレッドから呼び出すことができるシャットダウンフックを実装します。
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}よくある質問
ApsaraMQ for Kafka で SASL_SSL 証明書を構成するにはどうすればよいですか?
ApsaraMQ for Kafka で SASL_SSL 証明書を構成するには、次の操作を実行します。このトピックの「構成ファイルの準備」セクションのステップ 1 のリンクにアクセスして SSL 証明書をローカルパスにダウンロードし、デモプロジェクトの kafka.properties 構成ファイルで ssl.truststore.location パラメーターを構成します。
SDK for Java を使用して ApsaraMQ for Kafka インスタンスのエンドポイントにアクセスしてメッセージを送受信するときに、独自の SSL 証明書をバインドできますか?
いいえ、SDK for Java を使用して ApsaraMQ for Kafka インスタンスのエンドポイントにアクセスしてメッセージを送受信するときに、独自の SSL 証明書をバインドすることはできません。ApsaraMQ for Kafka が提供する SSL 証明書を使用することを推奨します。
関連ドキュメント
Spring Cloud フレームワークを使用して ApsaraMQ for Kafka インスタンスにアクセスし、メッセージを送受信することもできます。詳細については、「Spring Cloud フレームワークを使用してメッセージを送受信する」をご参照ください。
SDK for Java を使用して期待どおりにメッセージを送受信できない場合は、インスタンスの実行ステータスを確認してください。詳細については、「ApsaraMQ for Kafka インスタンスのヘルスチェックを実行する」をご参照ください。