利用シーン
順序付きイベント処理、取引におけるマッチメイキング、リアルタイムの増分データ同期などのシナリオでは、異種システム間で状態同期を通じて強力な整合性を維持する必要があります。これらのシナリオでは、上流システムからのイベント変更が下流システムに順序通りに渡されることが求められます。ApsaraMQ for RocketMQ の順序付きメッセージは、このようなケースでのデータ転送の順序を保証します。
シナリオ 1:取引におけるマッチメイキング
例えば、証券や株式の取引において、複数の入札が同じ価格であった場合、「最初に入札したものが最初に取引される」という原則が適用されます。下流の注文処理システムは、入札シーケンスに厳密に従って注文を処理する必要があります。
シナリオ 2:リアルタイムの増分データ同期
図 1. 通常メッセージ
図 2. 順序付きメッセージ
例えば、データベースの変更同期シナリオでは、上流のソースデータベースが必要に応じて追加、削除、変更操作を実行します。そして、バイナリ操作ログをメッセージとして ApsaraMQ for RocketMQ を通じて下流の検索システムに送信します。下流システムはメッセージデータを順序通りに復元し、状態データを正しい順序で更新し続けます。通常メッセージを使用すると、期待される操作結果と一致しない順序不同の状態になる可能性があります。順序付きメッセージは、下流の状態が上流の操作結果と一致することを保証します。
仕組み
順序付きメッセージとは
順序付きメッセージは、ApsaraMQ for RocketMQ の高度なメッセージタイプです。これにより、コンシューマーはメッセージが送信されたのと同じ順序でメッセージを取得できます。これにより、ビジネスシナリオでのシーケンシャルな処理が可能になります。
他のメッセージタイプと比較して、順序付きメッセージは送信、ストレージ、配信の過程における複数のメッセージ間のシーケンシャルな関係を重視します。
ApsaraMQ for RocketMQ は、メッセージグループを使用して順序付きメッセージのシーケンスを識別し、決定します。順序付きメッセージを送信する際は、各メッセージにメッセージグループを設定する必要があります。
重要
順序が保証されるのは、同じメッセージグループ内のメッセージのみです。異なるメッセージグループのメッセージ間や、グループのないメッセージ間にはシーケンシャルな関係はありません。
メッセージグループに基づいて順序を決定するロジックにより、ビジネスロジックに基づいた詳細なパーティショニングが可能になります。これにより、ローカルなシーケンシャル要件を満たしながら、システムの並列性とスループットを向上させることができます。
メッセージの順序を保証する方法
ApsaraMQ for RocketMQ におけるメッセージの順序は、送信順序と消費順序の 2 つの部分から構成されます。
送信順序 : ApsaraMQ for RocketMQ は、プロデューサーとサーバー間のプロトコルを使用して、単一のプロデューサーがメッセージをシリアルに送信することを保証します。メッセージはその後、同じ順序で保存され、永続化されます。
メッセージの送信順序を保証するには、以下の条件を満たす必要があります:
同じメッセージグループ
送信順序の範囲はメッセージグループです。プロデューサーは、送信する各メッセージにメッセージグループを設定できます。順序が保証されるのは、同じグループ内のメッセージのみです。異なるグループのメッセージやグループのないメッセージは、順序が保証されません。
単一のプロデューサー
送信順序は、単一のプロデューサーでのみサポートされます。異なるプロデューサーが異なるシステムに存在する場合、同じメッセージグループを使用していても、そのメッセージの順序は決定できません。
シリアル送信
ApsaraMQ for RocketMQ のプロデューサークライアントはマルチスレッドアクセスをサポートしています。ただし、プロデューサーが複数のスレッドを使用してメッセージを並行して送信する場合、異なるスレッドからのメッセージの順序は決定できません。
これらの条件を満たすプロデューサーが順序付きメッセージを ApsaraMQ for RocketMQ に送信すると、同じメッセージグループを持つメッセージは、送信された順序で同じキューに保存されます。サーバー側のストレージロジックは次のとおりです:
図に示すように、メッセージグループ 1 とメッセージグループ 4 のメッセージはキュー 1 に混在しています。ApsaraMQ for RocketMQ は、メッセージグループ 1 のメッセージ G1-M1、G1-M2、G1-M3 が送信された順序で保存されることを保証します。また、メッセージグループ 4 のメッセージ G4-M1 と G4-M2 が順序通りに保存されることも保証します。ただし、メッセージグループ 1 とメッセージグループ 4 のメッセージ間にはシーケンシャルな関係はありません。
消費順序 : ApsaraMQ for RocketMQ は、コンシューマーとサーバー間のプロトコルを使用して、メッセージが保存された順序で厳密に消費されることを保証します。
メッセージの消費順序を保証するには、以下の条件を満たす必要があります:
配送オーダー
ApsaraMQ for RocketMQ は、クライアント SDK とサーバー通信プロトコルを使用して、メッセージが保存された順序で配信されることを保証します。ただし、メッセージを消費する際には、非同期処理による順序の乱れを避けるために、ご利用のビジネス側で受信-処理-確認セマンティクスに厳密に従う必要があります。
重要
コンシューマータイプが PushConsumer の場合、ApsaraMQ for RocketMQ はメッセージが保存された順序で 1 つずつコンシューマーに配信されることを保証します。コンシューマータイプが SimpleConsumer の場合、コンシューマーは一度に複数のメッセージをプルする可能性があります。この場合、ご利用のビジネス側で消費順序を保証する必要があります。コンシューマータイプの詳細については、「コンシューマータイプ 」をご参照ください。
リトライ回数の制限
順序付きメッセージの場合、ApsaraMQ for RocketMQ は配信リトライの回数を制限します。最大リトライ回数を超えてもメッセージの消費に失敗した場合、そのメッセージはスキップされます。これにより、メッセージが後続のメッセージをブロックするのを防ぎます。
厳密な消費順序が要求されるシナリオでは、不適切なパラメーターによる順序の乱れを避けるために、リトライ回数を適切に設定してください。
生産順序と消費順序の組み合わせ
メッセージを厳密な先入れ先出し (FIFO) 順で処理する必要がある場合は、送信順序と消費順序の両方を保証する必要があります。しかし、多くのビジネスシナリオでは、単一のプロデューサーが複数の下流コンシューマーにサービスを提供し、そのすべてが順序付き消費を必要とするわけではありません。さまざまなビジネスシナリオに合わせて、送信順序と消費順序を異なる方法で組み合わせることができます。例えば、順序付きメッセージを送信し、順序付けされていない同時消費を使用してスループットを向上させることができます。次の表は、その他の組み合わせについて説明しています:
製造オーダー
消費順序
順序付け効果
メッセージグループを設定して順序付き送信を保証する。
順序付き消費
メッセージグループレベルでメッセージの順序を厳密に保証します。
同じメッセージグループ内のメッセージの消費順序は、送信順序と同一になります。
メッセージグループを設定して順序付き送信を保証できます。
同時消費
メッセージは同時に消費され、可能な限り時系列順に処理されます。
メッセージグループは設定されていません。メッセージは順序不同で送信されます。
順序付き消費
キューストレージレベルでの厳密な順序付け。
ApsaraMQ for RocketMQ のキューのプロパティに基づき、消費順序はキューの保存順序と一致しますが、必ずしも送信順序とは一致しません。
メッセージグループは設定されていません。メッセージは順序不同で送信されます。
同時消費
メッセージは同時に消費され、ベストエフォートで時系列順に処理されます。
順序付きメッセージのライフサイクル
初期化
メッセージはプロデューサーによって構築および初期化され、ブローカーへの送信準備が整います。
消費待機中
メッセージはブローカーに送信され、コンシューマーから可視で利用可能な状態になります。
消費中
メッセージはコンシューマーによって取得され、コンシューマーのローカルビジネスロジックに基づいて処理されます。
このプロセスでは、ブローカーはコンシューマーが消費結果を返すのを待ちます。特定の期間内にコンシューマーから応答がない場合、ApsaraMQ for RocketMQ はメッセージのリトライを実行します。詳細については、「消費リトライ 」をご参照ください。
消費コミット
コンシューマーは消費を完了し、消費結果をブローカーにコミットします。ブローカーは現在のメッセージが消費されたかどうかをマークします。
デフォルトでは、ApsaraMQ for RocketMQ はすべてのメッセージを保持します。消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。メッセージが削除されるのは、保持期間が過ぎた場合、またはシステムのストレージ容量が不足した場合のみです。メッセージが削除される前に、コンシューマーはメッセージを再消費できます。
メッセージ削除
メッセージの保持期間が過ぎた場合、またはストレージ容量が不足した場合、ApsaraMQ for RocketMQ は物理ファイルから最も古く保存されたメッセージをローリング方式で削除します。詳細については、「メッセージのストレージとクリーンアップ 」をご参照ください。
制限事項
順序付きメッセージは、MessageType が FIFO に設定されているトピックにのみ送信できます。メッセージタイプはトピックタイプと一致する必要があります。
重要
コンシューマーグループが順序付き配信を使用するように設定されている場合、そのグループが消費するすべてのメッセージは、メッセージタイプに関係なく順序付きメッセージとして課金されます。ご利用のビジネスで厳密な順序付き消費が不要な場合は、グループの配信順序を同時配信に設定することで、コストを削減できます。
消費の並行性の最適化
RocketMQ 5.x gRPC SDK を使用する必要があります。順序付き消費を実行する際、この SDK シリーズの PushConsumer クライアントは、同じ MessageQueue からのメッセージを、その MessageGroup に基づいて異なるスレッドに割り当てて同時消費させることができます。これにより、順序付きメッセージの消費の並行性が大幅に向上します 。MessageGroup の値が離散的であるほど、改善効果は大きくなります。サポートされている SDK のバージョンは次のとおりです:
Java SDK: 5.0.8 以降。
C++ SDK: 5.0.3 以降。
その他の SDK: サポートされていません。
使用例
通常メッセージとは異なり、順序付きメッセージでは送信時にメッセージグループを指定する必要があります。効果的なビジネスパーティショニングと同時スケーリングをサポートするために、ビジネスシナリオに基づいて可能な限り細かい粒度でメッセージグループを設計してください。
以下の Java コードは、順序付きメッセージを送受信する方法を示しています:
メッセージの送受信に関する完全なサンプルコードについては、「RocketMQ 5.x gRPC SDK 」をご参照ください。
サンプルコード
順序付きメッセージの送信 import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* インスタンスのエンドポイント。コンソールのインスタンス詳細ページの [エンドポイント] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用します。
* ローカルマシンからパブリックネットワーク経由で、またはオンプレミスのデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを使用します。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// メッセージの送信先トピック名。事前にコンソールでトピックを作成する必要があります。作成されていないトピックを使用するとエラーが返されます。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックネットワーク経由でインスタンスにアクセスする場合は、設定でインスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールの [アクセス制御] ページの [インテリジェント認証] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* サーバーレスインスタンスを使用する場合、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
*/
// builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 順序付きメッセージを送信します。
Message message = provider.newMessageBuilder()
.setTopic("topic")
// メッセージキーを設定します。キーを使用してメッセージを正確に見つけることができます。
.setKeys("messageKey")
// メッセージタグを設定します。コンシューマーはタグを使用してメッセージをフィルターできます。
.setTag("messageTag")
// 順序付きメッセージのメッセージグループを設定します。ホットスポットグループを避けるために、グループをできるだけ離散的に保ちます。
.setMessageGroup("fifoGroup001")
// メッセージ本文。
.setBody("messageBody".getBytes())
.build();
try {
// メッセージを送信します。送信結果に注意し、失敗などの例外をキャッチします。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer での消費 import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* インスタンスのエンドポイント。コンソールのインスタンス詳細ページの [エンドポイント] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用します。
* ローカルマシンからパブリックネットワーク経由で、またはオンプレミスのデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを使用します。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブするトピック。事前にコンソールでトピックを作成する必要があります。作成されていないトピックを使用するとエラーが返されます。
String topic = "Your Topic";
// コンシューマーのコンシューマーグループ。事前にコンソールでグループを作成する必要があります。作成されていないグループを使用するとエラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックネットワーク経由でインスタンスにアクセスする場合は、設定でインスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールの [アクセス制御] ページの [インテリジェント認証] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* サーバーレスインスタンスを使用する場合、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
// メッセージサブスクリプションのフィルター ルール。これは、すべてのタグがサブスクライブされることを意味します。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// PushConsumer を初期化します。コンシューマーグループ、通信パラメーター、およびサブスクリプション関係をバインドする必要があります。
// 順序付きメッセージを消費する場合、現在のコンシューマーグループが順序付き配信モードであることを確認してください。そうでない場合、メッセージは引き続き同時かつ順序不同で配信されます。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンシューマーグループを設定します。
.setConsumerGroup(consumerGroup)
// 事前にバインドされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 消費リスナーを設定します。
.setMessageListener(messageView -> {
// メッセージを処理し、消費結果を返します。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// PushConsumer が不要になった場合は、プロセスを閉じます。
//pushConsumer.close();
}
}
SimpleConsumer での消費 import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* インスタンスのエンドポイント。コンソールのインスタンス詳細ページの [エンドポイント] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用します。
* ローカルマシンからパブリックネットワーク経由で、またはオンプレミスのデータセンターからインスタンスにアクセスする場合は、パブリックエンドポイントを使用します。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブするトピック。事前にコンソールでトピックを作成する必要があります。作成されていないトピックを使用するとエラーが返されます。
String topic = "Your Topic";
// コンシューマーのコンシューマーグループ。事前にコンソールでグループを作成する必要があります。作成されていないグループを使用するとエラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックネットワーク経由でインスタンスにアクセスする場合は、設定でインスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールの [アクセス制御] ページの [インテリジェント認証] タブから取得します。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* サーバーレスインスタンスを使用する場合、パブリックネットワークアクセスのためのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合は、内部ネットワークアクセスのためのユーザー名とパスワードを設定する必要はありません。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
// メッセージサブスクリプションのフィルター ルール。これは、すべてのタグがサブスクライブされることを意味します。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// SimpleConsumer を初期化します。コンシューマーグループ、通信パラメーター、およびサブスクリプション関係をバインドする必要があります。
// 順序付きメッセージを消費する場合、現在のコンシューマーグループが順序付き配信モードであることを確認してください。そうでない場合、メッセージは引き続き同時かつ順序不同で配信されます。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
// コンシューマーグループを設定します。
.setConsumerGroup(consumerGroup)
// ロングポーリングのタイムアウトを設定します。
.setAwaitDuration(awaitDuration)
// 事前にバインドされたサブスクリリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
// 今回プルするメッセージの最大数を設定します。
int maxMessageNum = 16;
// メッセージの不可視期間を設定します。
Duration invisibleDuration = Duration.ofSeconds(10);
// SimpleConsumer は、クライアントが能動的にループしてメッセージを取得し、処理する必要があります。
// リアルタイム消費を改善するために、複数のスレッドを使用してメッセージを同時にプルします。
while (true) {
// 同じ MessageGroup 内のメッセージについて、先行するメッセージが消費されていない場合、receive を再度呼び出しても後続のメッセージを取得できないことに注意してください。
final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 処理が完了したら、能動的に ack を呼び出して消費結果をコミットする必要があります。
try {
consumer.ack(messageView);
} catch (ClientException e) {
// システムの速度制限やその他の理由でプルが失敗した場合は、リクエストを再開してメッセージを取得する必要があります。
e.printStackTrace();
}
});
}
// SimpleConsumer が不要になった場合は、プロセスを閉じます。
// consumer.close();
}
}
順序付きメッセージの消費リトライログの取得
PushConsumer による順序付き消費のリトライは、コンシューマークライアント側で発生します。サーバーは消費リトライの詳細なログを取得できません。メッセージトレースで順序付きメッセージの配信結果が「failed」の場合、コンシューマークライアントのログで最大リトライ回数やコンシューマークライアントなどの情報を確認してください。
コンシューマークライアントのログパスの詳細については、「ログ設定 」をご参照ください。
クライアントログで以下のキーワードを検索すると、消費の失敗に関連するコンテンツをすばやく見つけることができます:
Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times
ベストプラクティス
シリアル消費を使用し、バッチ消費を避けて順序不同の処理を防ぐ
メッセージはシリアルに消費してください。一度に複数のメッセージを消費することは避けてください。順序不同で処理される原因となる可能性があります。
例えば、メッセージが 1→2→3→4 の順序で送信されるとします。バッチ消費中、順序は 1→[2,3](バッチ処理失敗)→[2,3](リトライ)→4 となります。このシナリオでは、メッセージ 3 の失敗によりメッセージ 2 が再度処理される可能性があり、結果として順序不同の消費が発生します。
メッセージグループを分散させてホットスポットを回避する
ApsaraMQ for RocketMQ は、同じメッセージグループ内のメッセージが同じキューに保存されることを保証します。さまざまなビジネスシナリオからのメッセージが 1 つまたは少数のメッセージグループに集中すると、ストレージの負荷が特定のサーバー側キューに集中します。これにより、パフォーマンスのホットスポットが容易に発生し、スケーラビリティが妨げられる可能性があります。メッセージグループを設計する際の一般的な推奨事項は、注文 ID やユーザー ID を順序付けの基準として使用することです。これにより、同じエンドユーザーのメッセージは順序通りに処理され、異なるユーザーのメッセージは順序付けが不要になります。
したがって、メッセージグループの粒度でビジネスをパーティショニングしてください。例えば、注文 ID やユーザー ID をメッセージグループのキーとして使用します。これにより、同じエンドユーザーのメッセージは順序通りに処理され、異なるユーザーのメッセージは順序付けが不要になります。