Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan pertama masuk, pertama keluar (FIFO) yang ketat. Topik ini menyajikan contoh kode untuk mengirim dan menerima pesan terurut melalui TCP menggunakan Apache RocketMQ SDK untuk Java.
Prasyarat
Pastikan langkah-langkah berikut telah dilakukan:
SDK untuk Java versi 1.2.7 atau lebih baru telah diunduh. Untuk informasi lebih lanjut, lihat Catatan Rilis.
Lingkungan telah dipersiapkan. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
(Opsional) Pengaturan logging telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Pengaturan Logging.
Informasi latar belakang
Pesan terurut dalam topik tertentu dipartisi berdasarkan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO yang ketat. Kunci sharding adalah bidang kunci yang digunakan untuk mengidentifikasi partisi berbeda dalam pesan terurut. Kunci sharding berbeda dari kunci pesan normal.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Jika Anda adalah pengguna baru ApsaraMQ for RocketMQ, kami sarankan Anda merujuk ke proyek demo untuk membangun proyek sebelum menggunakan ApsaraMQ for RocketMQ untuk mengirim dan menerima pesan.
Kirim pesan terurut
Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman oleh satu produser atau thread. Jika pengirim menggunakan beberapa produser atau thread secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.
Untuk informasi lebih lanjut tentang kode contoh, lihat ApsaraMQ for RocketMQ Repositori Kode.
Contoh kode:
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// ID AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.AccessKey,"XXX");
// Rahasia AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.SecretKey,"XXX");
// Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// Topik tempat pesan yang ingin Anda kirim.
"Order_global_topic",
// Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk menyaring pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus sepakat pada metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan.
"send order global msg".getBytes()
);
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
// Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ dan mengirimnya lagi.
// Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
msg.setKey(orderId);
// Bidang kunci yang digunakan dalam pesan terurut untuk mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan normal.
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan berhasil dikirim.
if (sendResult != null) {
System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + msg.getTopic());
e.printStackTrace();
}
}
// Sebelum Anda keluar dari aplikasi, matikan produser.
// Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
producer.shutdown();
}
}Berlangganan pesan terurut
Contoh kode:
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
// ID AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.AccessKey,"XXX");
// Rahasia AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.SecretKey,"XXX");
// Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
// Periode timeout dalam milidetik sebelum percobaan ulang dilakukan ketika sistem gagal mengonsumsi pesan terurut. Nilai valid: 10 hingga 30.000.
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// Jumlah maksimum percobaan ulang yang dapat dilakukan pada pesan jika pesan gagal dikonsumsi.
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// Sebelum Anda menggunakan konsumen untuk berlangganan pesan, panggil metode start() hanya sekali untuk memulai konsumen.
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// Topik tempat pesan yang ingin Anda langganan.
"Order_global_topic",
// Berlangganan pesan yang berisi tag tertentu di topik tertentu.
// 1. Asterisk (*) menentukan bahwa konsumen berlangganan semua pesan.
// 2. TagA || TagB || TagC menentukan bahwa konsumen berlangganan pesan yang berisi TagA, TagB, atau TagC.
"*",
new MessageOrderListener() {
/**
* 1 Jika pesan gagal dikonsumsi atau pengecualian terjadi selama pemrosesan pesan, OrderAction.Suspend dikembalikan.
* 2. Jika pesan diproses, OrderAction.Success dikembalikan.
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}