Setelah membuat sumber daya yang diperlukan di konsol ApsaraMQ for RocketMQ, Anda dapat menggunakan SDK Klien TCP dari ApsaraMQ for RocketMQ untuk mengirim dan berlangganan pesan normal.
Prasyarat
- Catatan
Pesan normal digunakan dalam contoh ini. Topik yang dibuat untuk pesan normal tidak dapat digunakan untuk jenis pesan lainnya seperti pesan terjadwal, tertunda, terurut, atau transaksional. Pastikan Anda membuat topik sesuai dengan tipe pesan yang akan dikirim.
Unduh dan instal SDK Klien TCP
SDK komersial menawarkan lebih banyak fitur dan stabilitas dibandingkan SDK sumber terbuka. Kami merekomendasikan penggunaan SDK komersial dari ApsaraMQ for RocketMQ untuk mengakses layanan tersebut. SDK sumber terbuka hanya disarankan jika Anda ingin memigrasikan data dari kluster Apache RocketMQ ke instance ApsaraMQ for RocketMQ tanpa memodifikasi kode.
ApsaraMQ for RocketMQ menyediakan SDK Klien TCP komersial berikut. Pilih SDK sesuai dengan bahasa pemrograman yang Anda gunakan.
Java SDK
C/C++ SDK
.NET SDK
Menggunakan SDK Klien TCP untuk Mengirim Pesan Normal
Setelah mendapatkan SDK klien untuk bahasa pemrograman tertentu, jalankan contoh kode untuk mengirim pesan normal.
Java
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan ID AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
properties.put(PropertyKeyConst.AccessKey, "XXX");
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
properties.put(PropertyKeyConst.SecretKey, "XXX");
// Periode timeout untuk mengirim pesan. Unit: milidetik.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
producer.start();
Message msg = new Message(
// Topik tempat pesan termasuk.
"TopicTestMQ",
// Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan untuk membantu konsumen menyaring pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. Tubuh pesan dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus sepakat pada metode serialisasi dan deserialisasi.
"Hello MQ".getBytes());
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan. // Kunci unik membantu Anda menanyakan dan mengirim ulang pesan di konsol ApsaraMQ for RocketMQ jika pesan gagal diterima.
// Catatan: Anda dapat mengirim dan menerima pesan meskipun Anda tidak menentukan kunci pesan.
msg.setKey("ORDERID_100");
// Kirim pesan dalam mode asinkron. Hasilnya dikembalikan ke klien dengan memanggil metode callback.
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// Pesan telah dikirim.
System.out.println("Pesan berhasil dikirim. topik=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim lagi.
System.out.println("Gagal mengirim pesan. topik=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// Dapatkan nilai parameter msgId sebelum Anda memanggil metode callback untuk mengembalikan hasil.
System.out.println("Kirim pesan async. topik=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// Sebelum keluar dari aplikasi, hentikan produser. Catatan: Langkah ini opsional.
producer.shutdown();
}
} .NET
using System;
using ons;
public class ProducerExampleForEx
{
public ProducerExampleForEx()
{
}
static void Main(string[] args) {
// Konfigurasikan akun Anda berdasarkan informasi di Konsol Manajemen Alibaba Cloud.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan ID AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// Jalur log.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Buat instance produser.
// Catatan: Instance produser bersifat thread-safe dan dapat digunakan untuk mengirim pesan ke topik yang berbeda. Pada umumnya, setiap thread hanya memerlukan satu instance produser.
Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);
// Mulai instance produser.
producer.start();
// Buat pesan.
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Contoh isi pesan");
msg.setKey(Guid.NewGuid().ToString());
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg);
Console.WriteLine("Pengiriman sukses {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("Pengiriman gagal{0}", ex.ToString());
}
}
// Sebelum keluar dari thread Anda, hentikan instance produser.
producer.shutdown();
}
}C/C++
#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;
int main()
{
// Buat produser dan konfigurasikan parameter yang diperlukan untuk mengirim pesan.
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// Isi pesan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
//buat produser;
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
pProducer->start();
Message msg(
//Topik Pesan
factoryInfo.getPublishTopics(),
// Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan untuk membantu konsumen menyaring pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. Anda tidak dapat meninggalkan parameter ini kosong. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus sepakat pada metode serialisasi dan deserialisasi.
factoryInfo.getMessageContent()
);
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
// Kunci dapat digunakan untuk menanyakan dan mengirim ulang pesan di konsol ApsaraMQ for RocketMQ jika pesan gagal diterima.
// Catatan: Anda dapat mengirim dan menerima pesan meskipun Anda tidak menentukan kunci pesan.
msg.setKey("ORDERID_100");
// Kirim pesan. Jika tidak ada pengecualian yang terjadi, pesan telah dikirim.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// Tentukan logika untuk menangani pengecualian.
}
// Sebelum Anda keluar dari aplikasi, Anda harus menghentikan produser. Jika Anda tidak menghentikan produser, masalah seperti kebocoran memori mungkin terjadi.
pProducer->shutdown();
return 0;
}Untuk memulai instans, masuk ke konsol ApsaraMQ for RocketMQ, temukan instans yang telah dibuat, klik More di kolom Actions, lalu pilih Quick Start dari daftar drop-down.
Periksa apakah pesan telah dikirim
Setelah pesan dikirim, periksa statusnya di konsol ApsaraMQ for RocketMQ dengan langkah-langkah berikut:
- Di halaman Detail Instans, klik Message Query di panel navigasi sebelah kiri.
- Di halaman Message Query, pilih metode pencarian, tentukan parameter yang diperlukan, lalu klik Search.
Stored At menunjukkan waktu penyimpanan pesan oleh broker ApsaraMQ for RocketMQ. Jika pesan ditemukan, pesan tersebut telah berhasil dikirim ke Antrian Pesan broker Apache RocketMQ.
Menggunakan SDK Klien TCP untuk Berlangganan Pesan Normal
Setelah pesan normal dikirim, mulailah konsumen untuk berlangganan pesan. Gunakan contoh kode berikut sesuai dengan bahasa pemrograman yang digunakan untuk memulai konsumen dan menguji fitur berlangganan pesan. Pastikan untuk mengonfigurasi parameter sesuai petunjuk.
Java
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
properties.put(PropertyKeyConst.AccessKey, "XXX");
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
properties.put(PropertyKeyConst.SecretKey, "XXX");
// Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// Mode konsumsi klustering. Ini adalah mode default.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// Mode konsumsi siaran.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Berlangganan beberapa tag.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Terima: " + message);
return Action.CommitMessage;
}
});
// Berlangganan topik lain. Untuk berhenti berlangganan dari topik ini, hapus kode berlangganan dan mulai ulang konsumen.
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Berlangganan semua tag.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Terima: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Konsumen Dimulai");
}
} .NET
using System;
using System.Threading;
using System.Text;
using ons;
// Fungsi callback yang dieksekusi ketika pesan ditarik dari broker ApsaraMQ for RocketMQ.
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
// Konfigurasikan akun Anda. Anda dapat memperoleh informasi akun di Konsol Manajemen Alibaba Cloud.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan ID AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan ID AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// Jalur log.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Konsumsi klustering.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// Konsumsi siaran.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// Buat instance konsumen.
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// Berlangganan topik.
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// Mulai instance produser.
consumer.start();
// Pengaturan ini hanya untuk demo. Di lingkungan produksi sebenarnya, Anda tidak dapat keluar dari proses.
Thread.Sleep(300000);
// Sebelum keluar dari proses, hentikan instance konsumen.
consumer.shutdown();
}
} C/C++
#include "ONSFactory.h"
using namespace ons;
// Buat instance MyMsgListener untuk mengonsumsi pesan.
// Setelah konsumen push menarik pesan, fungsi konsumen dari instance dipanggil.
class MyMsgListener : public MessageListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual Action consume(Message &message, ConsumeContext &context)
{
// Tentukan logika untuk memproses pesan.
return CommitMessage; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// Parameter yang diperlukan untuk membuat dan menjalankan konsumen push.
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Tentukan titik akhir TCP. Anda dapat melihat titik akhir di bagian Titik Akhir TCP halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan ID AccessKey, lihat Buat pasangan AccessKey di bagian "Prasyarat".
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara mendapatkan rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
// Mode konsumsi klustering. Ini adalah mode default.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
// Mode konsumsi siaran.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
//buat pushConsumer
PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// Tentukan topik dan tag yang akan dilanggan oleh konsumen push. Daftarkan fungsi callback pesan.
MyMsgListener msglistener;
pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
//mulai pushConsumer
pushConsumer->start();
// Catatan: Metode shutdown() hanya dapat dipanggil ketika tidak ada pesan yang diterima. Setelah metode shutdown() dipanggil, konsumen keluar dan tidak lagi menerima pesan.
// Hentikan konsumen push. Sebelum Anda keluar dari aplikasi, Anda harus menghentikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
pushConsumer->shutdown();
return 0;
}Periksa apakah langganan pesan berhasil
- Di halaman Detail Instans, klik Groups di panel navigasi sebelah kiri.
- Di halaman Groups, klik tab TCP.
- Temukan Group ID yang ingin Anda periksa, lalu klik Details di kolom Actions.Jika nilai Consumer Status adalah Online dan parameter Apakah Langganan Konsisten bernilai Ya, langganan pesan berhasil.