ApsaraMQ for RocketMQ は、トランザクションの一貫性を確保するために、eXtended Architecture (X/Open XA) に類似した分散トランザクション処理を提供します。このトピックでは、TCP クライアント SDK for Java を使用してトランザクションメッセージを送受信する方法について説明します。
ApsaraMQ for RocketMQ を初めてご利用になる場合は、メッセージを送受信する前に、デモプロジェクト を参照して作業プロジェクトを設定してください。
トランザクションメッセージの仕組み
次の図は、トランザクションメッセージ配信中にプロデューサー、ブローカー、およびローカルトランザクションがどのように連携するかを示しています。

トランザクションメッセージモデルの詳細については、「トランザクションメッセージ」をご参照ください。
前提条件
開始する前に、以下を準備してください。
トランザクションメッセージの送信
トランザクションメッセージの送信には、次の 3 つのコンポーネントが必要です。
メッセージを送信するための
TransactionProducer。ハーフメッセージが送信されたときにローカルトランザクションを実行するための
LocalTransactionExecuter。未解決のトランザクションを検証するためにブローカーが呼び出す
LocalTransactionChecker。
完全なソースコードについては、「ApsaraMQ for RocketMQ コードライブラリ」をご参照ください。
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// ApsaraMQ for RocketMQ コンソールで作成されたコンシューマーグループ ID。
// トランザクションメッセージのコンシューマーグループ ID は、
// 他のメッセージタイプで使用される ID と同じにすることはできません。
properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
// 環境変数から AccessKey 認証情報を取得し、
// 機密情報のハードコーディングを避けます。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// TCP エンドポイント。この値は、
// ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページの [TCP エンドポイント] セクションで確認できます。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
// プロデューサーを作成する前に、トランザクションチェッカーを登録します。
// ブローカーは、未解決のトランザクションを検証するためにこのチェッカーを呼び出します。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("<your-topic>", "TagA", "Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try {
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
// ここでローカルトランザクションロジックを実行します。
System.out.println("Execute the local transaction and commit the transaction status.");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
} catch (ONSClientException e) {
// 失敗を処理します。再送信するか、後続処理のためにメッセージを永続化します。
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}次のプレースホルダーを実際の値に置き換えてください。
| プレースホルダー | 説明 | 取得元 |
|---|---|---|
<your-group-id> | トランザクションメッセージのコンシューマーグループ ID | ApsaraMQ for RocketMQ コンソール |
<your-tcp-endpoint> | ご利用のインスタンスの TCP エンドポイント | [インスタンス詳細] ページ > [TCP エンドポイント] セクション |
<your-topic> | Topic 名 | ApsaraMQ for RocketMQ コンソール |
トランザクションチェッカーの実装
ブローカーは、ハーフメッセージのステータスを判断できない場合にトランザクションチェッカーを呼び出します。これは、次の場合に発生します。
LocalTransactionExecuterはTransactionStatus.Unknowを返します。プロデューサーがトランザクションステータスをコミットする前に終了する場合。
いずれの場合も、ブローカーはプロデューサークラスター内のプロデューサーに定期的にステータスチェックリクエストを送信します。プロデューサーはローカルトランザクションをチェックし、最終ステータスを報告します。
// トランザクションチェッカーの実装
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("Received transaction status check request. MsgId: " + msg.getMsgID());
// ローカルトランザクションステータスを照会し、結果を返します。
// CommitTransaction、RollbackTransaction、または Unknow を返します。
return TransactionStatus.CommitTransaction;
}
}ご利用の check メソッドは、次の処理を実行する必要があります。
ハーフメッセージに対応するローカルトランザクションのステータス (コミット済みまたはロールバック済み) をチェックします。
トランザクションステータスをブローカーに返します。
TransactionStatus.CommitTransaction、TransactionStatus.RollbackTransaction、またはTransactionStatus.Unknow。
トランザクションメッセージのサブスクライブ
トランザクションメッセージのサブスクライブは、通常メッセージのサブスクライブと同じです。詳細については、「メッセージのサブスクライブ」をご参照ください。