
ApsaraMQ for Kafka:Send and receive messages with Spring Cloud Stream

Last Updated: Mar 11, 2026

Spring Cloud Stream provides a binder abstraction that decouples your application code from the underlying message broker. This topic walks you through connecting a Spring Cloud Stream application to an ApsaraMQ for Kafka instance to produce and consume messages.

The demo project includes two preconfigured profiles:

| Profile | Protocol | Port | Access method |
| --- | --- | --- | --- |
| SASL_SSL | SASL_SSL | 9093 | Internet (authentication and TLS encryption) |
| PLAINTEXT | PLAINTEXT | 9092 | VPC (no authentication or encryption) |

Prerequisites

Before you begin, make sure you have downloaded the demo project and created the topics and consumer group that the demo uses. For more information, see Step 3: Create resources.

Step 1: Configure the application

Navigate to the demo directory:

cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo

Then open the application.properties file in the profile directory that matches your access method:

| Access method | Profile directory |
| --- | --- |
| Internet (SASL_SSL) | sasl-ssl/src/main/resources/ |
| VPC (PLAINTEXT) | vpc/src/main/resources/ |

Internet access (SASL_SSL)

In Internet environments, a client uses the SSL endpoint to access the ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.

Open the configuration file:

cd sasl-ssl/src/main/resources/
vi application.properties

Update the following properties with your instance values:

## Instance-specific settings (replace with your values)
kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
kafka.consumer.group=test-spring
kafka.output.topic.name=test-output
kafka.input.topic.name=test-input
kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks

Keep the default Spring Cloud Stream binder settings unless you have specific requirements:

## Spring Cloud Stream bindings (retain defaults)
spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
spring.cloud.stream.bindings.MyOutput.contentType=text/plain
spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
spring.cloud.stream.bindings.MyInput.contentType=text/plain

## Kafka binder configuration (retain defaults)
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient

## Disable server hostname verification because SASL handles authentication.
## If this line is missing from the demo, add it manually.
spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
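
The binder passes these configuration.* entries through to the underlying Kafka client unchanged. As a rough sketch of the effective client properties (the class name is hypothetical; the truststore path and password are the demo defaults shown above):

```java
import java.util.Properties;

public class EffectiveClientConfig {

    // Mirrors the spring.cloud.stream.kafka.binder.configuration.* entries above
    // as the raw Kafka client properties the binder ultimately builds.
    public static Properties saslSslProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.location",
                "/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo"
                + "/sasl-ssl/src/main/resources/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "KafkaOnsClient");
        // An empty value disables server hostname verification.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(saslSslProps().getProperty("security.protocol"));
    }
}
```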

Configure SASL credentials

Open the kafka_client_jaas.conf file in the same directory and set the username and password:

vi kafka_client_jaas.conf

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};

Replace the placeholders with your actual values:

| Placeholder | Description | Where to find it |
| --- | --- | --- |
| <your-username> | SASL username | Instance Details page in the ApsaraMQ for Kafka console |
| <your-password> | SASL password | Instance Details page in the ApsaraMQ for Kafka console |
Note

If the Access Control List (ACL) feature is disabled on your instance, get the default SASL username and password from the Instance Details page in the ApsaraMQ for Kafka console. If the ACL feature is enabled, the SASL user must be of the PLAIN type and have send and receive permissions. See Grant permissions to SASL users.
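
The Kafka client typically locates this JAAS file through the java.security.auth.login.config JVM system property, which the demo's run script is expected to set. Recent Kafka clients also accept the same login module inline through the standard sasl.jaas.config client property, which avoids the separate file. A sketch of building that inline string (class and method names are hypothetical; the credentials are the same placeholders as above):

```java
import java.util.Properties;

public class SaslJaasInline {

    // Builds the inline equivalent of the kafka_client_jaas.conf shown above.
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Set on the client instead of pointing the JVM at a JAAS file.
        props.put("sasl.jaas.config", jaasConfig("<your-username>", "<your-password>"));
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```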

VPC access (PLAINTEXT)

In VPC environments, a client uses the default endpoint to access the ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.

Open the configuration file:

cd vpc/src/main/resources/
vi application.properties

Update the following properties with your instance values:

## Instance-specific settings (replace with your values)
kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
kafka.consumer.group=test-spring
kafka.output.topic.name=test-output
kafka.input.topic.name=test-input

No additional security configuration is needed for VPC access.

Parameter reference

| Parameter | Description |
| --- | --- |
| kafka.bootstrap-servers | SSL endpoint (Internet) or default endpoint (VPC) of your ApsaraMQ for Kafka instance. Find this in the Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console. |
| kafka.consumer.group | Consumer group that subscribes to messages. Create one on the Groups page in the ApsaraMQ for Kafka console. See Step 3: Create resources. |
| kafka.output.topic.name | Topic for outbound messages. The demo sends fixed-content messages to this topic at regular intervals. Create one on the Topics page in the ApsaraMQ for Kafka console. See Step 3: Create resources. |
| kafka.input.topic.name | Topic for inbound messages. Send a message to this topic from the console, and the demo consumes and logs it. |
| kafka.ssl.truststore.location | Path to the SSL root certificate kafka.client.truststore.jks. Internet access only. |
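
Note that kafka.bootstrap-servers is a comma-separated list of host:port pairs, and the port must match the access method: 9093 for SASL_SSL over the Internet, 9092 for PLAINTEXT over a VPC. A quick sanity-check sketch (the class name and example hostnames are hypothetical, not your real endpoints):

```java
public class BootstrapServersCheck {

    // Returns true if every host:port entry in the comma-separated list
    // uses the port expected for the chosen access method.
    static boolean allUsePort(String bootstrapServers, int expectedPort) {
        for (String server : bootstrapServers.split(",")) {
            int idx = server.lastIndexOf(':');
            if (idx < 0
                    || Integer.parseInt(server.substring(idx + 1).trim()) != expectedPort) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // SASL_SSL endpoints listen on 9093; VPC default endpoints on 9092.
        String sslServers = "broker-1.example.com:9093,broker-2.example.com:9093";
        String vpcServers = "broker-1-vpc.example.com:9092,broker-2-vpc.example.com:9092";
        System.out.println(allUsePort(sslServers, 9093));
        System.out.println(allUsePort(vpcServers, 9092));
    }
}
```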

Step 2: Run the demo

Navigate to the profile directory that matches your access method, then run the demo script:

Internet access (SASL_SSL)

cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl
sh run_demo.sh

VPC access (PLAINTEXT)

cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc
sh run_demo.sh

On success, output similar to the following appears:

Send: hello world !!
Send: hello world !!
Send: hello world !!
Send: hello world !!

This confirms the demo is producing messages to the topic specified in kafka.output.topic.name.

Step 3: Verify message delivery

  1. Verify produced messages -- Log in to the ApsaraMQ for Kafka console and check whether the topic specified in kafka.output.topic.name received the messages sent by the demo. See Query messages.

  2. Verify consumed messages -- Send a test message to the topic specified in kafka.input.topic.name from the console and check whether the message appears in the demo's log output. See Send a message.

What's next