Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan first-in-first-out (FIFO) yang ketat. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan terurut menggunakan HTTP client SDK untuk C#.
Informasi latar belakang
Pesan terurut dibagi menjadi jenis-jenis berikut:
Pesan terurut global: Jika pesan dalam sebuah topik termasuk jenis ini, pesan tersebut diterbitkan dan dikonsumsi dalam urutan FIFO.
Pesan terurut berpartisi: Jika pesan dalam sebuah topik termasuk jenis ini, pesan tersebut didistribusikan ke partisi yang berbeda menggunakan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO. Kunci sharding adalah bidang kunci yang digunakan untuk pesan terurut guna mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk C#. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan terurut
Contoh kode berikut menunjukkan cara mengirim pesan terurut menggunakan HTTP client SDK untuk C#:
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class OrderProducerSample
{
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
private const string _endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
// Rahasia AccessKey yang digunakan untuk otentikasi.
private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
private const string _topicName = "${TOPIC}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
// Kirim delapan pesan secara siklik.
for (int i = 0; i < 8; i++)
{
// Isi dan tag pesan.
TopicMessage sendMsg = new TopicMessage("hello mq", "tag");
// Atribut kustom pesan.
sendMsg.PutProperty("a", i.ToString());
// Kunci sharding yang digunakan untuk mendistribusikan pesan terurut ke partisi tertentu. Kunci sharding dapat digunakan untuk mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
sendMsg.ShardingKey = (i % 2).ToString();
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}Berlangganan pesan terurut
Contoh kode berikut menunjukkan cara berlangganan pesan terurut menggunakan HTTP client SDK untuk C#:
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class OrderConsumerSample
{
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
private const string _endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
private const string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
// Rahasia AccessKey yang digunakan untuk otentikasi.
private const string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
private const string _topicName = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
private const string _groupId = "${GROUP_ID}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
while (true)
{
try
{
// Konsumsi pesan dalam mode polling panjang. Konsumen mungkin menarik pesan terurut berpartisi dari beberapa partisi. Konsumen mengonsumsi pesan dari partisi yang sama dalam urutan pengiriman pesan.
// Anggaplah seorang konsumen menarik pesan terurut berpartisi dari satu partisi. Jika broker tidak menerima pengakuan (ACK) untuk pesan dari konsumen, broker akan mengirimkan pesan di partisi itu kepada konsumen lagi.
// Konsumen hanya dapat mengonsumsi batch pesan berikutnya dari partisi setelah semua pesan yang ditarik dari partisi dalam batch sebelumnya diakui sebagai dikonsumsi.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi di topik, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan tersedia untuk dikonsumsi selama periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessageOrderly(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3 // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " Tidak ada pesan baru, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Menerima Pesan:");
// Logika konsumsi pesan.
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Properti a adalah:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.nextConsumeTime berakhir, broker mengirimkan pesan untuk dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack pesan berhasil:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// Jika penanganan pesan kedaluwarsa, broker tidak dapat menerima ACK untuk pesan dari konsumen.
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack pesan gagal, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",KodeKesalahan:" + errorItem.ErrorCode + ",PesanKesalahan:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}