ApsaraMQ for Kafka: Use the Spring Cloud framework to send and receive messages

Last Updated: Feb 20, 2024

Spring Cloud is a framework for building message-driven microservice applications. It provides solutions for service discovery, configuration management, message transmission, and load balancing, which help you build distributed systems and implement communication between microservices. This topic describes how to use the Spring Cloud framework to connect to ApsaraMQ for Kafka and to send and receive messages.

Prerequisites

Internet environments (authentication and encryption required for message transmission)

If your client connects to your ApsaraMQ for Kafka instance over the Internet, the SASL_SSL protocol is used for authentication and encryption. In Internet environments, a client uses the Secure Sockets Layer (SSL) endpoint to access an ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.
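
As a rough illustration of what SASL_SSL access means at the Kafka client level, the following Java sketch assembles the corresponding client properties. The keys are standard Apache Kafka client settings. The class name SaslSslClientConfig, its methods, and all values passed in main are hypothetical placeholders. The sketch uses the inline sasl.jaas.config property only for compactness, whereas the demo in this topic supplies credentials through the kafka_client_jaas.conf file.

```java
import java.util.Properties;

/** Sketch: Kafka client settings implied by SASL_SSL access with the PLAIN mechanism. */
final class SaslSslClientConfig {

    /** Rejects values that would break the quoted JAAS entry; escaping is intentionally left out. */
    private static String requireNoQuote(String value) {
        if (value.indexOf('"') >= 0) {
            throw new IllegalArgumentException("value must not contain a double quote");
        }
        return value;
    }

    /** Builds an inline JAAS entry for the PLAIN login module. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + requireNoQuote(username) + "\""
                + " password=\"" + requireNoQuote(password) + "\";";
    }

    /** Collects the client properties for an SSL endpoint that requires SASL/PLAIN authentication. */
    static Properties build(String bootstrapServers, String truststorePath,
                            String truststorePassword, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; substitute the SSL endpoint and SASL user of your instance.
        Properties props = build("broker-1.example.com:9093",
                "/path/to/kafka.client.truststore.jks", "truststore-password", "XXX", "XXX");
        // Print the non-secret settings in a stable order.
        props.stringPropertyNames().stream().sorted()
                .filter(k -> !k.equals("sasl.jaas.config") && !k.contains("password"))
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

In a Spring Cloud Stream application, these keys are normally not set in code. They are supplied through the binder settings in application.properties, as the following steps show.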

In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.

  1. Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. Run the following command to go to the directory of the configuration file:

    cd sasl-ssl/src/main/resources/
  3. Run the following command to open the application.properties configuration file, and then specify the instance settings as described in Table 1:

    vi application.properties
    ## Configure the following parameters based on your instance settings: 
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
    kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
    
    ### Configure binding parameters to bind the ApsaraMQ for Kafka instance to Spring Cloud Stream Binder. Retain the default settings for the following parameters: 
    spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
    spring.cloud.stream.bindings.MyOutput.contentType=text/plain
    spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
    spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
    spring.cloud.stream.bindings.MyInput.contentType=text/plain
    
    ### Binder is the encapsulation module of Spring Cloud for messaging middleware. Retain the default settings for the following parameters: 
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
    spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
    spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient
    ### If the demo does not include the following parameter, add it manually. The parameter specifies the algorithm that is used to verify the server hostname. You can set the parameter to an empty string to disable server hostname verification because Simple Authentication and Security Layer (SASL) is used for identity verification. 
    ### Server hostname verification checks whether the hostname in the SSL certificate presented by the server matches the hostname of the server. The default value of this parameter is https. 
    spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=

    Table 1. Parameters

    | Parameter | Description |
    | --- | --- |
    | kafka.bootstrap-servers | The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
    | kafka.consumer.group | The consumer group that subscribes to messages. You can create the consumer group on the Groups page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources. |
    | kafka.output.topic.name | The topic for outbound messages. The console program uses this topic to send messages at regular intervals. The content of each message is fixed. You can create the topic on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources. |
    | kafka.input.topic.name | The topic for inbound messages. You can use this topic to send messages in the console. The demo program consumes the messages and displays the messages in logs. |
    | kafka.ssl.truststore.location | The storage path of the root SSL certificate kafka.client.truststore.jks. |

  4. Run the following command to open the kafka_client_jaas.conf file and specify the username and password of the SASL user for the instance:

    vi kafka_client_jaas.conf
    Note
    • If the access control list (ACL) feature is disabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the default SASL user for the instance on the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user that you use is of the PLAIN type and that the user is granted permissions to send and receive messages. For more information, see Grant permissions to SASL users.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="XXX"
      password="XXX";
    };
  5. Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl directory and run the following command to run the demo:

    sh run_demo.sh

    If the following information is returned, the demo is running and the console program is sending messages to the topic specified in kafka.output.topic.name.

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  6. Log on to the ApsaraMQ for Kafka console to verify whether messages are sent and received.

    • Check whether the topic specified in kafka.output.topic.name received the messages sent from the console program. For more information, see Query messages.

    • Send a message by using the topic specified in kafka.input.topic.name and check whether the message is displayed in the log of the demo program. For more information, see Send a message.
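
The relationship between the binder entries in application.properties and the settings that reach the Kafka client can be sketched as follows. The sketch assumes, in line with the Spring Cloud Stream Kafka binder documentation, that entries under spring.cloud.stream.kafka.binder.configuration are passed to the Kafka clients with the prefix removed. It imitates ${...} placeholder resolution only for keys that are defined in the same text. The class BinderConfigPreview and its methods are hypothetical helpers and are not part of the demo or of Spring.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: previews which Kafka client keys result from the binder "configuration.*" entries. */
final class BinderConfigPreview {

    static final String PREFIX = "spring.cloud.stream.kafka.binder.configuration.";
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces ${key} with the value of key from the same file; unknown keys stay unchanged. */
    static String resolve(String value, Properties source) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String replacement = source.getProperty(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Returns the prefixed entries with the prefix removed and placeholders resolved. */
    static Map<String, String> clientConfig(String propertiesText) {
        Properties source = new Properties();
        try {
            source.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                result.put(key.substring(PREFIX.length()),
                        resolve(source.getProperty(key), source));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The truststore path is a placeholder; the other lines mirror the sample configuration.
        String sample = String.join("\n",
                "kafka.ssl.truststore.location=/path/to/kafka.client.truststore.jks",
                PREFIX + "security.protocol=SASL_SSL",
                PREFIX + "sasl.mechanism=PLAIN",
                PREFIX + "ssl.truststore.location=${kafka.ssl.truststore.location}",
                PREFIX + "ssl.endpoint.identification.algorithm=");
        clientConfig(sample).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A preview like this can help confirm that a placeholder such as ${kafka.ssl.truststore.location} points at a defined key before the demo is started.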

VPC environments (authentication and encryption not required for message transmission)

If a client connects to an ApsaraMQ for Kafka instance in a virtual private cloud (VPC), the PLAINTEXT protocol is used to transmit messages, and authentication and encryption are not required. In VPC environments, a client uses the default endpoint to access an ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.

In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.

  1. Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. Run the following command to go to the directory of the configuration file:

    cd vpc/src/main/resources/
  3. Run the following command to open the application.properties configuration file, and then specify the instance settings as described in Table 1:

    vi application.properties
    ### Configure the following parameters based on your instance settings: 
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
  4. Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc directory and run the following command to run the demo:

    sh run_demo.sh

    The following information is returned:

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  5. Log on to the ApsaraMQ for Kafka console to verify whether messages are sent and received.

    • Check whether the topic specified in kafka.output.topic.name received the messages sent from the console program. For more information, see Query messages.

    • Send a message by using the topic specified in kafka.input.topic.name and check whether the message is displayed in the log of the demo program. For more information, see Send a message.
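
Before you run the demo, a quick check of the kafka.bootstrap-servers value can catch mistakes such as using the SSL endpoint in a VPC configuration. The following Java sketch assumes the ports shown in the samples in this topic (9092 for the default endpoint and 9093 for the SSL endpoint). BootstrapServersCheck is a hypothetical helper and is not part of the demo.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: checks that every entry of a bootstrap server list is host:port with the expected port. */
final class BootstrapServersCheck {

    /** Returns the problems found in a comma-separated host:port list; an empty list means none. */
    static List<String> problems(String bootstrapServers, int expectedPort) {
        List<String> problems = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String server = entry.trim();
            int colon = server.lastIndexOf(':');
            if (server.isEmpty() || colon <= 0 || colon == server.length() - 1) {
                problems.add("not in host:port form: '" + server + "'");
                continue;
            }
            try {
                int port = Integer.parseInt(server.substring(colon + 1));
                if (port != expectedPort) {
                    problems.add(server + " uses port " + port + ", expected " + expectedPort);
                }
            } catch (NumberFormatException e) {
                problems.add("port is not a number: '" + server + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Placeholder host names; 9092 is the port used by the VPC sample in this topic.
        String servers = "broker-1-vpc.example.com:9092,broker-2-vpc.example.com:9093";
        problems(servers, 9092).forEach(System.out::println);
    }
}
```

The same check applies to the Internet configuration if 9093 is passed as the expected port.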

References

For more information about the Spring Cloud framework, see Spring Cloud Stream Reference Documentation.