全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan terjadwal dan pesan tertunda

更新时间:Jul 02, 2025

Topik ini menyediakan contoh kode untuk mengirim dan menerima pesan terjadwal serta pesan tertunda menggunakan SDK klien HTTP untuk C#.

Informasi latar belakang

  • Pesan tertunda adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ ke konsumen setelah periode waktu tertentu.

  • Pesan terjadwal adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ ke konsumen pada titik waktu tertentu.

Konfigurasi kode untuk pesan terjadwal sama dengan konfigurasi kode untuk pesan tertunda melalui HTTP. Kedua jenis pesan tersebut dikirim ke konsumen setelah periode waktu tertentu berdasarkan atribut pesan.

Untuk informasi lebih lanjut, lihat Pesan Terjadwal dan Pesan Tertunda.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Instal SDK untuk C#. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan terjadwal dan pesan tertunda

Contoh kode berikut menunjukkan cara mengirim pesan terjadwal dan pesan tertunda menggunakan SDK klien HTTP untuk C#:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
        // ID AccessKey yang digunakan untuk autentikasi. 
        private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        // Rahasia AccessKey yang digunakan untuk autentikasi. 
        private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        private const string _topicName = "${TOPIC}";
        // ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // Kirim empat pesan secara siklik. 
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    // Isi pesan.     
                    sendMsg = new TopicMessage("hello mq");
                    // Atribut kustom pesan. 
                    sendMsg.PutProperty("a", i.ToString());
                    // Periode waktu setelah broker mengirimkan pesan. Dalam contoh ini, broker mengirimkan pesan ke konsumen setelah penundaan 10 detik. Atur parameter ini ke timestamp dalam milidetik.                         
                    // Jika produsen mengirim pesan terjadwal, atur parameter ini ke selisih waktu antara titik waktu terjadwal dan titik waktu saat ini. 
                    sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}

Berlangganan pesan terjadwal dan pesan tertunda

Contoh kode berikut menunjukkan cara berlangganan pesan terjadwal dan pesan tertunda menggunakan SDK klien HTTP untuk C#:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
        // ID AccessKey yang digunakan untuk autentikasi. 
        private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
        // Rahasia AccessKey yang digunakan untuk autentikasi. 
        private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. 
        private const string _topicName = "${TOPIC}";
        // ID instance tempat topik dimiliki. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ. 
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ. 
        private const string _instanceId = "${INSTANCE_ID}";
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. 
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan. 
            while (true)
            {
                try
                {
                    // Konsumsi pesan dalam mode polling panjang. 
                    // Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, respons segera dikirim ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik. 
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai terbesar yang dapat Anda tentukan adalah 16. 
                            3 // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30. 
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " Tidak ada pesan baru, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Menerima Pesan:");
                    // Logika konsumsi pesan. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Properti a adalah:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.nextConsumeTime berakhir, broker mengirimkan pesan untuk dikonsumsi lagi. 
                    // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi. 
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack pesan berhasil:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen. 
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack pesan gagal, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",KodeKesalahan:" + errorItem.ErrorCode + ",PesanKesalahan:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}