すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:サンプルコード

最終更新日:Nov 10, 2025

このトピックでは、Apache RocketMQ Java SDK を使用してメッセージを送受信するためのサンプルコードを提供します。

サンプルコード

重要

サーバーレスインスタンスを使用する場合は、SDK のバージョンとパブリックネットワークアクセスのその他の要件に注意してください。詳細については、「パブリックネットワークアクセスの SDK バージョン要件」をご参照ください。

gRPC プロトコル SDK

Remoting プロトコル SDK

  • RocketMQ-Spring のサンプルコードについては、「rocketmq-spring-boot-samples」をご参照ください。

  • rocketmq-client SDK は Remoting プロトコルを使用します。以下に、この SDK のサンプルコードを示します。

    通常メッセージ

    通常メッセージの送信 (同期)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQProducer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            producer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。
            // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。
            producer.shutdown();
        }
    }

    通常メッセージの送信 (非同期)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class RocketMQAsyncProducer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            producer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult result) {
                            // メッセージが送信されます。
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override
                        public void onException(Throwable throwable) {
                            // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
            // 現在のスレッドを 3 秒間ブロックして、非同期送信の結果を待ちます。
            TimeUnit.SECONDS.sleep(3);
    
            // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。
            // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。
            producer.shutdown();
        }
    }

    通常メッセージの送信 (一方向)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQOnewayProducer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            producer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。
            // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。
            producer.shutdown();
        }
    }

    通常メッセージのサブスクライブ

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQPushConsumer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            consumer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            // これを ApsaraMQ for RocketMQ コンソールで作成した Topic に設定します。
            consumer.subscribe("YOUR TOPIC", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("Receive New Messages: %s %n", msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }

    順序メッセージ

    順序メッセージの送信

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.List;
    
    public class RocketMQOrderProducer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            producer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    int orderId = i % 10;
                    Message msg = new Message("YOUR ORDER TOPIC",
                            "YOUR MESSAGE TAG",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 重要: 順序メッセージがキューに均等に分散されるようにするには、この設定項目を設定する必要があります。
                    // V5.x では、次のコード行を msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + ""); に置き換えることができます。
                    msg.putUserProperty("__SHARDINGKEY", orderId + "");
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // ニーズに合ったパーティション選択アルゴリズムを選択して、同じパラメーターが同じ結果を生成するようにします。
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }

    順序メッセージのサブスクライブ

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQOrderConsumer {
        private static RPCHook getAclRPCHook() {
            /**
             * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
             * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
             * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
             * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
             * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            consumer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            consumer.subscribe("YOUR ORDER TOPIC", "*");
    
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // コンシュームに失敗した場合は、ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT を返して一時停止し、リトライします。
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }

    スケジュール/遅延メッセージ

    スケジュール/遅延メッセージの送信

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQDelayProducer {
        /**
         * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
         * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
         * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
         * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
         * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            producer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    // これを ApsaraMQ for RocketMQ コンソールで作成した Topic に設定します。
                    Message msg = new Message("YOUR TOPIC",
                            // メッセージタグを設定します。
                            "YOUR MESSAGE TAG",
                            // メッセージ本文。
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 遅延メッセージを送信するには、遅延時間をミリ秒 (ms) 単位で設定します。メッセージは指定された遅延後に配信されます。たとえば、メッセージは 3 秒後に配信されます。
                    long delayTime = System.currentTimeMillis() + 3000;
                    msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
    
                    // スケジュールメッセージを送信するには、配信の特定の時刻を設定します。たとえば、メッセージは 2021-08-10 の 18:45:00 に配信されます。
                    // 時刻のフォーマットは yyyy-MM-dd HH:mm:ss です。指定された時刻が現在の時刻より前の場合、メッセージはすぐにコンシューマーに配信されます。
                    // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                    // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。
            // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。
            producer.shutdown();
        }
    }

    スケジュール/遅延メッセージをサブスクライブするためのサンプルコードは、通常メッセージの場合と同じです。

    トランザクションメッセージ

    トランザクションメッセージの送信

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。
             * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。
             * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。
             * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
             * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // パブリックエンドポイントを使用する場合は、RPCHook を設定します。
            // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。注: トランザクションメッセージのグループ ID は、他のメッセージタイプと共有できません。
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。
            // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。
            transactionMQProducer.setEnableTrace(true);
    
            // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。
            // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionListener(new TransactionListener() {
    
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    System.out.println("Start to execute the local transaction: " + msg);
                    return LocalTransactionState.UNKNOW;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println("Received a transaction check request, MsgId: " + msg.getMsgId());
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    トランザクションメッセージをサブスクライブするためのサンプルコードは、通常メッセージの場合と同じです。

サーバーレスインスタンスのパブリックネットワークアクセスバージョンガイド

ApsaraMQ for RocketMQ のサーバーレスインスタンスにパブリックネットワーク経由でアクセスするには、SDK が次のバージョン要件を満たしている必要があります。また、指定されたコードをアプリケーションに追加する必要もあります。

説明

InstanceId を実際のインスタンス ID に置き換えてください。

  • SDK バージョン: rocketmq-client ≥ 5.2.0

    メッセージを送信するには、次のコードを追加します: producer.setNamespaceV2("InstanceId");

    メッセージをコンシュームするには、次のコードを追加します: consumer.setNamespaceV2("InstanceId");

  • SDK バージョン: rocketmq-client-java ≥ 5.0.6

    メッセージを送受信するには、次のコードを追加します:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();