すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージの送受信

最終更新日:Jul 09, 2024

このトピックでは、Community EditionのTCPクライアントSDK for Javaを使用してトランザクションメッセージを送受信するためのサンプルコードを提供します。

相互作用プロセス

次の図は、トランザクションメッセージの対話プロセスを示しています。

process

詳細については、「トランザクションメッセージ」をご参照ください。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • SDK for Java 4.5.2以降のCommunity Editionがダウンロードされます。 詳細については、RocketMQのダウンロードページをご覧ください。

  • 環境を整えます。 詳細については、「環境の準備」をご参照ください。

  • Alibaba CloudアカウントにAccessKeyペアが作成されます。 詳細については、「AccessKey の作成」をご参照ください。

トランザクションメッセージの送信

トランザクションメッセージを送信するには、次の手順が必要です。

  1. ハーフメッセージを送信し、対応するローカルトランザクションを実行します。 サンプルコード:

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
            * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
            * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
            */       
            return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
        }
    
        public static void main(String[] args) throws MQClientException {
             /**
             * Create a producer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
             * If you do not want to enable the message trace feature, you can use the following method to create a producer:
             * TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
             */
             TransactionMQProducer transactionMQProducer = new TransactionMQProducer(null, "YOUR TRANSACTION GROUP ID", getAclRPCHook(), true, null);
             /**
           * The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
           */
         transactionMQProducer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
            // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("Start executing the local transaction: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. トランザクションメッセージのステータスをコミットします。

    ローカルトランザクションの実行後、実行が成功したかどうかにかかわらず、ApsaraMQ for RocketMQブローカーに現在のメッセージのトランザクションステータスを通知する必要があります。 ApsaraMQ for RocketMQブローカーには、次のいずれかの方法で通知できます。

    • ローカルトランザクションの実行后にステータスをコミットします。

    • ApsaraMQ for RocketMQブローカーがメッセージのトランザクションステータスを確認するリクエストを送信するまで待ちます。

    トランザクションは、次のいずれかの状態になります。

    • LocalTransactionState.COMMIT_MESSAGE: トランザクションがコミットされています。 メッセージは、消費者によって消費され得る。

    • LocalTransactionState.ROLLBACK_MESSAGE: トランザクションはロールバックされます。 メッセージは破棄され、使用できません。

    • LocalTransactionState.UNKNOW: トランザクションのステータスが不明であり、システムはApsaraMQ for RocketMQブローカーがメッセージに対応するローカルトランザクションのステータスを照会するのを待っています。

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * The class that is used to check the status of local transactions implemented by sending ApsaraMQ for RocketMQ transactional messages. 
     */
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        /**
         * The local transaction checker. For more information, see Transactional messages. 
         */
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

トランザクション状態チェックの仕組み

  • トランザクションメッセージの送信時にトランザクションステータスチェックのメカニズムを実装する必要があるのはなぜですか。

    手順1でハーフメッセージが送信されたが、TransactionStatus.Unknowが返された場合、またはアプリケーションの終了によりローカルトランザクションのステータスがコミットされていない場合、ハーフメッセージのステータスはApsaraMQ for RocketMQブローカーには不明です。 この場合、ブローカは、送信者に、半メッセージの状態をチェックして報告するように定期的に要求する。

  • チェックメソッドがコールバックされると、ビジネスロジックは何をしますか?

    トランザクションメッセージのチェックメソッドには、トランザクションの整合性をチェックするために使用されるロジックが含まれている必要があります。 トランザクションメッセージの送信後、ApsaraMQ for RocketMQLocalTransactionChecker APIを呼び出して、ローカルトランザクションのブローカーからのステータスチェック要求に応答する必要があります。 したがって、トランザクションメッセージのチェックに使用されるメソッドは、次の目的を達成する必要があります。

    1. ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック) を確認します。

    2. ハーフメッセージに対応するローカルトランザクションのステータスをブローカーにコミットします。

トランザクションメッセージの購読

トランザクションメッセージをサブスクライブするために使用されるメソッドは、通常のメッセージをサブスクライブするために使用されるメソッドと同じです。 サンプルコード:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * Create a consumer and enable the message trace feature. Set this parameter to the ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // The endpoint of the ApsaraMQ for RocketMQ instance. 
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Set the AccessChannel parameter to CLOUD. If you want to use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}