This topic describes how to use Message Queue for Apache RocketMQ to send and subscribe to messages in the Spring framework.

Background information

The following sections describe how to:
  • Integrate a normal message producer with Spring
  • Integrate a transactional message producer with Spring
  • Integrate a message consumer with Spring

Maintain consistent subscriptions for all consumer instances with the same group ID. For more information, see Subscription consistency.

Parameters supported by the Spring framework are the same as those of TCP Java SDK. For more information, see Parameters of Java SDK.

For more information about ons-client, see Release notes of Java SDK.

Integrate a producer with Spring

  1. Define information such as the producer bean in producer.xml.
        <? xml version="1.0" encoding="UTF-8"? >
        <beans xmlns="http://www.springframework.org/schema/beans"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
            <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
            <! --The Spring framework supports all the configuration items that are supported by the Java SDK.-->
                <property name="properties" > <! --Producer configuration-->
                    <props>
                        <prop key="AccessKey">XXX</prop>
                        <prop key="SecretKey">XXX</prop>
                        <! --The ons-client version is 1.8.4.Final (see Java SDK release notes at the top of this page), which needs to be configured (copy the TCP endpoint from the Instance Details page).
                        <prop key="NAMESRV_ADDR">XXX</prop>
                        -->
                    </props>
                </property>
            </bean>
    
        </beans>
  2. Produce messages by using the producer that has been integrated with Spring.
        package demo;
    
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.exception.ONSClientException;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
    
        public class ProduceWithSpring {
            public static void main(String[] args) {
                /**
                 * The producer bean is configured in producer.xml. You can call ApplicationContext to obtain the bean or directly inject it to other classes, such as a specific controller.
                 */
                ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
    
                Producer producer = (Producer) context.getBean("producer");
                // Send messages cyclically.
                for (int i = 0; i < 100; i++) {
                    Message msg = new Message( //
                            // The topic of the message.
                            "TopicTestMQ",
                            // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                            "TagA",
                            // The message body in any binary format. Message Queue for Apache RocketMQ does not process the message body.
                            // The producer and consumer must negotiate the consistent serialization and deserialization methods.
                            "Hello MQ".getBytes());
                    // The message key, which must be globally unique.
                    // A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
                    // Note: Messages can still be sent and received even if this attribute is not set.
                    msg.setKey("ORDERID_100");
                    // The message sending result, which is successful if no exception occurs.
                    try {
                        SendResult sendResult = producer.send(msg);
                        assert sendResult ! = null;
                        System.out.println("send success: " + sendResult.getMessageId());
                    }catch (ONSClientException e) {
                        System.out.println("Sending failure");
                    }
    
                }
            }
        }

Integrate a transactional message producer with Spring

For more information about transactional messages, see Send and subscribe to transactional messages.

  1. Implement LocalTransactionChecker. A message producer can have only one LocalTransactionChecker.
        package demo;
    
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
        import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    
        public class DemoLocalTransactionChecker implements LocalTransactionChecker {
            public TransactionStatus check(Message msg) {
                System.out.println("Begin to check the local transaction status");
                return TransactionStatus.CommitTransaction; //Return different TransactionStatus values based on the result of the local transaction status check.
            }
        }
  2. Define information such as the producer bean in transactionProducer.xml.
        <? xml version="1.0" encoding="UTF-8"? >
        <beans xmlns="http://www.springframework.org/schema/beans"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
            <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
    
            <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
                <property name="properties" > <! --Transactional message producer configuration-->
                    <props>
                        <prop key="AccessKey">AKDEMO</prop>
                        <prop key="SecretKey">SKDEMO</prop>
                        <prop key="GROUP_ID">GID_DEMO</prop>
                        <! --The ons-client version is 1.8.4.Final (see Java SDK release notes at the top of this page), which needs to be configured (copy the TCP endpoint from the Instance Details page).
                        <prop key="NAMESRV_ADDR">XXX</prop>
                        -->
                    </props>
                </property>
                <property name="localTransactionChecker" ref="localTransactionChecker"></property>
            </bean>
    
        </beans>
  3. Produce transactional messages by using the producer that has been integrated with Spring.
        package demo;
    
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
        import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
        import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
    
        public class ProduceTransMsgWithSpring {
    
            public static void main(String[] args) {
                /**
                 * The transactional message producer bean is configured in transactionProducer.xml. You can call ApplicationContext to obtain the bean or directly inject it to other classes, such as a specific controller.
                 * See the example of sending transactional messages.
                 */
                ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
    
                TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
    
                Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
    
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Execute a local transaction");
                        return TransactionStatus.CommitTransaction; //Return different TransactionStatus values based on the local transaction execution result.
                    }
                }, null);
            }
        }

Integrate a consumer with Spring

  1. Implement MessageListener.
        package demo;
    
        import com.aliyun.openservices.ons.api.Action;
        import com.aliyun.openservices.ons.api.ConsumeContext;
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.MessageListener;
    
        public class DemoMessageListener implements MessageListener {
    
            public Action consume(Message message, ConsumeContext context) {
    
                System.out.println("Receive: " + message.getMsgID());
                try {
                    //do something..
                    return Action.CommitMessage;
                }catch (Exception e) {
                    // Consumption failed.
                    return Action.ReconsumeLater;
                }
            }
        }
  2. Define information such as the consumer bean in consumer.xml.
        <? xml version="1.0" encoding="UTF-8"? >
        <beans xmlns="http://www.springframework.org/schema/beans"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
            <bean id="msgListener" class="demo.DemoMessageListener"></bean> <! --Listener configuration-->
        <! --When multiple consumers with the same group ID subscribe to the same topic, you can create multiple consumer beans.-->
            <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
                <property name="properties" > <! --Consumer configuration-->
                    <props>
                        <prop key="GROUP_ID">GID_DEMO</prop> <! --Replace XXX with the actual value-->
                        <prop key="AccessKey">AKDEMO</prop>
                        <prop key="SecretKey">SKDEMO</prop>
                        <! --The ons-client version is 1.8.4.Final (see Java SDK release notes at the top of this page), which needs to be configured (copy the TCP endpoint from the Instance Details page).
                        <prop key="NAMESRV_ADDR">XXX</prop>
                        -->
                        <! --Set the number of consumer threads to 50.
                        <prop key="ConsumeThreadNums">50</prop>
                        -->
                    </props>
                </property>
                <property name="subscriptionTable">
                    <map>
                        <entry value-ref="msgListener">
                            <key>
                                <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                    <property name="topic" value="TopicTestMQ"/>
                                    <property name="expression" value="*"/><! --The expression is actually a tag, which can be set to a specific tag such as taga||tagb||tagc, or can be set to *. * indicates the subscription to all tags. Wildcards are not supported.-->
                                </bean>
                            </key>
                        </entry>
                        <! --Add entry nodes to subscribe to more tags.-->
                        <entry value-ref="msgListener">
                            <key>
                                <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                    <property name="topic" value="TopicTestMQ-Other"/> <! --Subscribe to another topic.-->
                                    <property name="expression" value="taga||tagb"/> <! --Subscribe to multiple tags.-->
                                </bean>
                            </key>
                        </entry>
                    </map>
                </property>
            </bean>
    
        </beans>
  3. Run the consumer that has been integrated with Spring.
        package demo;
    
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
    
        public class ConsumeWithSpring {
            public static void main(String[] args) {
    
                /**
                 * The consumer bean is configured in consumer.xml. You can call ApplicationContext to obtain the bean or directly inject it to other classes, such as a specific controller.
                 */
                ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
                System.out.println("Consumer Started");
            }
        }