Topik ini menjelaskan cara berlangganan pesan menggunakan TCP Client SDK untuk .NET yang disediakan oleh ApsaraMQ for RocketMQ.
Pastikan langganan tetap konsisten untuk semua instance konsumen dengan ID grup yang sama. Untuk informasi lebih lanjut, lihat Konsistensi Langganan.
ApsaraMQ for RocketMQ mendukung mode langganan berikut:
Langganan Klustering
Semua konsumen dalam grup dengan ID yang sama mengonsumsi jumlah pesan yang sama. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode klustering, setiap konsumen mengonsumsi tiga pesan.
// Konfigurasikan langganan klustering, yang merupakan mode default. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);Langganan Siaran
Setiap konsumen dalam grup dengan ID yang sama mengonsumsi semua pesan sekali. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode siaran, setiap konsumen mengonsumsi kesembilan pesan tersebut.
// Konfigurasikan langganan siaran. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
Kode contoh:
using System;
using System.Threading;
using System.Text;
using ons;
// Fungsi callback yang dieksekusi ketika pesan ditarik dari broker ApsaraMQ for RocketMQ.
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// AccessKey ID yang digunakan untuk autentikasi.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// Rahasia AccessKey yang digunakan untuk autentikasi.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// Jalur log.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Konsumsi klustering.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// Konsumsi siaran.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// Buat instance konsumen.
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// Berlangganan ke topik.
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// Mulai instance konsumen.
consumer.start();
// Pengaturan ini hanya digunakan dalam demo ini. Di lingkungan produksi sebenarnya, Anda tidak dapat keluar dari proses.
Thread.Sleep(300000);
// Sebelum keluar dari proses, hentikan instance konsumen.
consumer.shutdown();
}
}