ApsaraMQ for RocketMQ mendukung dua jenis pengiriman pesan berbasis waktu: scheduled messages, yang dikirimkan pada waktu tertentu, dan delayed messages, yang dikirimkan setelah penundaan tetap. Keduanya menggunakan properti __STARTDELIVERTIME untuk mengontrol kapan broker mengirimkan pesan ke konsumen.
Kode contoh Java berikut menunjukkan cara mengirim dan berlangganan pesan terjadwal dan tertunda dengan SDK client TCP (Community Edition).
Cara kerja
Saat produsen mengirim pesan dengan properti __STARTDELIVERTIME, broker ApsaraMQ for RocketMQ menyimpan pesan tersebut hingga waktu pengiriman yang ditentukan tiba. Pada saat itu, broker mengirimkan pesan tersebut ke konsumen yang berlangganan seperti halnya pesan biasa.
Delayed message: Atur
__STARTDELIVERTIMEkeSystem.currentTimeMillis() + delayMillis. Broker akan mengirimkan pesan setelah jeda waktu tersebut berlalu.Scheduled message: Atur
__STARTDELIVERTIMEke timestamp Unix masa depan dalam milidetik. Broker akan mengirimkan pesan tepat pada waktu tersebut.
Jika waktu yang ditentukan berada di masa lalu, broker akan segera mengirimkan pesan tersebut.
Scheduled vs. delayed: kapan masing-masing digunakan
| Tipe | Kasus penggunaan | Contoh |
|---|---|---|
| Delayed | Memicu aksi setelah menunggu selama periode tetap | Batalkan pesanan yang belum dibayar 30 menit setelah dibuat |
| Scheduled | Memicu aksi pada waktu tertentu | Kirim notifikasi pukul 09.00 setiap hari Senin |
Perbedaan dari Apache RocketMQ
Apache RocketMQ mendukung delayed messages tetapi tidak mendukung scheduled messages, serta tidak menyediakan antarmuka khusus untuk scheduled messages.
ApsaraMQ for RocketMQ menyediakan kemampuan tambahan:
Dukungan untuk kedua tipe pesan: delayed dan scheduled
Presisi tingkat kedua untuk waktu pengiriman
Konkurensi lebih tinggi untuk pemrosesan pesan berbasis waktu
Metode konfigurasi dan hasilnya berbeda antara Apache RocketMQ dan ApsaraMQ for RocketMQ. Gunakan kode contoh pada halaman ini untuk ApsaraMQ for RocketMQ di cloud.
Prasyarat
Sebelum memulai, pastikan Anda telah:
SDK Community Edition untuk Java versi 4.5.2 atau lebih baru terinstal
Pasangan AccessKey telah dibuat untuk Akun Alibaba Cloud Anda
Topik dan Group ID telah dibuat di Konsol ApsaraMQ for RocketMQ
Kirim pesan terjadwal dan tertunda
Kedua jenis pesan—terjadwal dan tertunda—menggunakan properti user yang sama, yaitu __STARTDELIVERTIME. Satu-satunya perbedaan terletak pada cara menghitung nilai timestamp-nya.
Ganti placeholder berikut dengan nilai aktual Anda:
| Placeholder | Deskripsi | Contoh |
|---|---|---|
<your-group-id> | Group ID yang dibuat di Konsol ApsaraMQ for RocketMQ | GID_example |
<your-access-point> | Titik akhir instans dari konsol | http://MQ_INST_XXXX.aliyuncs.com:80 |
<your-topic> | Topik yang dibuat di Konsol ApsaraMQ for RocketMQ | Topic_example |
<your-message-tag> | Tag pesan untuk penyaringan | TagA |
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
/**
* Membaca kredensial dari variabel lingkungan:
* ALIBABA_CLOUD_ACCESS_KEY_ID
* ALIBABA_CLOUD_ACCESS_KEY_SECRET
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
// Buat produsen dengan jejak pesan diaktifkan.
// Untuk menonaktifkan jejak pesan, gunakan:
// new DefaultMQProducer("<your-group-id>", getAclRPCHook());
DefaultMQProducer producer = new DefaultMQProducer(
"<your-group-id>", getAclRPCHook(), true, null);
// Diperlukan untuk jejak pesan di cloud.
producer.setAccessChannel(AccessChannel.CLOUD);
// Tetapkan titik akhir instans dari Konsol ApsaraMQ for RocketMQ.
producer.setNamesrvAddr("<your-access-point>");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message(
"<your-topic>",
"<your-message-tag>",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// --- Opsi A: Delayed message ---
// Kirim 3 detik dari sekarang.
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
// --- Opsi B: Scheduled message ---
// Kirim pada tanggal dan waktu tertentu.
// Hapus komentar baris berikut dan beri komentar pada Opsi A untuk menggunakannya.
//
// long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// .parse("2025-08-10 18:45:00").getTime();
// msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
System.out.println(new Date() + " Gagal mengirim pesan mq.");
e.printStackTrace();
}
}
// Matikan produsen sebelum keluar (opsional).
producer.shutdown();
}
}Ringkasan:
Properti
__STARTDELIVERTIMEmenerima timestamp Unix dalam milidetik.Untuk delayed messages, tambahkan jeda waktu yang diinginkan (dalam milidetik) ke waktu saat ini.
Untuk scheduled messages, uraikan string tanggal-waktu target dalam format
yyyy-MM-dd HH:mm:ssmenjadi timestamp Unix.Jika waktu yang ditentukan lebih awal dari waktu saat ini, pesan akan langsung dikirimkan.
Berlangganan pesan terjadwal dan tertunda
Berlangganan pesan terjadwal dan tertunda identik dengan berlangganan pesan biasa. Broker menyimpan pesan hingga waktu pengiriman yang ditentukan tiba, sehingga tidak diperlukan konfigurasi khusus di sisi konsumen.
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* Membaca kredensial dari variabel lingkungan:
* ALIBABA_CLOUD_ACCESS_KEY_ID
* ALIBABA_CLOUD_ACCESS_KEY_SECRET
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
// Buat konsumen dengan jejak pesan diaktifkan.
// Untuk menonaktifkan jejak pesan, gunakan:
// new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
// new AllocateMessageQueueAveragely());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"<your-group-id>", getAclRPCHook(),
new AllocateMessageQueueAveragely(), true, null);
// Tetapkan titik akhir instans dari Konsol ApsaraMQ for RocketMQ.
// Nilainya dalam format http://xxxx.mq-internet.aliyuncs.com:80.
consumer.setNamesrvAddr("<your-access-point>");
// Diperlukan untuk jejak pesan di cloud.
consumer.setAccessChannel(AccessChannel.CLOUD);
// Berlangganan semua tag (*) pada topik.
consumer.subscribe("<your-topic>", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Terima Pesan Baru: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}Lihat juga
Pesan terjadwal dan tertunda: konsep, presisi pengiriman, dan batasan penggunaan
Jelajahi tipe pesan lain seperti pesan transaksional dan pesan terurut