Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan ketat first in, first out (FIFO). Topik ini menyertakan contoh kode untuk mengirim dan menerima pesan terurut menggunakan Apache RocketMQ TCP client SDK for Java.
Informasi latar belakang
Pesan terurut di partisi tertentu dari sebuah topik dipartisi berdasarkan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO yang ketat. Kunci sharding adalah bidang kunci yang digunakan untuk mengidentifikasi partisi berbeda pada pesan terurut. Kunci sharding berbeda dari kunci pesan biasa.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Apache RocketMQ SDK for Java versi 4.5.2 atau yang lebih baru telah diunduh. Untuk informasi lebih lanjut, kunjungi Halaman Unduhan RocketMQ.
Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Sepasang AccessKey telah dibuat di akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Sepasang AccessKey.
Kirim pesan terurut
Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread. Jika pengirim menggunakan beberapa produser atau thread secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.
Kode sampel berikut menunjukkan cara mengirim pesan terurut menggunakan Apache RocketMQ TCP client SDK for Java:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOrderProducer {
private static RPCHook getAclRPCHook()
{
/**
* ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
* Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Buat produser dan aktifkan fitur jejak pesan.
* Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook(), true, null);
/**
* Tentukan Alibaba Cloud sebagai saluran akses. Sebelum menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80.
*/
producer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* Catatan: Parameter ini diperlukan. Pesan terurut dapat didistribusikan secara merata ke setiap antrian hanya setelah parameter ini dikonfigurasi.
* Jika versi SDK adalah 5.x atau yang lebih baru, Anda dapat menggunakan metode berikut untuk menentukan urutan pesan:
* msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
*/
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// Pilih algoritma pemilihan partisi yang sesuai dengan kebutuhan bisnis Anda. Algoritma ini dapat digunakan untuk memastikan bahwa hasil dari parameter yang sama adalah sama.
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}Berlangganan pesan terurut
Kode sampel berikut menunjukkan cara berlangganan pesan terurut menggunakan Apache RocketMQ TCP client SDK for Java:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook()
{
/**
* ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda.
* Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Buat konsumen dan aktifkan fitur jejak pesan.
* Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/**
* Tentukan Alibaba Cloud sebagai saluran akses. Sebelum menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong.
*/
consumer.setAccessChannel(AccessChannel.CLOUD);
/**
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80.
*/
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Terima Pesan Baru: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;// Jika pesan gagal dikonsumsi, permintaan ditangguhkan dan dicoba kembali dan hasil berikut dikembalikan: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
}
});
consumer.start();
System.out.printf("Konsumen Dimulai.%n");
}