全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發事務訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的社區版Java SDK收發事務訊息的範例程式碼供您參考。

互動流程

事務訊息互動流程如下圖所示。

process

更多資訊,請參見事務訊息

前提條件

您已完成以下操作:

發送事務訊息

發送事務訊息包含以下兩個步驟:

  1. 發送半事務訊息(Half Message)及執行本地事務,範例程式碼如下。

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
            * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
            * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
            */       
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
             /**
             * 建立事務訊息Producer,並開啟訊息軌跡。設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
             * 如果不想開啟訊息軌跡,可以按照如下方式建立:
             * TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
             */
             TransactionMQProducer transactionMQProducer = new TransactionMQProducer(null, "YOUR TRANSACTION GROUP ID", getAclRPCHook(), true, null);
             /**
           * 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
           */
         transactionMQProducer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
            //阿里雲上訊息軌跡需要設定為CLOUD方式,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("開始執行本地事務: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. 提交事務訊息狀態

    當本地事務執行完成(執行成功或執行失敗),需要通知伺服器當前訊息的事務狀態。通知方式有以下兩種:

    • 執行本地事務完成後提交。

    • 執行本地事務一直沒提交狀態,等待伺服器回查訊息的事務狀態。

    事務狀態有以下三種:

    • LocalTransactionState.COMMIT_MESSAGE:提交事務,允許訂閱者消費該訊息。

    • LocalTransactionState.ROLLBACK_MESSAGE:復原事務,訊息將被丟棄不允許消費。

    • LocalTransactionState.UNKNOW:無法判斷狀態,期待阿里雲雲訊息佇列 RocketMQ 版的Broker向發送方再次詢問該訊息對應的本地事務的狀態。

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * 訊息佇列RocketMQ版發送事務訊息本地Check介面實作類別。
     */
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        /**
         * 本地事務Checker。更多資訊,請參見事務訊息。
         */
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("收到事務訊息的回查請求, MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

事務回查機制說明

  • 發送事務訊息為什麼必須要實現回查Check機制?

    當步驟1中半事務訊息發送完成,但本地事務返回狀態為LocalTransactionState.UNKNOW,或者應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條Half狀態的訊息的狀態是未知的。因此Broker會定期要求發送方能Check該Half狀態訊息,並上報其最終狀態。

  • Check被回調時,商務邏輯都需要做些什嗎?

    事務訊息的Check方法裡面,應該寫一些檢查事務一致性的邏輯。雲訊息佇列 RocketMQ 版發送事務訊息時需要實現LocalTransactionChecker介面,用來處理Broker主動發起的本地事務狀態回查請求;因此在事務訊息的Check方法中,需要完成兩件事情:

    1. 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。

    2. 向Broker提交該半事務訊息本地事務的狀態。

訂閱事務訊息

事務訊息的訂閱與普通訊息訂閱一致,如下所示。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
    * 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * 建立Consumer,並開啟訊息軌跡。設定為您在阿里雲訊息佇列RocketMQ版控制台建立的Group ID。
         * 如果不想開啟訊息軌跡,可以按照如下方式建立:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        //設定為阿里雲訊息佇列RocketMQ版執行個體的存取點。
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里雲上訊息軌跡需要設定為CLOUD方式,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // 設定為您在阿里雲訊息佇列RocketMQ版控制台上建立的Topic。
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}