Exactly-Once delivery semantics

Last Updated: Jun 28, 2019

This topic describes how to send and receive messages with Exactly-Once delivery semantics, which ensures that the final processing result of a message is written to the database only once.

Note: Currently, Exactly-Once delivery semantics is supported only in the Java SDK. For information about downloading the SDK, see Release Notes.

Background Information

The Exactly-Once delivery semantics of RocketMQ applies to the following process: receive a message > process the message > persistently store the result in the database. It ensures that the final consumption result of each message is written to your database only once, which keeps message consumption idempotent.
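As a rough mental model only (this is not the SDK's implementation), the guarantee can be pictured as recording each message ID together with the business result and skipping any ID that has already been recorded. The sketch below simulates that idea with in-memory collections. The class `ExactlyOnceSketch`, its `consume` method, and the `balance` field are hypothetical names chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model: a set of processed message IDs stands in for a deduplication table. */
public class ExactlyOnceSketch {
    private final Set<String> processedIds = new HashSet<>(); // stand-in for recorded message IDs
    private long balance = 0;                                  // stand-in for a business table

    /** Applies the message once; returns false when the ID was already recorded. */
    public synchronized boolean consume(String messageId, long amount) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery: the result is already stored, so skip it
        }
        balance += amount;
        return true;
    }

    public synchronized long getBalance() {
        return balance;
    }

    public static void main(String[] args) {
        ExactlyOnceSketch sketch = new ExactlyOnceSketch();
        sketch.consume("msg-1", 100);
        sketch.consume("msg-1", 100); // redelivery of the same message is ignored
        sketch.consume("msg-2", 50);
        System.out.println(sketch.getBalance()); // prints 150
    }
}
```

Even though "msg-1" is delivered twice, its effect is applied once, which is the property the rest of this topic relies on.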

Procedure

To use the Exactly-Once delivery semantics, follow these steps:

  1. Add dependencies of the SDK package and Spring 3.0 or later to the application. For more information, see Step 1: Add dependencies.

  2. Create a table named transaction_record in the database that stores message consumption results. For more information, see Step 2: Create a consumption transaction table.

Note: The database system that stores message consumption results must support local transactions.

  3. On the message producer, set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to enable the Exactly-Once delivery semantics. For more information, see Step 3: Enable the Exactly-Once delivery semantics on the producer.

  4. Create an Exactly-Once consumer on the consumer client and enable Exactly-Once consumption. For more information, see Step 4: Enable the Exactly-Once delivery semantics on the consumer.

Step 1: Add dependencies

The Exactly-Once consumer feature of RocketMQ is released in the SDK ons-client-ext-1.8.0.Final. To use the Exactly-Once delivery semantics, add this SDK as a dependency of the application.

In addition, the Exactly-Once consumer depends on Spring to enable Exactly-Once consumption through the @MQTransaction annotation. Therefore, you must also add Spring 3.0 or later as a dependency of the application.

Add the dependencies as follows:

    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client-ext</artifactId>
        <version>1.8.0.Final</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${spring-version}</version>
    </dependency>

Step 2: Create a consumption transaction table

To use the Exactly-Once delivery semantics of RocketMQ, create a table named transaction_record in the same database as the table that stores your business processing results. The database must support local transactions.

The Exactly-Once delivery semantics of RocketMQ supports MySQL and SQL Server data sources. The statements for creating the transaction_record table for each type of data source are as follows.

  • MySQL

    CREATE TABLE `transaction_record` (
      `consumer_group` varchar(128) NOT NULL DEFAULT '',
      `message_id` varchar(255) NOT NULL DEFAULT '',
      `topic_name` varchar(255) NOT NULL DEFAULT '',
      `ctime` bigint(20) NOT NULL,
      `queue_id` int(11) NOT NULL,
      `offset` bigint(20) NOT NULL,
      `broker_name` varchar(255) NOT NULL DEFAULT '',
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      PRIMARY KEY (`id`),
      UNIQUE KEY `message_id_unique_key` (`message_id`),
      KEY `ctime_key` (`ctime`),
      KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  • SQL Server

    CREATE TABLE transaction_record
    (
      [consumer_group] varchar(128) NOT NULL,
      [message_id] varchar(255) NOT NULL,
      [topic_name] varchar(255) NOT NULL,
      [ctime] bigint NOT NULL,
      [queue_id] int NOT NULL,
      [offset] bigint NOT NULL,
      [broker_name] varchar(50) NOT NULL,
      [id] bigint IDENTITY(1,1) PRIMARY KEY
    );
    CREATE NONCLUSTERED INDEX load_key ON transaction_record
      (queue_id, broker_name, topic_name, ctime);
    CREATE UNIQUE NONCLUSTERED INDEX message_id_uniq_key ON transaction_record
      (message_id);
    CREATE NONCLUSTERED INDEX ctime_key ON transaction_record
      (ctime);

Note:

  • If you are using Enterprise SQL Server, we recommend that you run ALTER DATABASE [USERDB] SET PARTNER SAFETY OFF; to enable the asynchronous mode, which improves the database read and write performance.

  • You can also enable the delayed durability feature for the SQL Server database by running ALTER DATABASE [USERDB] SET DELAYED_DURABILITY=FORCED. This feature reduces the IOPS of the database.
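
One way to see why transaction_record must share a database with the business table, and why local transactions are required, is that both writes need to commit or roll back as a unit. The following is a simplified in-memory simulation of that behavior, not SDK code. The class `LocalTxSketch`, the `process` method, and the `failBusinessWrite` flag are hypothetical and exist only to illustrate the point.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one local transaction that spans a record table and a business table. */
public class LocalTxSketch {
    private Set<String> records = new HashSet<>();    // stand-in for transaction_record
    private List<String> results = new ArrayList<>(); // stand-in for a business table

    /**
     * Writes the consumption record and the business result as one unit.
     * Returns true if the work is committed (now or by an earlier delivery),
     * and false if the simulated transaction is rolled back.
     */
    public boolean process(String messageId, String result, boolean failBusinessWrite) {
        if (records.contains(messageId)) {
            return true; // already committed by an earlier delivery
        }
        // Stage both writes on copies so that a failure leaves the committed state untouched.
        Set<String> stagedRecords = new HashSet<>(records);
        List<String> stagedResults = new ArrayList<>(results);
        stagedRecords.add(messageId);
        if (failBusinessWrite) {
            return false; // rollback: the staged copies are simply discarded
        }
        stagedResults.add(result);
        // Commit: both tables change together.
        records = stagedRecords;
        results = stagedResults;
        return true;
    }

    public int resultCount() {
        return results.size();
    }

    public boolean hasRecord(String messageId) {
        return records.contains(messageId);
    }
}
```

If the record and the business result lived in different databases, a failure between the two writes could leave a record without a result (the message is never processed) or a result without a record (the message is processed again on redelivery).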

Step 3: Enable the Exactly-Once delivery semantics on the producer

On the producer, set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to true to enable the Exactly-Once delivery semantics. The sample code is as follows:

    import java.util.Date;
    import java.util.Properties;
    // Imports of the ons-client-ext SDK classes are omitted here.

    /**
     * Start TestExactlyOnceProducer.
     * Set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to enable the Exactly-Once delivery semantics.
     */
    public class TestExactlyOnceProducer {
        public static void main(String[] args) {
            Properties producerProperties = new Properties();
            producerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{gid}");
            producerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}");
            producerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}");
            producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}");
            producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY, "true");
            Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
            producer.start();
            System.out.println("Producer Started");
            for (int i = 0; i < 10; i++) {
                Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
                try {
                    SendResult sendResult = producer.send(message);
                    assert sendResult != null;
                    System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
                } catch (ONSClientException e) {
                    System.out.println("Sending failure");
                    // This exception indicates a message sending failure. To prevent message loss, we recommend that you cache the message and try again.
                }
            }
            producer.shutdown();
        }
    }
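
The comment in the catch block recommends caching a failed message and sending it again. A minimal, SDK-independent sketch of a bounded retry with a fixed pause is shown below. The class `RetrySketch`, the method `callWithRetry`, and its parameters are hypothetical. The sketch assumes that send failures surface as unchecked exceptions, and in real code the supplier would wrap a call such as producer.send(message).

```java
import java.util.function.Supplier;

/** Hypothetical helper: retries an action a bounded number of times. */
public final class RetrySketch {
    private RetrySketch() { }

    /**
     * Calls the action up to maxAttempts times, sleeping pauseMillis between attempts.
     * Returns the first successful result, or rethrows the last failure.
     */
    public static <T> T callWithRetry(Supplier<T> action, int maxAttempts, long pauseMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again if attempts remain
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw last;
    }
}
```

A bounded attempt count keeps a persistent outage from blocking the sending thread indefinitely. Messages that still fail after the last attempt would need to be stored somewhere durable for a later resend.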

Step 4: Enable the Exactly-Once delivery semantics on the consumer

To consume messages with the Exactly-Once delivery semantics of RocketMQ, create an Exactly-Once consumer on the consumer client by calling ExactlyOnceONSFactory.createExactlyOnceConsumer. The Exactly-Once consumer then implements Exactly-Once consumption.

For an Exactly-Once consumer, pay attention to the following:

  • When creating an Exactly-Once consumer, set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to enable or disable the Exactly-Once delivery semantics. Exactly-Once delivery semantics is enabled for an Exactly-Once consumer by default.

  • When an Exactly-Once consumer is used for consumption, your business processing logic needs to use MQDataSource in the consume method of the message listener to read and write data in the database.

Use any of the following methods to enable the Exactly-Once delivery semantics on the consumer:

Enable Exactly-Once delivery semantics without using Spring

Example:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.util.Properties;
    // Imports of the ons-client-ext SDK classes are omitted here.

    /**
     * Start the Exactly-Once consumer.
     * Set the PropertyKeyConst.EXACTLYONCE_DELIVERY attribute to enable the Exactly-Once delivery semantics.
     */
    public class TestExactlyOnceConsumer {
        private ExactlyOnceConsumer consumer;
        private TxMessageListener listener;

        public TestExactlyOnceConsumer() {
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{gid}");
            consumerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}");
            consumerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}");
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}");
            this.consumer = ExactlyOnceONSFactory.createExactlyOnceConsumer(consumerProperties);
            // Subscribe with the SimpleListener class defined below.
            this.consumer.subscribe("{topic}", "", new SimpleListener());
            consumer.start();
            System.out.println("Consumer start success.");
        }
    }

    /**
     * SimpleListener is an example of using an Exactly-Once consumer for message consumption.
     * It completes a simple process of recording messages in the database, and ensures that each message is persistently stored in the database and takes effect only once.
     */
    public class SimpleListener implements MessageListener {
        private MQDataSource dataSource;

        public SimpleListener() {
            this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            Connection connection = null;
            PreparedStatement statement = null;
            try {
                /**
                 * The consumed messages are processed for business accounting, and the processing results are persistently stored in the database system by using MQDataSource.
                 * This example demonstrates how messages are consumed and recorded in the database system. The actual business processing process is as follows: receive a message > process the message > persistently store the result.
                 * Exactly-Once delivery semantics ensures that each message is persistently stored only once.
                 */
                connection = dataSource.getConnection();
                statement = connection.prepareStatement("INSERT INTO app(msg, ctime) VALUES(?, ?)");
                statement.setString(1, new String(message.getBody()));
                statement.setLong(2, System.currentTimeMillis());
                statement.execute();
                System.out.println("consume message success");
                return Action.CommitMessage;
            } catch (Throwable e) {
                System.out.println("consume message fail:" + e.getMessage());
                return Action.ReconsumeLater;
            } finally {
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (Exception e) {
                        // Ignore failures while closing the statement.
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // Ignore failures while closing the connection.
                    }
                }
            }
        }
    }

Create transactions on the message listener for database operations and message consumption

Example:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    // Imports of the ons-client-ext SDK classes are omitted here.

    /**
     * Implement SimpleTxListener.
     * Multiple business tables are updated in one transaction, and each operation in the transaction takes effect only once.
     */
    public class SimpleTxListener implements MessageListener {
        private MQDataSource dataSource;

        public SimpleTxListener() {
            this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            Connection connection = null;
            PreparedStatement insertStatement = null;
            PreparedStatement updateStatement = null;
            try {
                /**
                 * The consumed messages are processed for business accounting, and the processing results are persistently stored in the database system by using MQDataSource.
                 * This example demonstrates an update of multiple tables in one transaction. The use of Exactly-Once delivery semantics ensures that each operation is performed only once in the transaction.
                 * The business processing logic is designed based on the following process: receive a message > process the message > persistently store the result.
                 */
                connection = dataSource.getConnection();
                connection.setAutoCommit(false);
                // Bind values as parameters so that message content cannot alter the SQL text.
                insertStatement = connection.prepareStatement("INSERT INTO app(msg, message_id, ctime) VALUES(?, ?, ?)");
                insertStatement.setString(1, new String(message.getBody()));
                insertStatement.setString(2, message.getMsgID());
                insertStatement.setLong(3, System.currentTimeMillis());
                insertStatement.execute();
                updateStatement = connection.prepareStatement("UPDATE consume_count SET cnt = cnt + 1 WHERE consumer_group = ?");
                updateStatement.setString(1, "GID_TEST");
                updateStatement.execute();
                connection.commit();
                System.out.println("consume message :" + message.getMsgID());
                return Action.CommitMessage;
            } catch (Throwable e) {
                if (connection != null) {
                    try {
                        connection.rollback();
                    } catch (Exception e1) {
                        // Ignore failures while rolling back.
                    }
                }
                System.out.println("consume message fail");
                return Action.ReconsumeLater;
            } finally {
                if (insertStatement != null) {
                    try {
                        insertStatement.close();
                    } catch (Exception e) { }
                }
                if (updateStatement != null) {
                    try {
                        updateStatement.close();
                    } catch (Exception e) { }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception e) { }
                }
            }
        }
    }

Use the Spring Boot annotation to enable the Exactly-Once delivery semantics on the message listener

  1. Create a message listener.

    import java.sql.Connection;
    import java.sql.Statement;
    // Imports of the ons-client-ext SDK classes are omitted here.

    /**
     * On the message listener, use an annotation to enable the Exactly-Once delivery semantics.
     * You only need to add @MQTransaction to the consume method of the message listener.
     * This method is applicable to the Spring Boot microservice.
     */
    public class TestMessageListener implements MessageListener {
        private final static String INSERTSQLFORMAT = "INSERT INTO app(message_id, ctime) VALUES(\"%s\", %d)";
        private MQDataSource dataSource;

        @Override
        @MQTransaction
        public Action consume(Message message, ConsumeContext context) {
            Connection connection = null;
            Statement statement = null;
            try {
                String insertSql = String.format(INSERTSQLFORMAT, message.getMsgID(), System.currentTimeMillis());
                connection = this.dataSource.getConnection();
                statement = connection.createStatement();
                statement.execute(insertSql);
                return Action.CommitMessage;
            } catch (Throwable e) {
                return Action.ReconsumeLater;
            } finally {
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (Exception e) { }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception e) { }
                }
            }
        }

        public void setDataSource(MQDataSource dataSource) {
            this.dataSource = dataSource;
        }
    }

  2. Define a consumer bean and other related information in consumer.xml.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
        <bean id="mqDataSource" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.datasource.MQDataSource" init-method="init" destroy-method="close">
            <property name="url" value="{url}" />
            <property name="username" value="{user}" />
            <property name="password" value="{passwd}" />
            <property name="driverClass" value="{driver}" />
        </bean>
        <bean id="msgListener" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.spring.TestMessageListener">
            <property name="dataSource" ref="mqDataSource"> <!--Consumer configuration-->
            </property>
        </bean> <!--Listener configuration -->
        <!--When multiple group IDs subscribe to the same topic, you can create multiple consumer beans.-->
        <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ExactlyOnceConsumerBean" init-method="start" destroy-method="shutdown">
            <property name="properties"> <!--Consumer configuration-->
                <props>
                    <prop key="GROUP_ID">{gid}</prop>
                    <prop key="AccessKey">{accessKey}</prop>
                    <prop key="SecretKey">{secretKey}</prop>
                    <!--Set the number of consumer threads to 50.
                    <prop key="ConsumeThreadNums">50</prop>
                    -->
                </props>
            </property>
            <property name="subscriptionTable">
                <map>
                    <entry value-ref="msgListener">
                        <key>
                            <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                <property name="topic" value="{topic}"/>
                                <property name="expression" value="{subExpression}"/><!--The expression is a tag. You can set it to a specific tag, such as taga||tagb||tagc, or enter an asterisk (*) in this field. The asterisk (*) indicates subscription of all tags, but is not used as a wildcard.-->
                            </bean>
                        </key>
                    </entry>
                </map>
            </property>
        </bean>
    </beans>

  3. Run the consumer that is integrated with Spring.

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class ConsumeWithSpring {
        public static void main(String[] args) {
            /**
             * The consumer bean is configured in consumer.xml. You can call ApplicationContext to obtain the bean or to inject it to other classes, such as a controller.
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
            System.out.println("Consumer Started");
        }
    }

Use MyBatis to enable the Exactly-Once delivery semantics on the message listener

  1. Design the data access object (DAO) for write operations in the business database.

    package com.aliyun.openservices.tcp.example.mybatis;

    public interface AppDAO {
        Integer insertMessage(String msgId);
    }

  2. Write the INSERT statement in the mapper.xml file.

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="com.aliyun.openservices.tcp.example.mybatis.AppDAO">
        <insert id="insertMessage">
            INSERT INTO app (message_id, ctime) VALUES (#{msgId}, now())
        </insert>
    </mapper>

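  3. Call MQDataSource to define the custom DataSourceFactory.

    package com.aliyun.openservices.tcp.example.mybatis;

    import java.util.Properties;
    import javax.sql.DataSource;
    import com.alibaba.druid.pool.DruidDataSource;
    import com.alibaba.druid.pool.DruidDataSourceFactory;
    import com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.datasource.MQDataSource;
    import org.apache.ibatis.datasource.DataSourceFactory;

    public class MQDataSourceFactoty extends DruidDataSourceFactory implements DataSourceFactory {
        protected Properties properties;

        @Override
        public void setProperties(Properties properties) {
            this.properties = properties;
        }

        @Override
        public DataSource getDataSource() {
            try {
                // Wrap the Druid connection pool in MQDataSource.
                DruidDataSource druidDataSource = (DruidDataSource) createDataSource(this.properties);
                return new MQDataSource(druidDataSource);
            } catch (Exception e) {
                System.out.println("err:" + e.getMessage());
            }
            return null;
        }
    }
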
  4. Register the data source as the MQDataSourceFactoty class in mybatis-config.xml.

    <configuration>
        <environments default="development">
            <environment id="development">
                <transactionManager type="JDBC"/>
                <!--Database connection configuration-->
                <dataSource type="com.aliyun.openservices.tcp.example.mybatis.MQDataSourceFactoty">
                    <property name="driverClass" value="com.mysql.jdbc.Driver"/>
                    <property name="url" value="{url}"/>
                    <property name="username" value="{username}"/>
                    <property name="password" value="{password}"/>
                    <property name="initialSize" value="10" />
                    <property name="maxActive" value="20"/>
                </dataSource>
            </environment>
        </environments>
        <mappers>
            <mapper resource="mapper.xml"/>
        </mappers>
    </configuration>

  5. Connect to the database from the message listener by using MyBatis to implement Exactly-Once consumption.

    import java.io.IOException;
    import java.io.Reader;
    import org.apache.ibatis.io.Resources;
    import org.apache.ibatis.session.SqlSession;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.ibatis.session.SqlSessionFactoryBuilder;
    // Imports of the ons-client-ext SDK classes are omitted here.

    public class TestMybatisListener implements MessageListener {
        private static SqlSessionFactory sqlSessionFactory;

        static {
            String resource = "mybatis-config.xml";
            try (Reader reader = Resources.getResourceAsReader(resource)) {
                sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
            } catch (IOException e) {
                // Fail fast: the listener cannot work without the MyBatis configuration.
                throw new ExceptionInInitializerError(e);
            }
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            SqlSession sqlSession = null;
            try {
                sqlSession = sqlSessionFactory.openSession();
                AppDAO appDAO = sqlSession.getMapper(AppDAO.class);
                appDAO.insertMessage(message.getMsgID());
                System.out.println("consume : " + message.getMsgID());
                sqlSession.commit();
                return Action.CommitMessage;
            } catch (Exception e) {
                e.printStackTrace();
                // The session is null if openSession() itself failed.
                if (sqlSession != null) {
                    sqlSession.rollback();
                }
                return Action.ReconsumeLater;
            } finally {
                if (sqlSession != null) {
                    sqlSession.close();
                }
            }
        }
    }

Precautions

When using an Exactly-Once consumer of RocketMQ for message consumption, pay attention to the following points:

  • Do not manually reset the consumer offset in the console. If you reset the consumer offset to a time point whose messages have already been consumed, the Exactly-Once delivery semantics becomes ineffective.

  • Each insert or update operation in the database triggers an additional update operation. In addition, the Exactly-Once consumer periodically queries and deletes data in the database. These extra operations increase the IOPS of the database.
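
The periodic deletion mentioned above can be pictured as purging consumption records that are older than some retention window. The sketch below is an in-memory illustration under that assumption. The retention policy, the class `RecordCleanupSketch`, and its methods are hypothetical and do not reflect the SDK's actual schedule or SQL.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of time-based cleanup of consumption records. */
public class RecordCleanupSketch {
    // Maps a message ID to the time (in milliseconds) at which it was recorded.
    private final Map<String, Long> recordTimes = new HashMap<>();

    public void record(String messageId, long ctimeMillis) {
        recordTimes.put(messageId, ctimeMillis);
    }

    /** Deletes records older than (nowMillis - retentionMillis); returns the number removed. */
    public int purgeOlderThan(long nowMillis, long retentionMillis) {
        long cutoff = nowMillis - retentionMillis;
        int before = recordTimes.size();
        recordTimes.values().removeIf(ctime -> ctime < cutoff);
        return before - recordTimes.size();
    }

    public boolean isRecorded(String messageId) {
        return recordTimes.containsKey(messageId);
    }
}
```

Under this assumed model, a message whose record has been purged can no longer be recognized as a duplicate. That is one possible reason why replaying old messages after an offset reset could defeat deduplication.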