全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan terjadwal dan pesan tertunda

更新时间:Jun 28, 2025

Topik ini menyediakan contoh kode untuk menggunakan TCP client SDK untuk Java dari Edisi Komunitas guna mengirim dan menerima pesan terjadwal serta pesan tertunda.

Informasi latar belakang

  • Pesan Terjadwal: Pesan yang dikirim oleh broker ApsaraMQ for RocketMQ kepada konsumen pada waktu tertentu.

  • Pesan Tertunda: Pesan yang dikirim oleh broker ApsaraMQ for RocketMQ kepada konsumen setelah jangka waktu tertentu.

Untuk informasi lebih lanjut, lihat Pesan Terjadwal dan Pesan Tertunda.

Penting

Metode konfigurasi dan hasil pesan terjadwal serta pesan tertunda berbeda antara Apache RocketMQ dan ApsaraMQ for RocketMQ. Apache RocketMQ mendukung pesan tertunda tetapi tidak mendukung pesan terjadwal, sehingga tidak ada antarmuka khusus untuk penjadwalan pesan. ApsaraMQ for RocketMQ mendukung kedua jenis pesan tersebut, memungkinkan pengaturan waktu terjadwal dan periode penundaan hingga detik serta menawarkan konkurensi yang lebih tinggi. Kami merekomendasikan penggunaan cloud untuk mengirim dan menerima pesan terjadwal dan pesan tertunda. Untuk detail lebih lanjut, lihat bagian-bagian berikut.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Edisi Komunitas dari SDK untuk Java versi 4.5.2 atau lebih baru telah diunduh. Untuk informasi lebih lanjut, kunjungi Halaman Unduhan RocketMQ.

  • Lingkungan telah disiapkan. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.

  • Pair AccessKey telah dibuat di akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pair AccessKey.

Kirim pesan terjadwal dan pesan tertunda

Berikut adalah contoh kode untuk mengirim pesan terjadwal dan pesan tertunda menggunakan TCP client SDK untuk Java dari Edisi Komunitas:

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
    * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. 
    * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
    */ 
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * Buat produser dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
         * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
         *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /**
         * Tentukan Alibaba Cloud sebagai saluran akses. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. 
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
         * Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Nilainya dalam format http://MQ_INST_XXXX.aliyuncs.com:80. 
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try { 
                /* Tentukan topik yang Anda buat di konsol ApsaraMQ for RocketMQ. */
                Message msg = new Message("YOUR TOPIC",
                    /* Tag pesan. */
                    "YOUR MESSAGE TAG",
                    // Tubuh pesan. */
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /* Periode penundaan setelah pesan tertunda dikirim. Unit: milidetik. Misalnya, jika Anda ingin pesan tertunda dikirim setelah 3 detik, tentukan nilainya sebagai 3000. */
                long delayTime = System.currentTimeMillis()+3000;
                msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(delayTime));
      
                /**
                * Titik waktu saat pesan terjadwal dikirim. Misalnya, jika Anda ingin pesan terjadwal dikirim pada pukul 18:45:00 tanggal 10 Agustus 2021, tentukan nilainya sebagai 2021-08-10 18:45:00. 
                * Nilai parameter ini dalam format yyyy-MM-dd HH:mm:ss. Jika Anda menentukan waktu yang lebih awal dari waktu saat ini, pesan akan segera dikirim ke konsumen. 
                * long timeStamp = newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                * msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim lagi. 
                System.out.println(new Date() + " Pengiriman pesan mq gagal.");
                e.printStackTrace();
            }
        }

        // Sebelum keluar dari aplikasi, matikan objek produser. 
        // Catatan: Operasi ini bersifat opsional. 
        producer.shutdown();
    }
}

Berlangganan pesan terjadwal dan pesan tertunda

Berikut adalah contoh kode untuk berlangganan pesan terjadwal dan pesan tertunda menggunakan TCP client SDK untuk Java dari Edisi Komunitas:

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda. 
    * Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * Buat konsumen dan aktifkan fitur jejak pesan. Atur parameter ini ke ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
         * Jika Anda tidak ingin mengaktifkan fitur jejak pesan, Anda dapat menggunakan metode berikut untuk membuat produser:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        // Titik akhir dari instance ApsaraMQ for RocketMQ. 
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        // Atur parameter AccessChannel ke CLOUD. Jika Anda ingin menggunakan fitur jejak pesan di cloud, konfigurasikan parameter ini. Jika Anda tidak ingin mengaktifkan fitur jejak pesan, biarkan parameter ini kosong. 
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Terima Pesan Baru: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}