ApsaraMQ for RocketMQ が提供するメッセージの種類の 1 つである、順序付けされたメッセージについて説明します。順序付けされたメッセージは、厳密な先入れ先出し (FIFO) の順序で公開および使用されます。このトピックでは、Apache RocketMQ SDK for Java を使用して TCP 経由で順序付けされたメッセージを送受信するために使用されるサンプルコードを示します。
前提条件
以下の操作が実行されていることを確認します。
背景情報
特定の Topic 内のパーティション順序付けメッセージは、シャーディングキーに基づいてパーティション化されます。各パーティション内のメッセージは、厳密な FIFO 順序で使用されます。シャーディングキーは、順序付けされたメッセージが異なるパーティションを識別するために使用されるキーフィールドです。シャーディングキーは、通常のメッセージのキーとは異なります。
詳細については、「順序付けされたメッセージ」をご参照ください。
ApsaraMQ for RocketMQ を初めて使用する場合は、デモ プロジェクト を参照してプロジェクトを構築してから、ApsaraMQ for RocketMQ を使用してメッセージを送受信することをお勧めします。
順序付けされたメッセージを送信する
ApsaraMQ for RocketMQ ブローカーは、送信者が単一の プロデューサー または スレッド を使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。送信者が複数の プロデューサー または スレッド を使用して同時にメッセージを送信する場合、メッセージの順序は、ApsaraMQ for RocketMQ ブローカーがメッセージを受信する順序によって決定されます。この順序は、ビジネス側の送信順序とは異なる場合があります。
詳細なサンプルコードについては、ApsaraMQ for RocketMQ コードリポジトリ を参照してください。
サンプルコード:
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// ApsaraMQ for RocketMQ コンソールで作成したコンシューマーグループの ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// 認証に使用される AccessKey ID。
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 認証に使用される AccessKey シークレット。
properties.put(PropertyKeyConst.SecretKey,"XXX");
// TCP エンドポイント。エンドポイントは、ApsaraMQ for RocketMQ コンソールのインスタンスの詳細ページの TCP エンドポイントセクションで取得できます。
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// メッセージを送信する前に、start() メソッドを 1 回だけ呼び出してプロデューサーを起動します。
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// 送信するメッセージが属する Topic。
"Order_global_topic",
// メッセージタグ。メッセージタグは Gmail タグに似ており、コンシューマーが ApsaraMQ for RocketMQ ブローカーでメッセージをフィルタリングするために使用できます。
"TagA",
// メッセージ本文。メッセージ本文はバイナリ形式のデータです。ApsaraMQ for RocketMQ はメッセージ本文を処理しません。プロデューサーとコンシューマーは、メッセージ本文をシリアル化および逆シリアル化するために使用されるメソッドについて合意する必要があります。
"send order global msg".getBytes()
);
// メッセージキー。キーは、メッセージのビジネス固有の属性であり、可能な限りグローバルに一意である必要があります。
// メッセージを期待どおりに受信できない場合は、キーを使用して ApsaraMQ for RocketMQ コンソールでメッセージをクエリし、再度送信できます。
// 注: メッセージキーを指定しなくても、メッセージを送受信できます。
msg.setKey(orderId);
// 順序付けされたメッセージで使用される、パーティションを識別するためのキーフィールド。シャーディングキーは、通常のメッセージのキーとは異なります。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// メッセージを送信します。例外がスローされない場合、メッセージは送信されます。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// メッセージの送信に失敗し、再送信が必要な場合に、メッセージを再送信または永続化するロジックを指定します。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// アプリケーションを終了する前に、プロデューサーをシャットダウンします。
// 注: プロデューサーをシャットダウンすると、メモリを節約できます。頻繁にメッセージを送信する必要がある場合は、プロデューサーをシャットダウンしないでください。
producer.shutdown();
}
}
順序付けされたメッセージをサブスクライブする
サンプルコード:
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// ApsaraMQ for RocketMQ コンソールで作成したコンシューマーグループの ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// 認証に使用される AccessKey ID。
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 認証に使用される AccessKey シークレット。
properties.put(PropertyKeyConst.SecretKey,"XXX");
// TCP エンドポイント。エンドポイントは、ApsaraMQ for RocketMQ コンソールのインスタンスの詳細ページの TCP エンドポイントセクションで取得できます。
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
// システムが順序付けされたメッセージの使用に失敗した場合にリトライが実行されるまでのタイムアウト期間 (ミリ秒)。有効な値: 10 ~ 30,000。
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// メッセージの使用に失敗した場合にメッセージに対して実行できる最大リトライ回数。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// コンシューマーを使用してメッセージをサブスクライブする前に、start() メソッドを 1 回だけ呼び出してコンシューマーを起動します。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// サブスクライブするメッセージが属する Topic。
"Order_global_topic",
// 指定された Topic 内の指定されたタグを含むメッセージをサブスクライブします。
// 1. アスタリスク (*) は、コンシューマーがすべてのメッセージをサブスクライブすることを指定します。
// 2. TagA || TagB || TagC は、コンシューマーが TagA、TagB、または TagC を含むメッセージをサブスクライブすることを指定します。
"*",
new MessageOrderListener() {
/**
* 1. メッセージの使用に失敗した場合、またはメッセージ処理中に例外が発生した場合、OrderAction.Suspend が返されます。
* 2. メッセージが処理された場合、OrderAction.Success が返されます。
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}