すべてのプロダクト
Search
ドキュメントセンター

:キューメッセージの順序保証

最終更新日:Mar 14, 2025

この記事では、メッセージサービス(MNS)キューにメッセージを送信する場合、または MNS キューからメッセージを消費する場合に、メッセージの順序を保証する方法について説明します。

背景情報

シンプルメッセージキュー(旧 MNS) キューは、高い信頼性、可用性、および並行性を提供します。各キューのデータは、Apsara クラウドプラットフォームの 3 つのレプリカに永続的に保存されます。各キューには、少なくとも 2 つのサーバーがサービスを提供し、各サーバーは高度に並行したリクエストを処理できます。シンプルメッセージキュー(旧 MNS) の分散サービスのため、キューメッセージに対してFIFO(先入れ先出し)戦略を厳密に実装することはできません。

複数のクライアントが同時にキューにメッセージを送信する場合、同時リクエストとネットワークレイテンシのため、メッセージの順序は保証されません。この場合、クライアントがメッセージを送信する順序と、メッセージがサーバーに到着する順序を取得することはできません。複数のクライアントが同時にメッセージを受信する場合、メッセージが処理される順序も取得できません。

メッセージの順序は、送信側クライアントと受信側クライアントが 1 つずつ存在する場合にのみ保証されます。この場合、クライアントがメッセージを送信する順序と、メッセージがサーバーに到着する順序を取得できます。

解決策

背景情報に基づいて、MNS は、キューメッセージを送受信する際にメッセージの順序を保証するための以下の解決策を提供します。

  1. 送信側クライアントのメッセージに SeqId パラメーターを追加します。 SeqId パラメーターの値は #num# 形式です。
  2. コンシューマークライアントで SeqId パラメーターに基づいてメッセージを取得し、ソートします。メッセージが繰り返し消費されないように、バックエンドスレッドが使用されます。
  3. SeqId パラメーターの値は、永続ストレージのためにローカルディスクファイルにバックアップされます。これにより、送信側または受信側クライアントに障害が発生した場合でも、値は失われません。オブジェクトストレージサービス(OSS)、Tablestore、ApsaraDB RDS データベースなど、他の Alibaba Cloud サービスを使用して SeqId パラメーターの値を保存することもできます。Flowchart

注意事項

  • サンプルコードを本番環境に適用する前に、テストすることをお勧めします。
  • 送信側クライアントと受信側クライアントの SeqId パラメーターの値は、キュー内のメッセージと一致します。キューを削除または作成する場合は、ローカルディスクファイルの SeqId パラメーターの値がキュー内のメッセージと一致するかどうかを確認します。キューが SeqId パラメーターが指定されたメッセージのみを受信する場合は、SeqId パラメーターが指定されていないメッセージをキューに送信しないことをお勧めします。
  • キューメッセージの保持期間と実際の処理結果は、メッセージの順序に影響を与える可能性があります。コードを使用してこの問題を解決する必要があります。

サンプルコード

Python サンプルコードを例として使用します。詳細については、SDK をご参照ください。サンプルコードは、Python 用 MNS SDK でサポートされています。 OrderedQueueWrapper クラス(oredered_queue.py ファイル)を使用すると、MNS キューを順序付きキューに変更できます。

OrderedQueueWrapper クラスは、SendMessageInOrder()メソッドと ReceiveMessageInOrder()メソッドを提供します。

  • SendMessageInOrder()メソッドは、SeqId パラメーターが指定されたメッセージを送信するために使用されます。
  • ReceiveMessageInOrder()メソッドは、コンシューマークライアントでメッセージを取得してソートするために使用されます。

メッセージを送受信するために使用されるサンプルコードは、OrderedQueueWrapper パッケージの send_message_in_order.py ファイルと receive_message_in_order.py ファイルに提供されています。

send_message_in_order.py:
    # 順序付きキューの初期化
    seqIdConfig = {"localFileName":"/tmp/mns_send_message_seq_id"}  # SeqId パラメーターの値を永続的に保存するディスクファイルを指定します。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, sendSeqIdPersistStorage = seqIdPS)
    orderedQueue.SendMessageInOrder(message)

receive_message_in_order.py:
    # 順序付きキューの初期化
    seqIdConfig = {"localFileName":"/tmp/mns_receive_message_seq_id"} # SeqId パラメーターの値を永続的に保存するディスクファイルを指定します。
    seqIdPS = LocalDiskStorage(seqIdConfig)
    orderedQueue = OrderedQueueWrapper(myQueue, receiveSeqIdPersistStorage = seqIdPS)
    recv_msg = orderedQueue.ReceiveMessageInOrder(wait_seconds)

実行方法

  1. g_endpointg_accessKeyIdg_accessKeySecret、および g_testQueueName パラメーターを send_message_in_order.py ファイルと receive_message_in_order.py ファイルに設定します。
  2. send_message_in_order.py ファイルのサンプルコードを実行します。
    python send_message_in_order.py
  3. receive_message_in_order.py ファイルのサンプルコードを実行します。
    python receive_message_in_order.py

    送信側クライアントは 20 件のメッセージを送信し、受信側クライアントはこれら 20 件のメッセージを順番に消費します。

    Ordered consumption

oredered_queue.py ファイルのサンプルコードを実行して、順序付けされていないキューと順序付けされたキューを比較することもできます。次のコマンドを実行します。

python ordered_queue.py
説明 oredered_queue.py ファイルのエンドポイントと AccessKey ペアを設定する必要があります。

戻り値

次の 2 つの結果が返されます。

  • 順序付けされていないキューUnordered queue

    メッセージは厳密に順序付けされていません。これは、単一の MNS キューに対して複数のサーバーが同時にリクエストを処理していることを示しています。

  • 順序付けされたキューOrdered queue