This topic describes how to use the exactly-once delivery semantics of Message Queue for Apache RocketMQ to send and subscribe to messages, to ensure that the final message processing result is written to the database only once.

Background information

Notice The use of the exactly-once delivery semantics is supported only in the Java SDK. For SDK download, see Release notes.

The exactly-once delivery semantics of Message Queue for Apache RocketMQ applies to the following process: subscribe to a message > process the message > persistently store the result in database. It ensures that the final consumption result of each message is written to your database only once, keeping message consumption idempotent.

For more information about the exactly-once delivery semantics and typical scenarios, see Exactly-once delivery semantics.

Procedure

To use the exactly-once delivery semantics, follow these steps:
  1. Add dependencies of the SDK package and Spring 3.0 or later to the application. For more information, see Step 1: Add dependencies.
  2. Create a table named transaction_record in the database that stores message consumption results. For more information, see Step 2: Create a consumption transaction table.
    Notice The database system that stores message consumption results must support local transactions.
  3. On the message producer, set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to enable the exactly-once delivery semantics. For more information, see Step 3: Enable the exactly-once delivery semantics on the producer.
  4. Create ExactlyOnceConsumer on the consumer instance and enable exactly-once consumption. For more information, see Step 4: Enable the exactly-once delivery semantics on the consumer.

Step 1: Add dependencies

The exactly-once consumer feature of Message Queue for Apache RocketMQ is released in SDK ons-client-ext-1.8.4.Final. To use the exactly-once delivery semantics, add the dependency of this SDK to the application.

In addition, the exactly-once consumer depends on Spring to enable exactly-once consumption by using the annotation @MQTransaction. Therefore, you must also add the dependency of Spring 3.0 or later to the application.

Add the dependencies as follows:

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client-ext</artifactId>
            <version>1.8.4.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring-version}</version>
        </dependency>            

Step 2: Create a consumption transaction table

To use the exactly-once delivery semantics of Message Queue for Apache RocketMQ, create a table named transaction_record in the database that persistently stores business processing results. Ensure that this table is in the same database as the table that stores business processing results, and that the database supports local transactions.

The exactly-once delivery semantics of Message Queue for Apache RocketMQ supports access to ApsaraDB RDS for MySQL (MySQL) and ApsaraDB RDS for SQL Server (SQL Server) data sources. The statements for creating the transaction_record table for the two types of data sources are as follows.

  • MySQL
    CREATE TABLE `transaction_record` (
      `consumer_group` varchar(128) NOT NULL DEFAULT '',
      `message_id` varchar(255) NOT NULL DEFAULT '',
      `topic_name` varchar(255) NOT NULL DEFAULT '',
      `ctime` bigint(20) NOT NULL,
      `queue_id` int(11) NOT NULL,
      `offset` bigint(20) NOT NULL,
      `broker_name` varchar(255) NOT NULL DEFAULT '',
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      PRIMARY KEY (`id`),
      UNIQUE KEY `message_id_unique_key` (`message_id`),
      KEY `ctime_key` (`ctime`),
      KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;           
  • SQL Server
    CREATE TABLE transaction_record
    (
        [consumer_group] varchar(128) NOT NULL ,
        [message_id] varchar(255) NOT NULL ,
        [topic_name] varchar(255) NOT NULL ,
        [ctime] bigint NOT NULL ,
        [queue_id] int NOT NULL ,
        [offset] bigint NOT NULL ,
        [broker_name] varchar(50) NOT NULL ,
        [id] bigint IDENTITY(1,1)  PRIMARY KEY
    )
    CREATE NONCLUSTERED  INDEX load_key ON transaction_record 
    (queue_id, broker_name, topic_name, ctime);
    CREATE UNIQUE NONCLUSTERED INDEX message_id_uniq_key ON transaction_record 
    (message_id);
    CREATE NONCLUSTERED INDEX ctime_key ON transaction_record 
    (ctime);            
Notice
  • If you are using SQL Server Enterprise Edition, we recommend that you run ALTER DATABASE [USERDB] SET PARTNER SAFETY OFF; to enable the asynchronous mode, which improves the database read and write performance.
  • You can also enable the delayed durability feature for the SQL Server database by running ALTER DATABASE [USERDB] SET DELAYED_DURABILITY=FORCED. This feature reduces the IOPS of the database.

Step 3: Enable the exactly-once delivery semantics on the producer

On the producer, set PropertyKeyConst.EXACTLYONCE_DELIVERY to true to enable the exactly-once delivery semantics. The sample code is as follows:

/**
 * Start TestExactlyOnceProducer.
 * Set PropertyKeyConst.EXACTLYONCE_DELIVERY to true to enable the exactly-once delivery semantics.
 */

public class TestExactlyOnceProducer {
    public static void main(String[] args) {
        Properties producerProperties = new Properties();
        producerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{GROUP_ID}");
        producerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}");
        producerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}");
        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}");
        producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY, "true");
        Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
        producer.start();
        System.out.println("Producer Started");

        for (int i = 0; i < 10; i++) {
            Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
            try {
                SendResult sendResult = producer.send(message);
                assert sendResult ! = null;
                System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
            } catch (ONSClientException e) {
                System.out.println("Sending failure");
                // This exception indicates a message sending failure. To prevent message loss, we recommend that you cache the message and try again.
            }
        }
        producer.shutdown();
    }
}            

Step 4: Enable the exactly-once delivery semantics on the consumer

When the exactly-once delivery semantics of Message Queue for Apache RocketMQ is used for consumption, you need to create an exactly-once consumer on the consumer client by using ExactlyOnceONSFactory to call createExactlyOnceConsumer. The exactly-once consumer then implements exactly-once consumption.

For an exactly-once consumer, pay attention to the following points:

  • When creating an exactly-once consumer, set PropertyKeyConst.EXACTLYONCE_DELIVERY to enable or disable the exactly-once delivery semantics. The exactly-once delivery semantics is enabled for an exactly-once consumer by default.
  • When an exactly-once consumer is used for consumption, your business processing logic needs to use MQDataSource in the consume method of MessageListener to read and write data in the database.

Use any of the following methods to enable the exactly-once delivery semantics on the consumer:

  • Enable the exactly-once delivery semantics without using Spring.

    An example is as follows:

    /**
     * Start the exactly-once consumer.
     * Set PropertyKeyConst.EXACTLYONCE_DELIVERY to true to enable the exactly-once delivery semantics.
     */
     public class TestExactlyOnceConsumer {
        private ExactlyOnceConsumer consumer;
        private TxMessageListener listener;
    
        public TestExactlyOnceConsumer() {
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{GID}");
            consumerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}");
            consumerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}");
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}");
            this.consumer = ExactlyOnceONSFactory.createExactlyOnceConsumer(consumerProperties);
            this.consumer.subscribe("{topic}", "", new TestExactlyOnceListener());
            consumer.start();
            System.out.println("Consumer start success.") ;
        }
    }
    
    /**
     * SimpleListener is an example of the use of exactly-once delivery consumer for message consumption.
     * It completes a simple process of recording messages in the database, and ensures that each message is persistently stored in the database only once.
     */
    public class SimpleListener implements MessageListener {
        private MQDataSource dataSource;
    
        public SimpleListener() {
            this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
        }
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            Connection connection = null;
            PreparedStatement statement = null;
            try {
                /**
                 * The consumed messages are processed for business accounting, and the processing results are persistently stored in the database system by using MQDataSource.
                 * This example demonstrates how messages are consumed and recorded in the database system. The actual business processing process is as follows: subscribe to a message > process the message > persistently store the result.
                 * The exactly-once delivery semantics ensures that each message is persistently stored only once.
                 */
                connection = dataSource.getConnection();
                statement = connection.prepareStatement("INSERT INTO app(msg, ctime) VALUES(?, ?)");
                statement.setString(1, new String(message.getBody()));
                statement.setLong(2, System.currentTimeMillis());
                statement.execute();
                System.out.println("consume message success");
                return Action.CommitMessage;
            } catch (Throwable e) {
                System.out.println("consume message fail:" + e.getMessage());
                return Action.ReconsumeLater;
            } finally {
                if (statement ! = null) {
                    try {
                        statement.close();
                    } catch (Exception e) {
                    }
                }
                if (connection ! = null) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                    }
                }
            }
        }
    }            
  • Create transactions under MessageListener for database operations and message consumption

    An example is as follows:

    /**
     * Implement TestExactlyOnceListener
     * Multiple business tables are updated in one transaction, and each operation in the transaction takes effect only once.
     */
    public class SimpleTxListener implements MessageListener {
        private MQDataSource dataSource;
    
        public SimpleTxListener() {
            this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
        }
    
            @Override
        public Action consume(Message message, ConsumeContext context) {
            Connection connection = null;
            Statement statement = null;
            try {
                /**
                 * The consumed messages are processed for business accounting, and the processing results are persistently stored in the database system by using MQDataSource.
                 * This example demonstrates an update of multiple tables in one transaction. The use of exactly-once delivery semantics ensures that each operation is performed only once in the transaction.
                 * The business processing logic is designed based on the following process: subscribe to a message > process the message > persistently store the result.
                 */
                connection = dataSource.getConnection();
                connection.setAutoCommit(false);
                String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)",
                    new String(message.getBody()), message.getMsgID(), System.currentTimeMillis());
                String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"", "GID_TEST");
                statement = connection.createStatement();
                statement.execute(insertSql);
                statement.execute(updateSql);
                connection.commit();
                System.out.println("consume message :" + message.getMsgID());
                return Action.CommitMessage;
            } catch (Throwable e) {
                try {
                    connection.rollback();
                } catch (Exception e1) {
                }
                System.out.println("consume message fail");
                return Action.ReconsumeLater;
            } finally {
                if (statement ! = null) {
                    try {
                        statement.close();
                    } catch (Exception e) { }
                }
                if (connection ! = null) {
                    try {
                        connection.close();
                    } catch (Exception e) { }
                }
            }
        }                                                      
    }            
  • Add a Spring Boot annotation in MessageListener to enable the exactly-once delivery semantics
    1. Implement MessageListener.
      /**
       * Add an annotation in Message Listener to enable the exactly-once delivery semantics.
       * You only need to add @MQTransaction to the consume method of MessageListener.
       * This method is applicable to the Spring Boot microservice.
       */
      
      public class TestMessageListener implements MessageListener {
          private final static String INSERTSQLFORMAT = "INSERT INTO app(message_id, ctime) VALUES(\"%s\", %d)";
          private MQDataSource dataSource;
      
          @Override
          @MQTransaction
          public Action consume(Message message, ConsumeContext context) {
      
              Connection connection = null;
              Statement statement = null;
              try {
                  String insertSql = String.format(INSERTSQLFORMAT, message.getMsgID(), System.currentTimeMillis());
                  connection = this.dataSource.getConnection();
                  statement = connection.createStatement();
                  statement.execute(insertSql);
                  return Action.CommitMessage;
              } catch (Throwable e) {
                  return Action.ReconsumeLater;
              } finally {
                  if (statement ! = null) {
                      try {
                          statement.close();
                      } catch(Exception e) { }
                  }
                  if (connection ! = null) {
                      try {
                          connection.close();
                      } catch (Exception e) { }
                  }
              }
          }
      
          public void setDataSource(MQDataSource dataSource) {
              this.dataSource = dataSource;
          }
      }                    
    2. Define information such as the consumer bean in consumer.xml.
      <? xml version="1.0" encoding="UTF-8"? >
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      
          <bean id="mqDataSource" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.datasource.MQDataSource" init-method="init" destroy-method="close">
              <property name="url" value="{url}" />
              <property name="username" value="{user}" />
              <property name="password" value="{passwd}" />
              <property name="driverClass" value="{driver}" />
          </bean>
      
          <bean id="msgListener" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.spring.TestMessageListener">
              <property name="dataSource" ref="mqDataSource"> <! --Consumer configuration-->
              </property>
          </bean> <! --Listener configuration-->
      
          <! --When multiple consumers with the same group ID subscribe to the same topic, you can create multiple consumer beans.-->
          <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ExactlyOnceConsumerBean" init-method="start" destroy-method="shutdown">
              <property name="properties" > <! --Consumer configuration-->
                  <props>
                      <prop key="GROUP_ID">{gid}</prop> 
                      <prop key="AccessKey">{accessKey}</prop>
                      <prop key="SecretKey">{secretKey}</prop>
                      <! --Set the number of consumer threads to 50.
                      <prop key="ConsumeThreadNums">50</prop>
                      -->
                  </props>
              </property>
              <property name="subscriptionTable">
                  <map>
                      <entry value-ref="msgListener">
                          <key>
                              <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                  <property name="topic" value="{topic}"/>
                                  <property name="expression" value="{subExpression}"/><! --The expression is actually a tag, which can be set to a specific tag such as taga||tagb||tagc, or can be set to *. * indicates the subscription to all tags. Wildcards are not supported.-->
                              </bean>
                          </key>
                      </entry>
                  </map>
              </property>
          </bean>
      </beans>                    
    3. Run the consumer that has been integrated with Spring.
      public class ConsumeWithSpring {
          public static void main(String[] args) {
              /**
               * The consumer bean is configured in consumer.xml. You can call ApplicationContext to obtain the bean or directly inject it to other classes, such as a specific controller.
               */
              ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
              System.out.println("Consumer Started");
          }                    
  • Use MyBatis in MessageListener to enable the exactly-once delivery semantics.
    1. Design the data access object (DAO) for write operations in the business database.
      package com.aliyun.openservices.tcp.example.mybatis;
      
      public interface AppDAO {
          Integer insertMessage(String msgId);
      }                    
    2. Write the INSERT statement in the mapper.xml file.
      <? xml version="1.0" encoding="UTF-8" ? >
      <! DOCTYPE mapper
              PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
              "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
          <mapper namespace="com.aliyun.openservices.tcp.example.mybatis.AppDAO">
              <insert id="insertMessage">
                  INSERT INTO app (message_id, ctime) VALUES (#{msgId}, now())
              </insert>
          </mapper>                    
    3. Call MQDataSource to define the custom DataSourceFactory.
      
      
      
      public class MQDataSourceFactoty extends DruidDataSourceFactory implements DataSourceFactory {
          protected Properties properties;
      
          @Override
          public void setProperties(Properties properties) {
              this.properties = properties;
          }
      
          @Override
          public DataSource getDataSource() {
              try {
                  DruidDataSource druidDataSource = (DruidDataSource) createDataSource(this.properties);
                  return new MQDataSource(druidDataSource);
              } catch (Exception e) {
                  System.out.println("err:" + e.getMessage());
              }
              return null;
          }
      }                    
    4. Register the data source as the MQDataSourceFactoty class in mybatis-config.xml.
      <configuration>
          <environments default="development">
              <environment id="development">
                  <transactionManager type="JDBC"/>
                  <! --Database connection configuration-->
                  <dataSource type="com.aliyun.openservices.tcp.example.mybatis.MQDataSourceFactoty">
                      <property name="driverClass" value="com.mysql.jdbc.Driver"/>
                      <property name="url" value="{url}"/>
                      <property name="username" value="{username}"/>
                      <property name="password" value="{password}"/>
      
                      <property name="initialSize" value="10" />
                      <property name="maxActive" value="20"/>
                  </dataSource>
              </environment>
          </environments>
          <mappers>
              <mapper resource="mapper.xml"/>
          </mappers>
      </configuration>                    
    5. Connect to the database from the message listener by using MyBatis to implement exactly-once consumption.
      public class TestMybatisListener implements MessageListener {
          private static SqlSessionFactory sqlSessionFactory;
      
          static {
              String resource = "mybatis-config.xml";
              Reader reader = null;
              try {
                  reader= Resources.getResourceAsReader(resource);
              } catch (IOException e) {
                  e.printStackTrace();
              }
              sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
          }
      
          @Override
          public Action consume(Message message, ConsumeContext context) {
              long begion = System.currentTimeMillis();
                SqlSession sqlSession = null;
              try {
                  sqlSession = sqlSessionFactory.openSession();
                  AppDAO appDAO = sqlSession.getMapper(AppDAO.class);
                  appDAO.insertMessage(message.getMsgID());
                  System.out.println("consume : " + message.getMsgID());
                  sqlSession.commit();
                  return Action.CommitMessage;
              } catch (Exception e) {
                  e.printStackTrace();
                    sqlSession.rollback();
                  return Action.ReconsumeLater;
              } finally {
                  sqlSession.close();
              }
          }
      }                    

Precautions

When using an exactly-once consumer of Message Queue for Apache RocketMQ for message consumption, pay attention to the following points:

  • The consumer offset cannot be reset manually in the console. If you reset the consumer offset to a consumption time point that has passed, the exactly-once delivery semantics becomes ineffective.
  • Each insert or update operation in a database triggers an update operation. In addition, the exactly-once consumer of Message Queue for Apache RocketMQ queries and deletes data in the database periodically. These extra operations increase the IOPS of the database.