すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:トランザクションメッセージの送受信

最終更新日:Mar 12, 2026

ApsaraMQ for RocketMQ は、トランザクションの一貫性を確保するために、eXtended Architecture (X/Open XA) に類似した分散トランザクション処理を提供します。このトピックでは、TCP クライアント SDK for Java を使用してトランザクションメッセージを送受信する方法について説明します。

説明

ApsaraMQ for RocketMQ を初めてご利用になる場合は、メッセージを送受信する前に、デモプロジェクト を参照して作業プロジェクトを設定してください。

トランザクションメッセージの仕組み

次の図は、トランザクションメッセージ配信中にプロデューサー、ブローカー、およびローカルトランザクションがどのように連携するかを示しています。

Transactional message interaction process

トランザクションメッセージモデルの詳細については、「トランザクションメッセージ」をご参照ください。

前提条件

開始する前に、以下を準備してください。

  • SDK for Java をダウンロード済みであること。バージョン情報については、「リリースノート」をご参照ください。

  • 開発環境を設定済みであること。詳細については、「環境準備」をご参照ください。

  • (オプション) ロギングを設定済みであること。詳細については、「ロギング設定」をご参照ください。

トランザクションメッセージの送信

トランザクションメッセージの送信には、次の 3 つのコンポーネントが必要です。

  • メッセージを送信するための TransactionProducer

  • ハーフメッセージが送信されたときにローカルトランザクションを実行するための LocalTransactionExecuter

  • 未解決のトランザクションを検証するためにブローカーが呼び出す LocalTransactionChecker

完全なソースコードについては、「ApsaraMQ for RocketMQ コードライブラリ」をご参照ください。

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // ApsaraMQ for RocketMQ コンソールで作成されたコンシューマーグループ ID。
        // トランザクションメッセージのコンシューマーグループ ID は、
        // 他のメッセージタイプで使用される ID と同じにすることはできません。
        properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // 環境変数から AccessKey 認証情報を取得し、
        // 機密情報のハードコーディングを避けます。
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // TCP エンドポイント。この値は、
        // ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページの [TCP エンドポイント] セクションで確認できます。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");

        // プロデューサーを作成する前に、トランザクションチェッカーを登録します。
        // ブローカーは、未解決のトランザクションを検証するためにこのチェッカーを呼び出します。
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("<your-topic>", "TagA", "Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try {
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        // ここでローカルトランザクションロジックを実行します。
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            } catch (ONSClientException e) {
                // 失敗を処理します。再送信するか、後続処理のためにメッセージを永続化します。
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}

次のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明取得元
<your-group-id>トランザクションメッセージのコンシューマーグループ IDApsaraMQ for RocketMQ コンソール
<your-tcp-endpoint>ご利用のインスタンスの TCP エンドポイント[インスタンス詳細] ページ > [TCP エンドポイント] セクション
<your-topic>Topic 名ApsaraMQ for RocketMQ コンソール

トランザクションチェッカーの実装

ブローカーは、ハーフメッセージのステータスを判断できない場合にトランザクションチェッカーを呼び出します。これは、次の場合に発生します。

  • LocalTransactionExecuterTransactionStatus.Unknow を返します。

  • プロデューサーがトランザクションステータスをコミットする前に終了する場合。

いずれの場合も、ブローカーはプロデューサークラスター内のプロデューサーに定期的にステータスチェックリクエストを送信します。プロデューサーはローカルトランザクションをチェックし、最終ステータスを報告します。

// トランザクションチェッカーの実装
class LocalTransactionCheckerImpl implements LocalTransactionChecker {

    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("Received transaction status check request. MsgId: " + msg.getMsgID());
        // ローカルトランザクションステータスを照会し、結果を返します。
        // CommitTransaction、RollbackTransaction、または Unknow を返します。
        return TransactionStatus.CommitTransaction;
    }
}

ご利用の check メソッドは、次の処理を実行する必要があります。

  1. ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック済み) をチェックします。

  2. トランザクションステータスをブローカーに返します。TransactionStatus.CommitTransactionTransactionStatus.RollbackTransaction、または TransactionStatus.Unknow

トランザクションメッセージのサブスクライブ

トランザクションメッセージのサブスクライブは、通常メッセージのサブスクライブと同じです。詳細については、「メッセージのサブスクライブ」をご参照ください。