All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan terjadwal dan tertunda

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ mendukung dua jenis pengiriman pesan berbasis waktu: scheduled messages, yang dikirimkan pada waktu tertentu, dan delayed messages, yang dikirimkan setelah penundaan tetap. Keduanya menggunakan properti __STARTDELIVERTIME untuk mengontrol kapan broker mengirimkan pesan ke konsumen.

Kode contoh Java berikut menunjukkan cara mengirim dan berlangganan pesan terjadwal dan tertunda dengan SDK client TCP (Community Edition).

Cara kerja

Saat produsen mengirim pesan dengan properti __STARTDELIVERTIME, broker ApsaraMQ for RocketMQ menyimpan pesan tersebut hingga waktu pengiriman yang ditentukan tiba. Pada saat itu, broker mengirimkan pesan tersebut ke konsumen yang berlangganan seperti halnya pesan biasa.

  • Delayed message: Atur __STARTDELIVERTIME ke System.currentTimeMillis() + delayMillis. Broker akan mengirimkan pesan setelah jeda waktu tersebut berlalu.

  • Scheduled message: Atur __STARTDELIVERTIME ke timestamp Unix masa depan dalam milidetik. Broker akan mengirimkan pesan tepat pada waktu tersebut.

Jika waktu yang ditentukan berada di masa lalu, broker akan segera mengirimkan pesan tersebut.

Scheduled vs. delayed: kapan masing-masing digunakan

TipeKasus penggunaanContoh
DelayedMemicu aksi setelah menunggu selama periode tetapBatalkan pesanan yang belum dibayar 30 menit setelah dibuat
ScheduledMemicu aksi pada waktu tertentuKirim notifikasi pukul 09.00 setiap hari Senin

Perbedaan dari Apache RocketMQ

Apache RocketMQ mendukung delayed messages tetapi tidak mendukung scheduled messages, serta tidak menyediakan antarmuka khusus untuk scheduled messages.

ApsaraMQ for RocketMQ menyediakan kemampuan tambahan:

  • Dukungan untuk kedua tipe pesan: delayed dan scheduled

  • Presisi tingkat kedua untuk waktu pengiriman

  • Konkurensi lebih tinggi untuk pemrosesan pesan berbasis waktu

Penting

Metode konfigurasi dan hasilnya berbeda antara Apache RocketMQ dan ApsaraMQ for RocketMQ. Gunakan kode contoh pada halaman ini untuk ApsaraMQ for RocketMQ di cloud.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Kirim pesan terjadwal dan tertunda

Kedua jenis pesan—terjadwal dan tertunda—menggunakan properti user yang sama, yaitu __STARTDELIVERTIME. Satu-satunya perbedaan terletak pada cara menghitung nilai timestamp-nya.

Ganti placeholder berikut dengan nilai aktual Anda:

PlaceholderDeskripsiContoh
<your-group-id>Group ID yang dibuat di Konsol ApsaraMQ for RocketMQGID_example
<your-access-point>Titik akhir instans dari konsolhttp://MQ_INST_XXXX.aliyuncs.com:80
<your-topic>Topik yang dibuat di Konsol ApsaraMQ for RocketMQTopic_example
<your-message-tag>Tag pesan untuk penyaringanTagA
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Membaca kredensial dari variabel lingkungan:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Buat produsen dengan jejak pesan diaktifkan.
        // Untuk menonaktifkan jejak pesan, gunakan:
        //   new DefaultMQProducer("<your-group-id>", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer(
            "<your-group-id>", getAclRPCHook(), true, null);

        // Diperlukan untuk jejak pesan di cloud.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Tetapkan titik akhir instans dari Konsol ApsaraMQ for RocketMQ.
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message(
                    "<your-topic>",
                    "<your-message-tag>",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // --- Opsi A: Delayed message ---
                // Kirim 3 detik dari sekarang.
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // --- Opsi B: Scheduled message ---
                // Kirim pada tanggal dan waktu tertentu.
                // Hapus komentar baris berikut dan beri komentar pada Opsi A untuk menggunakannya.
                //
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                //     .parse("2025-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Gagal mengirim pesan mq.");
                e.printStackTrace();
            }
        }

        // Matikan produsen sebelum keluar (opsional).
        producer.shutdown();
    }
}

Ringkasan:

  • Properti __STARTDELIVERTIME menerima timestamp Unix dalam milidetik.

  • Untuk delayed messages, tambahkan jeda waktu yang diinginkan (dalam milidetik) ke waktu saat ini.

  • Untuk scheduled messages, uraikan string tanggal-waktu target dalam format yyyy-MM-dd HH:mm:ss menjadi timestamp Unix.

  • Jika waktu yang ditentukan lebih awal dari waktu saat ini, pesan akan langsung dikirimkan.

Berlangganan pesan terjadwal dan tertunda

Berlangganan pesan terjadwal dan tertunda identik dengan berlangganan pesan biasa. Broker menyimpan pesan hingga waktu pengiriman yang ditentukan tiba, sehingga tidak diperlukan konfigurasi khusus di sisi konsumen.

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
     * Membaca kredensial dari variabel lingkungan:
     *   ALIBABA_CLOUD_ACCESS_KEY_ID
     *   ALIBABA_CLOUD_ACCESS_KEY_SECRET
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Buat konsumen dengan jejak pesan diaktifkan.
        // Untuk menonaktifkan jejak pesan, gunakan:
        //   new DefaultMQPushConsumer("<your-group-id>", getAclRPCHook(),
        //       new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "<your-group-id>", getAclRPCHook(),
            new AllocateMessageQueueAveragely(), true, null);

        // Tetapkan titik akhir instans dari Konsol ApsaraMQ for RocketMQ.
        // Nilainya dalam format http://xxxx.mq-internet.aliyuncs.com:80.
        consumer.setNamesrvAddr("<your-access-point>");

        // Diperlukan untuk jejak pesan di cloud.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Berlangganan semua tag (*) pada topik.
        consumer.subscribe("<your-topic>", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("Terima Pesan Baru: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

Lihat juga

  • Pesan terjadwal dan tertunda: konsep, presisi pengiriman, dan batasan penggunaan

  • Jelajahi tipe pesan lain seperti pesan transaksional dan pesan terurut