ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA) untuk memastikan konsistensi transaksi di ApsaraMQ for RocketMQ. Topik ini memberikan contoh kode tentang cara mengirim dan menerima pesan transaksional menggunakan SDK Klien HTTP untuk Java.
Informasi latar belakang
Gambar berikut menunjukkan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Sebelum memulai, pastikan operasi berikut telah dilakukan:
Instal SDK untuk Java. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan transaksional
Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan SDK Klien HTTP untuk Java:
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQTransProducer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.List;
public class TransProducer {
static void processCommitRollError(Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Kesalahan Commit/Roll transaksi, requestId adalah:" + errors.getRequestId() + ", gagal handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", Kode Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", Pesan Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
}
}
public static void main(String[] args) throws Throwable {
MQClient mqClient = new MQClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
// Setiap topik dapat digunakan untuk mengirim dan menerima pesan jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan mengonsumsi pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
final String topic = "${TOPIC}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
final String instanceId = "${INSTANCE_ID}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
final String groupId = "${GROUP_ID}";
final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topic, groupId);
for (int i = 0; i < 4; i++) {
TopicMessage topicMessage = new TopicMessage();
topicMessage.setMessageBody("trans_msg");
topicMessage.setMessageTag("a");
topicMessage.setMessageKey(String.valueOf(System.currentTimeMillis()));
// Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan pertama status transaksi lokal. Interval waktu ini menentukan waktu relatif ketika status diperiksa pertama kali. Unit: detik. Nilai valid: 10 hingga 300.
// Jika pesan tidak dikomit atau dibatalkan setelah pemeriksaan status transaksi pertama dilakukan, broker memulai permintaan untuk memeriksa status transaksi lokal pada interval 10 detik dalam 24 jam ke depan.
topicMessage.setTransCheckImmunityTime(10);
topicMessage.getProperties().put("a", String.valueOf(i));
TopicMessage pubResultMsg = null;
pubResultMsg = mqTransProducer.publishMessage(topicMessage);
System.out.println("Kirim---->msgId adalah: " + pubResultMsg.getMessageId()
+ ", bodyMD5 adalah: " + pubResultMsg.getMessageBodyMD5()
+ ", Handle: " + pubResultMsg.getReceiptHandle()
);
if (pubResultMsg != null && pubResultMsg.getReceiptHandle() != null) {
if (i == 0) {
// Setelah produsen mengirim pesan transaksional, broker mendapatkan handle dari pesan setengah yang sesuai dengan pesan transaksional dan mengomitm atau membatalkan pesan transaksional berdasarkan status handle.
try {
mqTransProducer.commit(pubResultMsg.getReceiptHandle());
System.out.println(String.format("MessageId:%s, commit", pubResultMsg.getMessageId()));
} catch (Throwable e) {
// Jika pesan transaksional tidak dikomit atau dibatalkan sebelum periode waktu yang ditentukan oleh parameter TransCheckImmunityTime untuk handle pesan transaksional berakhir, operasi commit atau rollback gagal.
if (e instanceof AckMessageException) {
processCommitRollError(e);
continue;
}
}
}
}
}
// Klien membutuhkan thread atau proses untuk memproses pesan transaksional yang belum diakui.
// Mulai thread untuk memproses pesan transaksional yang belum diakui.
Thread t = new Thread(new Runnable() {
public void run() {
int count = 0;
while(true) {
try {
if (count == 3) {
break;
}
List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
if (messages == null) {
System.out.println("Tidak ada pesan setengah!");
continue;
}
System.out.println(String.format("Setengah---->MessageId:%s,Properties:%s,Body:%s,Latency:%d",
messages.get(0).getMessageId(),
messages.get(0).getProperties(),
messages.get(0).getMessageBodyString(),
System.currentTimeMillis() - messages.get(0).getPublishTime()));
for (Message message : messages) {
try {
if (Integer.valueOf(message.getProperties().get("a")) == 1) {
// Konfirmasi untuk mengomitm pesan transaksional.
mqTransProducer.commit(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
} else if (Integer.valueOf(message.getProperties().get("a")) == 2
&& message.getConsumedTimes() > 1) {
// Konfirmasi untuk mengomitm pesan transaksional.
mqTransProducer.commit(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
} else if (Integer.valueOf(message.getProperties().get("a")) == 3) {
// Konfirmasi untuk membatalkan pesan transaksional.
mqTransProducer.rollback(message.getReceiptHandle());
count++;
System.out.println(String.format("MessageId:%s, rollback", message.getMessageId()));
} else {
// Periksa status lain kali.
System.out.println(String.format("MessageId:%s, tidak diketahui", message.getMessageId()));
}
} catch (Throwable e) {
// Jika pesan transaksional dikomit atau dibatalkan setelah waktu yang ditentukan oleh parameter TransCheckImmunityTime untuk handle pesan transaksional berakhir atau setelah periode timeout yang ditentukan untuk handle consumeHalfMessage berakhir, commit atau rollback gagal. Dalam contoh ini, periode timeout ditentukan sebagai 10 detik untuk handle consumeHalfMessage.
processCommitRollError(e);
}
}
} catch (Throwable e) {
System.out.println(e.getMessage());
}
}
}
});
t.start();
t.join();
mqClient.close();
}
}Langganan pesan transaksional
Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan SDK Klien HTTP untuk Java:
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
// Setiap topik dapat digunakan untuk mengirim dan menerima pesan jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
final String topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
final String groupId = "${GROUP_ID}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
do {
List<Message> messages = null;
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirim respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
messages = consumer.consumeMessage(
3,// Tentukan jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3// Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// Tidak ada pesan dalam topik yang tersedia untuk dikonsumsi.
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": tidak ada pesan baru, lanjutkan!");
continue;
}
// Logika konsumsi pesan.
for (Message message : messages) {
System.out.println("Terima pesan: " + message);
}
// Jika broker tidak menerima pengakuan (ACK) untuk pesan dari konsumen sebelum interval pengulangan pengiriman berakhir, broker mengirim pesan untuk dikonsumsi lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// Jika handle pesan habis waktu, broker tidak dapat menerima ACK untuk pesan dari konsumen.
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Gagal mengirim ACK pesan, requestId adalah:" + errors.getRequestId() + ", gagal handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", Kode Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", Pesan Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}