すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文メッセージの送受信

最終更新日:Dec 10, 2024

注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 順序付けられたメッセージは、厳密な先入れ先出し (FIFO) の順序で発行および消費されます。 このトピックでは、Apache RocketMQ TCPクライアントSDK for Javaを使用して順序付けられたメッセージを送受信するためのサンプルコードを提供します。

背景情報

順序付けられたメッセージは、次のタイプに分類されます。

  • グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。

  • パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。

詳細については、「注文メッセージ」をご参照ください。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • Apache RocketMQ SDK for Java 4.5.2以降がダウンロードされます。 詳細については、RocketMQのダウンロードページをご覧ください。

  • 環境を整えます。 詳細については、「環境の準備」をご参照ください。

  • Alibaba CloudアカウントにAccessKeyペアが作成されます。 詳細については、「AccessKey の作成」をご参照ください。

順序付けられたメッセージを送信する

重要

ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

次のサンプルコードは、Apache RocketMQ TCPクライアントSDK for Javaを使用して順序付けられたメッセージを送信する方法の例を示しています。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    private static RPCHook getAclRPCHook() 
   {
       /**
       * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
       * Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. 
       */
       return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * Create a producer and enable the message trace feature. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook(), true, null);

        /**
         * Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
        */
        producer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                    "YOUR MESSAGE TAG",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                /**
                 * Note: This parameter is required. Ordered messages can be evenly distributed to each queue only after this parameter is configured. 
                 * If the version of the SDK is 5.x or later, you can use the following method to specify the message order:
                 * msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                 */
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Select a partition selection algorithm that meets your business requirements. The algorithm can be used to ensure that the results of the same parameter are the same. 
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

注文されたメッセージを購読する

次のサンプルコードは、Apache RocketMQ TCPクライアントSDK for Javaを使用して順序付きメッセージをサブスクライブする方法の例を示しています。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() 
   {
         /**
         * The AccessKey ID and AccessKey secret of your Alibaba Cloud account. 
         * Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. 
         */
         return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {

        /**
         * Create a consumer and enable the message trace feature. 
         * If you do not want to enable the message trace feature, you can use the following method to create a producer:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), null);
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

        /**
         * Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty. 
         */
        consumer.setAccessChannel(AccessChannel.CLOUD);
        /**
        // The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80. 
        */
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true); 
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;// If a message fails to be consumed, the request is suspended and retried and the following result is returned: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }