すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージ

最終更新日:Mar 12, 2026

トランザクションメッセージは、ApsaraMQ for RocketMQ が提供する特殊なメッセージタイプです。ローカルトランザクションとメッセージ配信のいずれもが成功するか、あるいは失敗することを保証します。この二相コミットメカニズムにより、XA (eXtended Architecture) 分散トランザクションのようなリソースロックのオーバーヘッドなしに、コアサービスとそのダウンストリームコンシューマーの同期を保つことができます。

Distributed transaction requirements

トランザクションメッセージは、次のようなシナリオで使用します。

  • 注文システムは、データベースを更新し、および物流、ポイント、およびカートサービスに通知する必要があります。

  • 支払いサービスは、借方取引を記録および下流の元帳コンシューマーにイベントを配信する必要があります。

  • メッセージの配信とローカルトランザクションの完了が一致しない (またはその逆) ことで、システムが不整合な状態に陥る可能性がある場合。

トランザクションメッセージの仕組み

通常メッセージの課題

ローカルデータベースのトランザクションと通常メッセージの送信を組み合わせると、一方が成功し、もう一方が失敗する可能性が生じます。

  • メッセージは送信されたが、ローカルトランザクションが失敗した場合。ダウンストリームコンシューマーは、未コミットの変更に基づいて動作してしまいます。

  • ローカルトランザクションはコミットされたが、メッセージの送信が失敗した場合。ダウンストリームコンシューマーは、変更を検知できません。

  • タイムアウトが発生し、プロデューサーもブローカーもコミットすべきかロールバックすべきかを判断できない場合。

Normal message solution

XA トランザクションのコスト課題

XA プロトコルはシステム間で分散トランザクションを調整できますが、トランザクションの全期間にわたってリソースをロックします。参加するシステム数が増えるにつれて、ロックの競合が増加し、スループットが低下します。

ハーフメッセージによる二相コミット

ApsaraMQ for RocketMQ のトランザクションメッセージは、これらの問題を両方とも回避する二相コミットプロトコルを使用します。

Transactional message solution

  1. ハーフメッセージの送信。プロデューサーはブローカーにメッセージを送信します。ブローカーはそれを永続化し、プロデューサーに確認応答 (ACK) を返します。このメッセージは、*配信準備未完了*としてマークされます。この状態のメッセージはハーフメッセージと呼ばれます。ダウンストリームコンシューマーはまだこのメッセージを参照できません。

  2. ローカルトランザクションの実行。プロデューサーは、ローカルのデータベース操作 (例:注文ステータスを*未払い*から*支払い済み*に更新) を実行します。

  3. コミットまたはロールバック。プロデューサーは、ローカルトランザクションの結果をブローカーに報告します。

    • コミット:ブローカーはハーフメッセージを*配信準備完了*としてマークし、コンシューマーに配信します。

    • ロールバック:ブローカーはハーフメッセージを破棄します。コンシューマーがこのメッセージを受信することはありません。

  4. トランザクションステータスチェック (回復)。ネットワーク障害やプロデューサーの再起動により、ブローカーがコミットまたはロールバックの結果を受信しなかった場合、ブローカーはクラスター内のプロデューサーインスタンスにステータスクエリを送信します。プロデューサーはローカルトランザクションの結果を確認し、ブローカーに再報告します。

Transaction status check workflow

説明

クエリ間隔と最大リトライ回数については、「パラメーターの制限」をご参照ください。

メッセージのライフサイクル

トランザクションメッセージは、以下の状態を遷移します。

Transactional message lifecycle

状態説明
初期化プロデューサーがハーフメッセージをビルドし、ブローカーへの送信を準備します。
コミット待ちトランザクションブローカーはハーフメッセージをトランザクションストレージシステムに格納します。通常メッセージとは異なり、ハーフメッセージはブローカーによって標準的な方法では永続化されません。このメッセージはコンシューマーからは非表示です。
消費のためのコミット済みローカルトランザクションが成功します。ブローカーはハーフメッセージをストレージシステムに格納し、コンシューマーから参照できるようにします。
メッセージのロールバックローカルトランザクションが失敗します。ブローカーはハーフメッセージを破棄します。ワークフローは終了します。
消費中コンシューマーがメッセージを取得し、処理を開始します。設定されたタイムアウト時間内にコンシューマーが結果を返さない場合、ApsaraMQ for RocketMQ は配信をリトライします。詳細については、「消費リトライ」をご参照ください。
消費結果のコミットコンシューマーが消費結果をコミットします。メッセージは消費済みとしてマークされますが、すぐには削除されません。
メッセージの削除メッセージ保持期間が終了するか、ストレージ容量が少なくなると、ApsaraMQ for RocketMQ は最も古いメッセージから順次削除します。「メッセージのストレージとクリーンアップ」をご参照ください。

デフォルトでは、ApsaraMQ for RocketMQ はすべてのメッセージを保持します。消費済みのメッセージはすぐには削除されず、保持期間が終了するか、ストレージ容量が再利用されるまで、コンシューマーは再消費できます。

トランザクションメッセージの送信 (Java)

前提条件

開始する前に、以下が準備できていることを確認してください。

  • ApsaraMQ for RocketMQ コンソールで、MessageTypeTransaction に設定されたトピック。

  • インスタンスのエンドポイント([インスタンス詳細] ページの [エンドポイント] タブ)

  • (該当する場合) インスタンスのユーザー名とパスワード ([アクセス制御] ページの [インテリジェント認証] タブから取得)

通常メッセージとの違い

トランザクションメッセージの送信は、通常メッセージの送信と 2 つの点で異なります。

  • トランザクションチェッカーが必要。プロデューサーをビルドする際に、トランザクションチェッカーを登録します。このチェッカーは、障害発生後にブローカーがトランザクションステータスをクエリする場合に自動的に実行されます。

  • トピックバインディングが必要。ビルド時にターゲットトピックをプロデューサーにバインドすることで、組み込みのチェッカーがトランザクションステータスを回復できるようになります。

サンプルコード

トランザクションチェッカーを使用してプロデューサーをビルドし、トランザクションを開始し、ハーフメッセージを送信し、ローカルトランザクションを実行してから、コミットまたはロールバックします。

サンプル コード

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {

    // データベースに注文が存在するかどうかのチェックをシミュレートします。
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    // ローカルトランザクション (例:注文レコードの挿入) をシミュレートします。
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        // ご利用のインスタンスエンドポイントに置き換えてください。
        // これは、ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページの
        // [エンドポイント] タブで確認できます。
        String endpoints = "<your-instance-endpoint>";

        // トピックの MessageType は Transaction に設定する必要があります。
        String topic = "<your-transaction-topic>";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints);

        // 認証:
        // - パブリックエンドポイント:ユーザー名とパスワードを指定します (これらは
        //   [アクセス制御] ページの [インテリジェント認証] タで確認できます)。
        // - ECS 上の VPC エンドポイント:認証情報は不要です。ブローカーが VPC から
        //   解決します。
        // - サーバーレスインスタンス:アクセス方法に関係なく、常に認証情報を
        //   指定します。
        builder.setCredentialProvider(
            new StaticSessionCredentialsProvider("<your-username>", "<your-password>"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();

        MessageBuilder messageBuilder = new MessageBuilderImpl();

        // トランザクションチェッカーを使用してプロデューサーをビルドします。
        // このチェッカーは、コミット/ロールバックの結果が受信されなかったハーフメッセージを
        // ブローカーがクエリする際に実行されます。
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                // ハーフメッセージにアタッチされた注文 ID を検索します。
                // 注文がデータベースに存在する場合、ローカルトランザクションは
                // 正常にコミットされています。それ以外の場合は、ロールバックします。
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId)
                    ? TransactionResolution.COMMIT
                    : TransactionResolution.ROLLBACK;
            })
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();

        // ステップ 1:トランザクションを開始します。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            return;
        }

        // ステップ 2:ハーフメッセージをビルドして送信します。
        Message message = messageBuilder.setTopic(topic)
            .setKeys("messageKey1")
            .setTag("messageTag")
            // 後でトランザクションチェッカーがクエリできるように、ビジネス ID をアタッチします。
            .addProperty("OrderId", "xxx")
            .setBody("messageBody".getBytes())
            .build();

        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            // ハーフメッセージの送信に失敗しました。トランザクションはここで終了します。
            return;
        }

        // ステップ 3:ローカルトランザクションを実行します。
        boolean localTransactionOk = doLocalTransaction();

        // ステップ 4:ローカルトランザクションの結果に基づいてコミットまたはロールバックします。
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // コミット呼び出しが失敗した場合、ブローカーは
                // トランザクションチェッカーを呼び出してステータスを解決します。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // エラーをログに記録します。ブローカーはトランザクションチェッカーを
                // 呼び出してステータスを解決します。
                e.printStackTrace();
            }
        }
    }
}

以下のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明
<your-instance-endpoint>インスタンス エンドポイント([エンドポイント] タブの [インスタンスの詳細] ページから)xxx-hangzhou.rmq.aliyuncs.com:8080
<your-transaction-topic>MessageTypeTransactionorder-tx-topic
<your-username>インスタンスユーザー名([アクセス制御] ページの [インテリジェント認証] タブから)MjoxODgwNzcwODY5MD****
<your-password>インスタンスのパスワードNEh6cm9FVUl****

サポートされているすべての言語の完全な SDK サンプルについては、「Apache RocketMQ 5.x SDK」をご参照ください。

ベストプラクティス

不明なトランザクション結果の最小化

トランザクションステータスチェックは、コミットまたはロールバック中の障害に対するセーフティネットとして存在します。ステータスチェックの量が多いと、システムのパフォーマンスが低下し、メッセージ配信が遅延します。ローカルトランザクションは、明確なコミットまたはロールバックの結果をできるだけ早く返すように設計してください。

進行中トランザクションの適切な処理

ブローカーがハーフメッセージのステータスをクエリした際にローカルトランザクションがまだ実行中の場合は、CommitRollback ではなく、Unknown を返してください。時期尚早な結果を返すと、データの不整合を引き起こす可能性があります。

ローカルトランザクションが遅いためにステータスチェックが早すぎる場合は、以下のアプローチを検討してください。

  • 最初のチェック遅延を増やす。ブローカーが最初のステータスクエリを送信するまでの間隔を長く設定します。トレードオフ:これにより、本当に失敗したトランザクションの回復も遅延します。

  • 進行中の状態を明示的に検出する。ローカルトランザクションのロジックを、「実行中」と「失敗」を区別できるように設計し、チェッカーが正しいステータスを返すようにします。

制限事項

制約詳細
トピックタイプトランザクションメッセージには、MessageTypeTransaction に設定されたトピックが必要です。
トランザクションごとに 1 つの SendReceipt各トランザクションは、1 つの SendReceipt のみをサポートします。
結果整合性のみトランザクションメッセージは、ローカルトランザクションとメッセージ配信の間の整合性を保証します。ダウンストリームコンシューマー間でのリアルタイムの整合性は保証しません。メッセージが配信されるまで、ダウンストリームの状態はアップストリームのトランザクションより遅れる可能性があります。トランザクションメッセージは、非同期のダウンストリーム処理が許容される場合にのみ使用してください。
コンシューマー側の責任ApsaraMQ for RocketMQ はコミットされたメッセージが配信されることを保証しますが、各ダウンストリームコンシューマーは処理を正しく行う必要があります。一時的な障害に対応するために、消費リトライロジックを実装してください。「消費リトライ」をご参照ください。
トランザクションのタイムアウト設定されたタイムアウト時間と最大リトライ回数が経過してもブローカーがトランザクションの結果を判断できない場合、デフォルトでハーフメッセージをロールバックします。「パラメーターの制限」をご参照ください。