edit-icon download-icon

Spring integration

Last Updated: Jun 19, 2018

This topic introduces how to send and receive messages using MQ in Spring framework. This section mainly include 3 parts: integration of normal message producer and Spring, integration of transactional message producer and Spring, and integration of message consumer and Spring.

  • Make sure the subscriptions of all consumer instances under the same consumer ID are consistent. For detailed information, see Subscription consistency.

Configuration parameters supported in Spring framework are the same with those in TCP Java. For detailed information, see Java SDK introduction.

Integration of Producer and Spring

  1. Define information such as producer Bean in producer.xml.

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
    6. <!-- All configuration items supported in Java SDK are also supported in Spring method. -->
    7. <property name="properties" > <!--Producer configuration-->
    8. <props>
    9. <prop key="ProducerId">PID_DEMO</prop> <!--Please replace XXX-->
    10. <prop key="AccessKey">XXX</prop>
    11. <prop key="SecretKey">XXX</prop>
    12. </props>
    13. </property>
    14. </bean>
    15. </beans>
  2. Produce messages through the producer that has been integrated with Spring.

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.Producer;
    4. import com.aliyun.openservices.ons.api.SendResult;
    5. import com.aliyun.openservices.ons.api.exception.ONSClientException;
    6. import org.springframework.context.ApplicationContext;
    7. import org.springframework.context.support.ClassPathXmlApplicationContext;
    8. public class ProduceWithSpring {
    9. public static void main(String[] args) {
    10. /**
    11. * The producer Bean is configured in producer.xml, which can be obtained through ApplicationContext or directly injected to other classes (such as specific controller).
    12. */
    13. ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
    14. Producer producer = (Producer) context.getBean("producer");
    15. //Send messages in loop
    16. for (int i = 0; i < 100; i++) {
    17. Message msg = new Message( //
    18. // The topic of the message
    19. "TopicTestMQ",
    20. //Message tag, which is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
    21. "TagA",
    22. // Message body, which can be any data in binary format.
    23. // Serialization and deserialization methods need to be negotiated and remain consistent between the producer and the consumer.
    24. "Hello MQ".getBytes());
    25. // The setting represents the key business property of the message, so please keep it globally unique.
    26. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
    27. // Note: Message sending and receiving is not affected if you do not configure this setting.
    28. msg.setKey("ORDERID_100");
    29. // Synchronous message sending will succeed as long as no exception is thrown.
    30. try {
    31. SendResult sendResult = producer.send(msg);
    32. assert sendResult != null;
    33. System.out.println("send success: " + sendResult.getMessageId());
    34. }catch (ONSClientException e) {
    35. System.out.println(“Message sending fails");
    36. }
    37. }
    38. }
    39. }

Integration of Transactional Message Producer and Spring

For the concept of transactional messages, see Send and receive transactional messages.

  1. Implement LocalTransactionChecker, as shown below. A message producer can have only one LocalTransactionChecker.

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
    4. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    5. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
    6. public TransactionStatus check(Message msg) {
    7. System.out.println("Start to check status of the local transaction");
    8. return TransactionStatus.CommitTransaction; //Return different transaction status according to the check result
    9. }
    10. }
  2. Define information such as transactional message producer Bean in transactionProducer.xml.

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
    6. <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
    7. <property name="properties" > <!--Transactional message producer configuration-->
    8. <props>
    9. <prop key="ProducerId">PID_DEMO</prop> <!--Please replace XXX-->
    10. <prop key="AccessKey">AKDEMO</prop>
    11. <prop key="SecretKey">SKDEMO</prop>
    12. </props>
    13. </property>
    14. <property name="localTransactionChecker" ref="localTransactionChecker"></property>
    15. </bean>
    16. </beans>
  3. Produce transactional messages through the producer that has been integrated with Spring.

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.SendResult;
    4. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    5. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    6. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    7. import org.springframework.context.ApplicationContext;
    8. import org.springframework.context.support.ClassPathXmlApplicationContext;
    9. public class ProduceTransMsgWithSpring {
    10. public static void main(String[] args) {
    11. /**
    12. * The transactional message producer Bean is configured in transactionProducer.xml, which can be obtained through ApplicationContext or directly injected to other classes (such as specific controller).
    13. * Refer to the example "Send Transactional Messages"
    14. */
    15. ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
    16. TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
    17. Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
    18. SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
    19. @Override
    20. public TransactionStatus execute(Message msg, Object arg) {
    21. System.out.println("Execute local transaction");
    22. return TransactionStatus.CommitTransaction; //Return different transaction status according to the execution result of local transaction
    23. }
    24. }, null);
    25. }
    26. }

Integration of Consumer and Spring

  1. Create MessageListener, as shown below.

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Action;
    3. import com.aliyun.openservices.ons.api.ConsumeContext;
    4. import com.aliyun.openservices.ons.api.Message;
    5. import com.aliyun.openservices.ons.api.MessageListener;
    6. public class DemoMessageListener implements MessageListener {
    7. public Action consume(Message message, ConsumeContext context) {
    8. System.out.println("Receive: " + message.getMsgID());
    9. try {
    10. //do something..
    11. return Action.CommitMessage;
    12. }catch (Exception e) {
    13. //Consumption fails
    14. return Action.ReconsumeLater;
    15. }
    16. }
    17. }
  2. Define information such as consumer Bean in consumer.xml.

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener configuration-->
    6. <!-- You can create multiple ConsumerBeans for multiple CIDs to subscribe to the same topic-->
    7. <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
    8. <property name="properties" > <!--Consumer configuration-->
    9. <props>
    10. <prop key="ConsumerId">CID_DEMO</prop> <!--Please replace XXX-->
    11. <prop key="AccessKey">AKDEMO</prop>
    12. <prop key="SecretKey">SKDEMO</prop>
    13. <!--Fix the number of consumer threads at 50
    14. <prop key="ConsumeThreadNums">50</prop>
    15. -->
    16. </props>
    17. </property>
    18. <property name="subscriptionTable">
    19. <map>
    20. <entry value-ref="msgListener">
    21. <key>
    22. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
    23. <property name="Topic" value="TopicTestMQ"/>
    24. <property name="expression" value="*"/><!--”expression” is the tag, which can be set to a specific tag, such as taga||tagb||tagc, or can be set to *. * means subscribing to all tags, and wildcards are not supported-->
    25. </bean>
    26. </key>
    27. </entry>
    28. <!--For more subscriptions, you can add entry nodes, as shown below-->
    29. <entry value-ref="msgListener">
    30. <key>
    31. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
    32. <property name="Topic" value="TopicTestMQ-Other"/> <!--subscribe to another topic -->
    33. <property name="expression" value="Taga||Tagb"/> <!-- subscribe to multiple tags -->
    34. </bean>
    35. </key>
    36. </entry>
    37. </map>
    38. </property>
    39. </bean>
    40. </beans>
  3. Run the consumer that has been integrated with Spring, as shown below.

    1. package demo;
    2. import org.springframework.context.ApplicationContext;
    3. import org.springframework.context.support.ClassPathXmlApplicationContext;
    4. public class ConsumeWithSpring {
    5. public static void main(String[] args) {
    6. /**
    7. * The consumer Bean is configured in consumer.xml, which can be obtained through ApplicationContext or directly injected to other classes (such as specific controller).
    8. */
    9. ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    10. System.out.println("Consumer Started");
    11. }
    12. }
Thank you! We've received your feedback.