Spring Cloud は、メッセージ駆動型のマイクロサービス指向アプリケーションを構築するために使用されるフレームワークです。 サービスディスカバリ、構成管理、メッセージ送信、負荷分散などのマイクロサービス関連のソリューションを提供するこのフレームワークは、分散システムを効率的に構築し、マイクロサービス間の通信を実装するために使用できます。 このトピックでは、Spring Cloud フレームワークを使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。
前提条件
JDK 1.8 以降がインストールされていること。 詳細については、Java のダウンロードをご参照ください。
Maven 2.5 以降がインストールされていること。 詳細については、Apache Maven のダウンロードをご参照ください。
デモパッケージが kafka-spring-stream-demo からダウンロードされ、準備された Linux システムにアップロードされていること。
ApsaraMQ for Kafka インスタンスのバージョン 2.x 以降が作成されていること。 インスタンスのバージョンをアップグレードする方法については、インスタンスバージョンのアップグレードをご参照ください。
インスタンスにトピックとコンシューマーグループが作成されていること。 詳細については、手順 3: リソースの作成をご参照ください。
インターネット環境(メッセージ送信に認証と暗号化が必要)
クライアントがインターネット経由で ApsaraMQ for Kafka インスタンスに接続する場合、SASL_SSL プロトコルが認証と暗号化に使用されます。 インターネット環境では、クライアントは Secure Sockets Layer(SSL)エンドポイントを使用して ApsaraMQ for Kafka インスタンスにアクセスします。 エンドポイントについては、エンドポイントの比較をご参照ください。
この例では、デモパッケージは /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo ディレクトリにアップロードされています。
Linux システムにログオンし、次のコマンドを実行して、デモパッケージの /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo ディレクトリに移動します。
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo次のコマンドを実行して、構成ファイルのディレクトリに移動します。
cd sasl-ssl/src/main/resources/次のコマンドを実行して、application.properties 構成ファイルを開き、パラメーターに基づいて構成ファイルにインスタンス設定を指定します。
vi application.properties## インスタンス設定に基づいて、次のパラメーターを構成します。 kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks ### バインディングパラメーターを構成して、ApsaraMQ for Kafka インスタンスを Spring Cloud Stream Binder にバインドします。 次のパラメーターのデフォルト設定を保持します。 spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name} spring.cloud.stream.bindings.MyOutput.contentType=text/plain spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group} spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name} spring.cloud.stream.bindings.MyInput.contentType=text/plain ### Binder は、メッセージングミドルウェア用の Spring Cloud のカプセル化モジュールです。 次のパラメーターのデフォルト設定を保持します。 spring.cloud.stream.kafka.binder.autoCreateTopics=false spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers} spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location} spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient ### デモに次のパラメーターが含まれていない場合は、手動でパラメーターを追加します。 このパラメーターは、サーバーホスト名検証を有効にするかどうかを指定します。 Simple Authentication and Security Layer(SASL)がID 検証に使用されるため、このパラメーターを空の文字列に設定して、サーバーホスト名検証を無効にすることができます。 ### サーバーホスト名検証は、ルート SSL 証明書のホスト名がサーバーのホスト名と一致するかどうかを検証することです。 このパラメーターのデフォルト値は HTTPS です。 spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=表 1. パラメーター
パラメーター
説明
kafka.bootstrap-servers
ApsaraMQ for Kafka インスタンスのエンドポイントです。アクセスポイント情報 セクションの インスタンスの詳細 ページ (ApsaraMQ for Kafka コンソール) でエンドポイントを取得できます。
kafka.consumer.group
メッセージをサブスクライブするコンシューマー グループ。 Group の管理 ページでコンシューマーグループを作成できます。このページは、ApsaraMQ for Kafka コンソール内にあります。詳細については、「ステップ 3: リソースの作成」をご参照ください。
kafka.output.topic.name
送信メッセージのトピック。 コンソールプログラムは、このトピックを使用して定期的にメッセージを送信します。 各メッセージの内容は固定です。 トピック管理ApsaraMQ for Kafka コンソールの [トピック] ページでトピックを作成できます。 詳細については、手順 3: リソースの作成をご参照ください。
kafka.input.topic.name
受信メッセージのトピック。 このトピックを使用して、コンソールでメッセージを送信できます。 デモプログラムはメッセージを消費し、ログにメッセージを表示します。
kafka.ssl.truststore.location
ルート SSL 証明書 kafka.client.truststore.jks の保存パス。
次のコマンドを実行して kafka_client_jaas.conf ファイルを開き、インスタンスの SASL ユーザーのユーザー名とパスワードを指定します。
vi kafka_client_jaas.conf説明ApsaraMQ for Kafka インスタンスでアクセス制御リスト(ACL)機能が無効になっている場合、ApsaraMQ for Kafka コンソールの [インスタンスの詳細] ページで、インスタンスのデフォルトの SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合、使用する SASL ユーザーが PLAIN タイプであり、ユーザーにメッセージを送受信する権限が付与されていることを確認してください。 詳細については、SASL ユーザーへの権限の付与をご参照ください。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX"; };/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl ディレクトリに移動し、次のコマンドを実行してデモを実行します。
sh run_demo.sh次の情報が返された場合、デモプログラムは kafka.output.topic.name で指定されたトピックを使用して、コンソールプログラムから送信されたメッセージを受信します。
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!ApsaraMQ for Kafka コンソール にログオンして、メッセージが送受信されているかどうかを確認します。
VPC 環境(メッセージ送信に認証と暗号化が不要)
クライアントが仮想プライベートクラウド(VPC)内の ApsaraMQ for Kafka インスタンスに接続する場合、PLAINTEXT プロトコルがメッセージの送信に使用され、認証と暗号化は不要です。 VPC 環境では、クライアントはデフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスにアクセスします。 エンドポイントについては、エンドポイントの比較をご参照ください。
この例では、デモパッケージは /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo ディレクトリにアップロードされています。
Linux システムにログオンし、次のコマンドを実行して、デモパッケージの /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo ディレクトリに移動します。
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo次のコマンドを実行して、構成ファイルのディレクトリに移動します。
cd vpc/src/main/resources/次のコマンドを実行して application.properties 構成ファイルを開き、パラメーターに基づいて構成ファイルにインスタンス設定を指定します。
vi application.properties### インスタンス設定に基づいて次のパラメーターを構成します。 kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc ディレクトリに移動し、次のコマンドを実行してデモを実行します。
sh run_demo.sh次の情報が返されます。
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!ApsaraMQ for Kafka コンソール にログオンして、メッセージが送受信されているかどうかを確認します。
参照資料
Spring Cloud フレームワークの詳細については、Spring Cloud Stream リファレンスドキュメントをご参照ください。