This topic shows you how to use the Spring Cloud framework to access Message Queue for Apache Kafka and send and subscribe to messages.

Background information

Spring Cloud Stream is a framework for building message-driven microservice applications. For more information, see Spring Cloud Stream.

Prerequisites

Internet environment (authentication and encryption required for message transmission)

If your client connects to your Message Queue for Apache Kafka instance over the Internet, use the SASL_SSL protocol for authentication and encryption. The client connects to the Message Queue for Apache Kafka instance by using the SSL endpoint of the instance. For more information about endpoints, see Comparison among endpoints.

In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.

  1. Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:
    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. Run the following command to go to the directory of the configuration file:
    cd sasl-ssl/src/main/resources/
  3. Run the following command to modify the application.properties file and configure the instance settings based on Table 1:
    vi application.properties
    ### Set the following parameters based on the information about your Message Queue for Apache Kafka instance:
    kafka.bootstrap-servers=47.99.XX.XX:9093,118.178.XX.XX:9093,47.110.XX.XX:9093,116.62.XX.XX:9093,101.37.XX.XX:9093
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
    kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
    
    ### Set binding parameters to bind Message Queue for Apache Kafka to the Spring Cloud Stream binder. Retain the default settings of the following parameters:
    spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
    spring.cloud.stream.bindings.MyOutput.contentType=text/plain
    spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
    spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
    spring.cloud.stream.bindings.MyInput.contentType=text/plain
    
    ### The binder is the Spring Cloud Stream module that encapsulates the messaging middleware. Retain the default settings of the following parameters:
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
    spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
    spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsCl****
    ### If the demo does not include the following parameter, add it manually. This parameter specifies the algorithm that is used for server hostname verification, which checks whether the hostname in the SSL certificate matches the hostname of the server. The default value is https.
    ### Because Simple Authentication and Security Layer (SASL) is used for identity verification, you can set this parameter to an empty string to disable server hostname verification.
    spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
    Table 1. Parameters
    | Parameter | Description |
    | --- | --- |
    | kafka.bootstrap-servers | The endpoint of the Message Queue for Apache Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console. |
    | kafka.consumer.group | The consumer group that subscribes to messages. You can create the consumer group on the Groups page in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources. |
    | kafka.output.topic.name | The topic to which the demo program sends messages at regular intervals. The content of each message is fixed. You can create the topic on the Topics page in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources. |
    | kafka.input.topic.name | The topic from which the demo program consumes messages. You can send messages to this topic in the console. The demo program consumes the messages and prints them in its log. |
    | kafka.ssl.truststore.location | The storage path of the root SSL certificate kafka.client.truststore.jks. |
  4. Run the following command to open the kafka_client_jaas.conf file and specify the username and password of the instance:
    vi kafka_client_jaas.conf
    Note
    • If the access control list (ACL) feature is disabled for the Message Queue for Apache Kafka instance, you can obtain the default username and password of the instance on the Instance Details page in the Message Queue for Apache Kafka console.
    • If ACL is enabled for the Message Queue for Apache Kafka instance, make sure that the SASL user to be used is of the PLAIN type and that the user is authorized to send and subscribe to messages. For more information, see Authorize SASL users.
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="XXX"
      password="XXX";
    };
  5. Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl directory and run the following command to run the demo:
    sh run_demo.sh
    Output similar to the following is returned, which indicates that the demo program is sending messages to the topic specified in kafka.output.topic.name.
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  6. Log on to the Message Queue for Apache Kafka console to verify that messages are sent and received.
    • Check whether the topic specified in kafka.output.topic.name received the messages sent by the demo program. For more information, see Query messages.
    • In the console, send a message to the topic specified in kafka.input.topic.name and check whether the message is printed in the log of the demo program. For more information, see Send a message.
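
The binder parameters in step 3 can be read as follows: the Kafka binder passes every entry under spring.cloud.stream.kafka.binder.configuration to the Kafka clients it creates, and ${...} placeholders are replaced with the values of the referenced parameters. The following JDK-only sketch illustrates that mapping. It is not part of the demo package and does not use Spring: the class name BinderConfigSketch, its methods, and the /path/to/ truststore path are hypothetical, and the placeholder handling is a simplified single-pass imitation of what Spring does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the demo package.
public class BinderConfigSketch {
    // Prefix under which the Kafka binder accepts pass-through client properties.
    static final String PREFIX = "spring.cloud.stream.kafka.binder.configuration.";
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses properties-file text into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces each ${key} with the value of key. Unknown keys are left as-is. Single pass only. */
    static String resolve(Properties props, String value) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = props.getProperty(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Collects the entries under PREFIX, with the prefix removed and placeholders resolved. */
    static Map<String, String> kafkaClientSettings(Properties props) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                result.put(name.substring(PREFIX.length()),
                        resolve(props, props.getProperty(name)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The truststore path below is a placeholder, not a real location.
        Properties props = load(String.join("\n",
                "kafka.ssl.truststore.location=/path/to/kafka.client.truststore.jks",
                PREFIX + "security.protocol=SASL_SSL",
                PREFIX + "sasl.mechanism=PLAIN",
                PREFIX + "ssl.truststore.location=${kafka.ssl.truststore.location}",
                PREFIX + "ssl.endpoint.identification.algorithm="));
        // Print the settings as a plain Kafka client would see them.
        kafkaClientSettings(props).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting keys (security.protocol, sasl.mechanism, ssl.truststore.location, ssl.endpoint.identification.algorithm) are standard Apache Kafka client settings, which is why the demo can retain the default binder lines and change only the kafka.* values.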

VPC environment (authentication and encryption not required for message transmission)

If a client connects to Message Queue for Apache Kafka in a virtual private cloud (VPC), authentication and encryption are not required. Use the PLAINTEXT protocol to transmit messages. The client connects to Message Queue for Apache Kafka by using the default endpoint. For more information about endpoints, see Comparison among endpoints.

In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.

  1. Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:
    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. Run the following command to go to the directory of the configuration file:
    cd vpc/src/main/resources/
  3. Run the following command to modify the application.properties file and configure the instance settings based on Table 1:
    vi application.properties
    ### Set the following parameters based on the information about your instance:
    kafka.bootstrap-servers=192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
  4. Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc directory and run the following command to run the demo:
    sh run_demo.sh
    The following information is returned:
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  5. Log on to the Message Queue for Apache Kafka console to verify that messages are sent and received.
    • Check whether the topic specified in kafka.output.topic.name received the messages sent by the demo program. For more information, see Query messages.
    • In the console, send a message to the topic specified in kafka.input.topic.name and check whether the message is printed in the log of the demo program. For more information, see Send a message.
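
If the demo returns no output in either environment, it can help to first confirm that the endpoints in kafka.bootstrap-servers are reachable from the client machine. The following sketch parses a comma-separated host:port list and attempts a plain TCP connection to each entry. It is not part of the demo package: the class name BootstrapCheck, its methods, and the 3-second timeout are assumptions made for illustration. A successful TCP connection shows only that the port is open, not that authentication or topic settings are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight helper, not part of the demo package.
public class BootstrapCheck {

    /** Splits a value such as "host1:9092,host2:9092" into host and port pairs. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // Unresolved on purpose: parsing must not trigger DNS lookups.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean reachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Build a resolved address here; connect() rejects unresolved ones.
            socket.connect(
                    new InetSocketAddress(address.getHostString(), address.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Usage: java BootstrapCheck <host:port,host:port,...>");
            return;
        }
        for (InetSocketAddress address : parse(args[0])) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + reachable(address, 3000));
        }
    }
}
```

Pass the same value that you set for kafka.bootstrap-servers as the only argument. In the sample values of this topic, the SSL endpoint uses port 9093 and the default endpoint uses port 9092.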