Spring Cloud Stream provides a binder abstraction that decouples your application code from the underlying message broker. This topic walks you through connecting a Spring Cloud Stream application to an ApsaraMQ for Kafka instance to produce and consume messages.
The demo project includes two preconfigured profiles:
| Profile | Protocol | Port | Access method |
|---|---|---|---|
| SASL_SSL | SASL_SSL | 9093 | Internet (authentication and TLS encryption) |
| PLAINTEXT | PLAINTEXT | 9092 | VPC (no authentication or encryption) |
Prerequisites
Before you begin, make sure you have:
JDK 1.8 or later (download)
Maven 2.5 or later (download)
A Linux system with the kafka-spring-stream-demo demo package uploaded
An ApsaraMQ for Kafka instance (version 2.x or later). To upgrade, see Upgrade instance versions
At least two topics and one consumer group created on the instance. See Step 3: Create resources
Step 1: Configure the application
Navigate to the demo directory:
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
Then open the application.properties file in the profile directory that matches your access method:
| Access method | Profile directory |
|---|---|
| Internet (SASL_SSL) | sasl-ssl/src/main/resources/ |
| VPC (PLAINTEXT) | vpc/src/main/resources/ |
Internet access (SASL_SSL)
In Internet environments, a client uses the SSL endpoint to access the ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.
Open the configuration file:
cd sasl-ssl/src/main/resources/
vi application.properties
Update the following properties with your instance values:
## Instance-specific settings (replace with your values)
kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
kafka.consumer.group=test-spring
kafka.output.topic.name=test-output
kafka.input.topic.name=test-input
kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
Keep the default Spring Cloud Stream binder settings unless you have specific requirements:
## Spring Cloud Stream bindings (retain defaults)
spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
spring.cloud.stream.bindings.MyOutput.contentType=text/plain
spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
spring.cloud.stream.bindings.MyInput.contentType=text/plain
## Kafka binder configuration (retain defaults)
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient
## Disable server hostname verification because SASL handles authentication.
## The value is intentionally left empty. If this line is missing from the demo, add it manually.
spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
Configure SASL credentials
Open the kafka_client_jaas.conf file in the same directory and set the username and password:
vi kafka_client_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};
Replace the placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| <your-username> | SASL username | Instance Details page in the ApsaraMQ for Kafka console |
| <your-password> | SASL password | Instance Details page in the ApsaraMQ for Kafka console |
If the Access Control List (ACL) feature is disabled on your instance, get the default SASL username and password from the Instance Details page in the ApsaraMQ for Kafka console. If the ACL feature is enabled, the SASL user must be of the PLAIN type and have send and receive permissions. See Grant permissions to SASL users.
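How run_demo.sh hands kafka_client_jaas.conf to the JVM is not shown in this topic; the standard JAAS mechanism is the java.security.auth.login.config system property. As an alternative, Kafka clients also accept the same login entry inline through the sasl.jaas.config client property. The sketch below only builds that one-line value from a username and password. The class and method names are hypothetical, and the escaping rule assumes the value is parsed like a quoted Java string.

```java
// Sketch only: builds a value for Kafka's `sasl.jaas.config` client property.
// JaasConfigSketch and its methods are illustrative names, not part of the demo.
class JaasConfigSketch {

    /** Escapes backslashes and double quotes (assumed to be the only characters that need it). */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Returns a single-line JAAS entry for the PLAIN mechanism. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + escape(username) + "\""
                + " password=\"" + escape(password) + "\";";
    }

    public static void main(String[] args) {
        // Placeholder credentials; use the values from the Instance Details page.
        System.out.println(plainJaasConfig("<your-username>", "<your-password>"));
    }
}
```

If you try this route, the resulting string would be supplied as the sasl.jaas.config entry of the Kafka client configuration in place of the separate JAAS file. Keeping the file, as the demo does, avoids putting credentials in application.properties.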
VPC access (PLAINTEXT)
In VPC environments, a client uses the default endpoint to access the ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.
Open the configuration file:
cd vpc/src/main/resources/
vi application.properties
Update the following properties with your instance values:
## Instance-specific settings (replace with your values)
kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
kafka.consumer.group=test-spring
kafka.output.topic.name=test-output
kafka.input.topic.name=test-input
No additional security configuration is needed for VPC access.
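A common mistake is pasting the VPC endpoint into the SASL_SSL profile or the reverse. The following is a minimal sketch, not part of the demo, that splits a kafka.bootstrap-servers value and checks that every entry uses the port this topic associates with the protocol (9093 for SASL_SSL, 9092 for PLAINTEXT). The class name, method names, and example hostnames are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: sanity-checks a bootstrap server list against the expected port.
class BootstrapServersSketch {

    /** Splits a comma-separated host:port list and returns the ports in order. */
    static List<Integer> ports(String bootstrapServers) {
        List<Integer> ports = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            ports.add(Integer.parseInt(trimmed.substring(colon + 1)));
        }
        return ports;
    }

    /** True when every endpoint uses 9093 for SASL_SSL, or 9092 for any other protocol value. */
    static boolean matchesProtocol(String bootstrapServers, String securityProtocol) {
        int expected = "SASL_SSL".equals(securityProtocol) ? 9093 : 9092;
        for (int port : ports(bootstrapServers)) {
            if (port != expected) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical hostnames; substitute the endpoints of your own instance.
        String servers = "broker-1.example.com:9092,broker-2.example.com:9092";
        System.out.println(matchesProtocol(servers, "PLAINTEXT"));
    }
}
```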
Parameter reference
| Parameter | Description |
|---|---|
| kafka.bootstrap-servers | SSL endpoint (Internet) or default endpoint (VPC) of your ApsaraMQ for Kafka instance. Find this in the Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console. |
| kafka.consumer.group | Consumer group that subscribes to messages. Create one on the Groups page in the ApsaraMQ for Kafka console. See Step 3: Create resources. |
| kafka.output.topic.name | Topic for outbound messages. The demo sends fixed-content messages to this topic at regular intervals. Create one on the Topics page in the ApsaraMQ for Kafka console. See Step 3: Create resources. |
| kafka.input.topic.name | Topic for inbound messages. Send a message to this topic from the console, and the demo consumes and logs it. |
| kafka.ssl.truststore.location | Path to the SSL root certificate kafka.client.truststore.jks. Internet access only. |
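The binder settings refer to these parameters through ${...} placeholders, for example spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}. Spring resolves the placeholders at startup. The sketch below is a simplified, single-pass stand-in for that resolution, written with the JDK only so the effect is easy to see. It is not Spring's implementation, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: illustrates ${key} substitution; Spring's real resolver does more.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Loads key=value lines from a string in .properties syntax. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces each ${key} in value with the matching property (one pass, no nesting). */
    static String resolve(String value, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = props.getProperty(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Undefined property: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = parse(
                "kafka.output.topic.name=test-output\n"
                + "spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}\n");
        String raw = props.getProperty("spring.cloud.stream.bindings.MyOutput.destination");
        System.out.println(resolve(raw, props)); // prints test-output
    }
}
```

In practice this means you only edit the instance-specific keys; the binding and binder lines pick up the new values without being touched.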
Step 2: Run the demo
Navigate to the profile directory that matches your access method, then run the demo script:
Internet access (SASL_SSL)
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl
sh run_demo.sh
VPC access (PLAINTEXT)
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc
sh run_demo.sh
On success, output similar to the following appears:
Send: hello world !!
Send: hello world !!
Send: hello world !!
Send: hello world !!
This confirms the demo is producing messages to the topic specified in kafka.output.topic.name.
Step 3: Verify message delivery
Verify produced messages -- Log in to the ApsaraMQ for Kafka console and check whether the topic specified in kafka.output.topic.name received the messages sent by the demo. See Query messages.
Verify consumed messages -- Send a test message to the topic specified in kafka.input.topic.name from the console and check whether the message appears in the demo's log output. See Send a message.
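If you capture the demo's output to a file, both checks can be scripted on the client side. The sketch below counts the Send: lines shown in Step 2 and looks for the body of your test message anywhere in the log. The exact format of the demo's consume log line is not documented here, so the sketch assumes only that the message body appears verbatim; the class name, method names, and the sample line labeled hypothetical are illustrative.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: inspects captured demo output; it does not talk to Kafka.
class LogCheckSketch {

    /** Counts lines that start with the "Send:" prefix shown in the Step 2 output. */
    static long countSends(List<String> logLines) {
        long count = 0;
        for (String line : logLines) {
            if (line.trim().startsWith("Send:")) {
                count++;
            }
        }
        return count;
    }

    /** True when any line contains the given message body verbatim. */
    static boolean containsMessage(List<String> logLines, String body) {
        for (String line : logLines) {
            if (line.contains(body)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Send: hello world !!",
                "Send: hello world !!",
                "my-test-message"); // hypothetical line standing in for a consumed message
        System.out.println(countSends(lines));
        System.out.println(containsMessage(lines, "my-test-message"));
    }
}
```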
What's next
Comparison among endpoints -- understand the differences between SSL and default endpoints
Spring Cloud Stream reference documentation -- explore advanced binder and binding configuration options