このトピックでは、Apache RocketMQ Java SDK を使用してメッセージを送受信するためのサンプルコードを提供します。
サンプルコード
サーバーレスインスタンスを使用する場合は、SDK のバージョンとパブリックネットワークアクセスのその他の要件に注意してください。詳細については、「パブリックネットワークアクセスの SDK バージョン要件」をご参照ください。
gRPC プロトコル SDK
RocketMQ-Spring のサンプルコードについては、「rocketmq-v5-client-spring-boot-samples」をご参照ください。
rocketmq-client-javaSDK は gRPC プロトコルを使用します。以下に、この SDK のサンプルコードを示します。重要gRPC プロトコル用の SDK を使用してトランザクションメッセージを送信する場合、プロデューサーの起動時に Topic を設定しないと、トランザクションチェックが遅延します。メッセージが 4 時間以内に送信されない場合、ハーフトランザクションメッセージは破棄される可能性があります。したがって、トランザクションプロデューサーを起動するときは、Topic を設定する必要があります。
メッセージタイプ
メッセージ送信のサンプルコード
メッセージサブスクリプションのサンプルコード
同期サブスクリプション: SimpleConsumerExample.java
非同期サブスクリプション: AsyncSimpleConsumerExample.java
なし
Remoting プロトコル SDK
RocketMQ-Spring のサンプルコードについては、「rocketmq-spring-boot-samples」をご参照ください。
rocketmq-clientSDK は Remoting プロトコルを使用します。以下に、この SDK のサンプルコードを示します。通常メッセージ
通常メッセージの送信 (同期)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQProducer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQProducer producer = new DefaultMQProducer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 producer.setProducerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 producer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 producer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。 // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。 producer.shutdown(); } }通常メッセージの送信 (非同期)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; import java.util.concurrent.TimeUnit; public class RocketMQAsyncProducer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException, InterruptedException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQProducer producer = new DefaultMQProducer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 producer.setProducerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 producer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 producer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // メッセージが送信されます。 System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。 System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 現在のスレッドを 3 秒間ブロックして、非同期送信の結果を待ちます。 TimeUnit.SECONDS.sleep(3); // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。 // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。 producer.shutdown(); } }通常メッセージの送信 (一方向)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQOnewayProducer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQProducer producer = new DefaultMQProducer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 producer.setProducerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 producer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 producer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } catch (Exception e) { // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。 // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。 producer.shutdown(); } }通常メッセージのサブスクライブ
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQPushConsumer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 consumer.setConsumerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 consumer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 consumer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 consumer.setNamesrvAddr("YOUR ACCESS POINT"); // これを ApsaraMQ for RocketMQ コンソールで作成した Topic に設定します。 consumer.subscribe("YOUR TOPIC", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }順序メッセージ
順序メッセージの送信
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class RocketMQOrderProducer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQProducer producer = new DefaultMQProducer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 producer.setProducerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 producer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 producer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { int orderId = i % 10; Message msg = new Message("YOUR ORDER TOPIC", "YOUR MESSAGE TAG", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 重要: 順序メッセージがキューに均等に分散されるようにするには、この設定項目を設定する必要があります。 // V5.x では、次のコード行を msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + ""); に置き換えることができます。 msg.putUserProperty("__SHARDINGKEY", orderId + ""); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // ニーズに合ったパーティション選択アルゴリズムを選択して、同じパラメーターが同じ結果を生成するようにします。 Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }順序メッセージのサブスクライブ
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQOrderConsumer { private static RPCHook getAclRPCHook() { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 consumer.setConsumerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 consumer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 consumer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 consumer.setNamesrvAddr("YOUR ACCESS POINT"); consumer.subscribe("YOUR ORDER TOPIC", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // コンシュームに失敗した場合は、ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT を返して一時停止し、リトライします。 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }スケジュール/遅延メッセージ
スケジュール/遅延メッセージの送信
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQDelayProducer { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // DefaultMQProducer producer = new DefaultMQProducer(); // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。 producer.setProducerGroup("YOUR GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 producer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 producer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { // これを ApsaraMQ for RocketMQ コンソールで作成した Topic に設定します。 Message msg = new Message("YOUR TOPIC", // メッセージタグを設定します。 "YOUR MESSAGE TAG", // メッセージ本文。 "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 遅延メッセージを送信するには、遅延時間をミリ秒 (ms) 単位で設定します。メッセージは指定された遅延後に配信されます。たとえば、メッセージは 3 秒後に配信されます。 long delayTime = System.currentTimeMillis() + 3000; msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime)); // スケジュールメッセージを送信するには、配信の特定の時刻を設定します。たとえば、メッセージは 2021-08-10 の 18:45:00 に配信されます。 // 時刻のフォーマットは yyyy-MM-dd HH:mm:ss です。指定された時刻が現在の時刻より前の場合、メッセージはすぐにコンシューマーに配信されます。 // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime(); // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // メッセージの送信に失敗した場合は、再送信するか、後で処理するためにデータを保存します。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // アプリケーションが終了する前に、プロデューサーオブジェクトを破棄します。 // 注: プロデューサーオブジェクトを破棄すると、システムメモリが節約されます。頻繁にメッセージを送信する場合は、プロデューサーオブジェクトを破棄しないでください。 producer.shutdown(); } }スケジュール/遅延メッセージをサブスクライブするためのサンプルコードは、通常メッセージの場合と同じです。
トランザクションメッセージ
トランザクションメッセージの送信
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { private static RPCHook getAclRPCHook() { /** * パブリックネットワーク経由でインスタンスにアクセスする場合は、インスタンスのユーザー名とパスワードを使用して RPCHook を設定する必要があります。 * ユーザー名とパスワードは、Resource Access Management コンソールの Intelligent Identity Recognition タブで取得できます。 * 重要: Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret は使用しないでください。 * Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、RPCHook を初期化する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。 * サーバーレスインスタンスを使用する場合は、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由のパスワードなしのアクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。 */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // パブリックエンドポイントを使用する場合は、RPCHook を設定します。 // ApsaraMQ for RocketMQ コンソールで作成したグループ ID。注: トランザクションメッセージのグループ ID は、他のメッセージタイプと共有できません。 TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); // VPC エンドポイントを使用する場合は、RPCHook を設定する必要はありません。 // サーバーレスインスタンスを使用する場合は、RPCHook を設定する必要があります。 // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID"); // 接続タイプを Alibaba Cloud に設定します。これは、クラウドメッセージトレース機能を使用するために必要です。メッセージトレースを有効にしない場合は、このコードを実行する必要はありません。 transactionMQProducer.setAccessChannel(AccessChannel.CLOUD); // SDK V5.3.0 以降では、メッセージトレースを有効にするために AccessChannel を設定するだけでなく、EnableTrace パラメーターも追加する必要があります。 transactionMQProducer.setEnableTrace(true); // これを ApsaraMQ for RocketMQ コンソールから取得したエンドポイントに設定します。例: rmq-cn-XXXX.rmq.aliyuncs.com:8080。 // 重要: コンソールで提供されているドメイン名とポートを入力してください。http:// または https:// プレフィックスを追加しないでください。解決済みの IP アドレスは使用しないでください。 transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT"); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("Start to execute the local transaction: " + msg); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("Received a transaction check request, MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }トランザクションメッセージをサブスクライブするためのサンプルコードは、通常メッセージの場合と同じです。
サーバーレスインスタンスのパブリックネットワークアクセスバージョンガイド
ApsaraMQ for RocketMQ のサーバーレスインスタンスにパブリックネットワーク経由でアクセスするには、SDK が次のバージョン要件を満たしている必要があります。また、指定されたコードをアプリケーションに追加する必要もあります。
InstanceId を実際のインスタンス ID に置き換えてください。
SDK バージョン: rocketmq-client ≥ 5.2.0
メッセージを送信するには、次のコードを追加します:
producer.setNamespaceV2("InstanceId");メッセージをコンシュームするには、次のコードを追加します:
consumer.setNamespaceV2("InstanceId");SDK バージョン: rocketmq-client-java ≥ 5.0.6
メッセージを送受信するには、次のコードを追加します:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();