Setelah membuat sumber daya yang diperlukan di konsol ApsaraMQ for RocketMQ, Anda dapat menggunakan SDK Klien HTTP ApsaraMQ for RocketMQ untuk mengirim dan berlangganan pesan normal.
Prasyarat
- Catatan
Pesan normal digunakan dalam contoh yang disediakan. Topik yang dibuat untuk pesan normal tidak dapat digunakan untuk mengirim atau berlangganan jenis pesan lainnya, seperti pesan terjadwal, pesan tertunda, pesan terurut, dan pesan transaksional. Buat topik sesuai dengan tipe pesan yang Anda gunakan.
Unduh dan instal SDK klien HTTP
ApsaraMQ for RocketMQ menyediakan SDK Klien HTTP untuk beberapa bahasa pemrograman. Unduh dan instal SDK sesuai dengan kebutuhan bisnis Anda.
Java SDK
PHP SDK
Go SDK
Python SDK
Node.js SDK
C# SDK
C++ SDK
Gunakan SDK klien HTTP untuk mengirim pesan normal
Setelah mendapatkan SDK Klien untuk bahasa pemrograman tertentu, jalankan kode contoh untuk mengirim pesan normal:
Java
// Komentar dalam bahasa Indonesia
// Impor perpustakaan yang diperlukan.
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
final String topic = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
final String instanceId = "${INSTANCE_ID}";
// Dapatkan produsen yang mengirim pesan ke topik.
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// Kirim empat pesan secara siklik.
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// Pesan normal.
pubMsg = new TopicMessage(
// Isi pesan.
"hello mq!".getBytes(),
// Tag pesan.
"A"
);
// Atribut pesan.
pubMsg.getProperties().put("a", String.valueOf(i));
// Kunci pesan.
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// Isi pesan.
"hello mq!".getBytes(),
// Tag pesan.
"A"
);
// Atribut pesan.
pubMsg.getProperties().put("a", String.valueOf(i));
// Jadwalkan pengiriman pesan 10 detik kemudian.
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// Kirim pesan dalam mode sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// Kirim pesan dalam mode sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + topic + ", msgId adalah: " + pubResultMsg.getMessageId()
+ ", bodyMD5 adalah: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim lagi.
System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + topic);
e.printStackTrace();
}
mqClient.close();
}
} Go
// Komentar dalam bahasa Indonesia
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Tentukan titik akhir HTTP.
endpoint := "${HTTP_ENDPOINT}"
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
accessKey := "${ACCESS_KEY}"
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
secretKey := "${SECRET_KEY}"
// Topik tempat pesan milik.
topic := "${TOPIC}"
// ID instance tempat topik milik. Nilai default: null.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Kirim empat pesan secara siklik.
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // Isi pesan.
MessageTag: "", // Tag pesan.
Properties: map[string]string{}, // Atribut pesan.
}
// Kunci pesan.
msg.MessageKey = "MessageKey"
// Atribut pesan.
msg.Properties["a"] = strconv.Itoa(i)
} else {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!", // Isi pesan.
MessageTag: "", // Tag pesan.
Properties: map[string]string{}, // Atribut pesan.
}
// Atribut pesan.
msg.Properties["a"] = strconv.Itoa(i)
// Jadwalkan pengiriman pesan 10 detik kemudian. Nilainya adalah timestamp UNIX dalam milidetik.
msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
} PHP
// Komentar dalam bahasa Indonesia
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
$topic = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
$instanceId = "${INSTANCE_ID}";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
"xxxxxxxx"// Isi pesan.
);
// Atribut pesan.
$publishMessage->putProperty("a", $i);
// Kunci pesan.
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// Jadwalkan pengiriman pesan 10 detik kemudian.
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->producer->publishMessage($publishMessage);
print "Kirim pesan mq berhasil. msgId adalah:" . $result->getMessageId() . ", bodyMD5 adalah:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?> Python
// Komentar dalam bahasa Indonesia
#!/usr/bin/env python
# coding=utf8
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
# Inisialisasi klien.
mq_client = MQClient(
# Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
# ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
# Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
)
# Topik tempat pesan milik.
topic_name = "${TOPIC}"
# ID instance tempat topik milik. Nilai default: None.
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# Kirim beberapa pesan secara siklik.
msg_count = 4
print("%sPublikasikan Pesan Ke %s\nNamaTopik:%s\nJumlahPesan:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
if i % 2 == 0:
msg = TopicMessage(
# Isi pesan.
"Saya adalah pesan uji %s.Halo" % i,
# Tag pesan.
""
)
# Atribut pesan.
msg.put_property("a", "i")
# Kunci pesan.
msg.set_message_key("MessageKey")
re_msg = producer.publish_message(msg)
print("Publikasikan Pesan Berhasil. IDPesan:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
else:
msg = TopicMessage(
# Isi pesan.
"Saya adalah pesan uji %s." % i,
# Tag pesan.
""
)
msg.put_property("a", i)
# Jadwalkan waktu absolut dalam milidetik untuk mengirim pesan.
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
re_msg = producer.publish_message(msg)
print("Publikasikan Pesan Timer Berhasil. IDPesan:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
time.sleep(1)
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topik tidak ada, silakan buat.")
sys.exit(1)
print("Publikasikan Pesan Gagal. Pengecualian:%s" % e) Node.js
// Komentar dalam bahasa Indonesia
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// Tentukan titik akhir HTTP.
const endpoint = "${HTTP_ENDPOINT}";
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
const accessKeyId = "${ACCESS_KEY}";
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan milik.
const topic = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
// Kirim empat pesan secara siklik.
for(var i = 0; i < 4; i++) {
let res;
if (i % 2 == 0) {
msgProps = new MessageProperties();
// Atribut pesan.
msgProps.putProperty("a", i);
// Kunci pesan.
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "", msgProps);
} else {
msgProps = new MessageProperties();
// Atribut pesan.
msgProps.putProperty("a", i);
// Jadwalkan pengiriman pesan 10 detik kemudian.
msgProps.startDeliverTime(Date.now() + 10 * 1000);
res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
}
console.log("Publikasikan pesan: IDPesan:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// Tentukan logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim lagi.
console.log(e)
}
})(); C++
// Komentar dalam bahasa Indonesia
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
string topic = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// Publikasikan pesan, hanya memiliki tubuh.
producer->publishMessage("Halo, mq!", pmResp);
} else if (i % 4 == 1) {
// Publikasikan pesan, hanya memiliki tubuh dan tag.
producer->publishMessage("Halo, mq!memiliki tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// Publikasikan pesan, memiliki tubuh, tag, properti, dan kunci.
TopicMessage pubMsg("Halo, mq!memiliki kunci!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// Publikasikan pesan timer, pesan akan dikonsumsi setelah StartDeliverTime
TopicMessage pubMsg("Halo, mq!pesan timer!", "tag");
// StartDeliverTime adalah waktu absolut dalam milidetik.
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Publikasikan pesan mq berhasil. Topik adalah: " << topic
<< ", msgId adalah:" << pmResp.getMessageId()
<< ", bodyMD5 adalah:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Permintaan Gagal: " + me.GetErrorCode() << ", requestId adalah:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
return -2;
}
return 0;
} C#
// Komentar dalam bahasa Indonesia
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
// Tentukan titik akhir HTTP.
private const string _endpoint = "${HTTP_ENDPOINT}";
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
private const string _accessKeyId = "${ACCESS_KEY}";
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
private const string _secretAccessKey = "${SECRET_KEY}";
// Topik tempat pesan milik.
private const string _topicName = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
// Kirim empat pesan secara siklik.
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
if (i % 2 == 0)
{
sendMsg = new TopicMessage("dfadfadfadf");
// Atribut pesan.
sendMsg.PutProperty("a", i.ToString());
// Kunci pesan.
sendMsg.MessageKey = "MessageKey";
}
else
{
sendMsg = new TopicMessage("dfadfadfadf", "tag");
// Atribut pesan.
sendMsg.PutProperty("a", i.ToString());
// Jadwalkan pengiriman pesan 10 detik kemudian.
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
}
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis pesan berhasil:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
} Anda juga dapat memulai instans dengan langkah-langkah berikut: Masuk ke konsol ApsaraMQ for RocketMQ. Temukan instans yang telah dibuat dan klik More di kolom Actions. Pilih Quick Start dari daftar drop-down.
Gunakan SDK klien HTTP untuk mengonsumsi pesan normal
Setelah pesan normal dikirim, mulailah konsumen untuk mengonsumsi pesan tersebut. Gunakan kode contoh berikut untuk bahasa pemrograman tertentu sesuai kebutuhan bisnis Anda. Tentukan parameter berdasarkan komentar dalam kode:
Java
// Komentar dalam bahasa Indonesia
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
final String topic = "${TOPIC}";
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
final String groupId = "${GROUP_ID}";
// ID instance tempat topik milik. Nilai default: null.
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Konsumsi pesan secara siklik dalam thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
do {
List<Message> messages = null;
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
messages = consumer.consumeMessage(
3,// Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3// Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// Tidak ada pesan.
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": tidak ada pesan baru, lanjutkan!");
continue;
}
// Logika konsumsi pesan.
for (Message message : messages) {
System.out.println("Terima pesan: " + message);
}
// Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.nextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Pengakuan pesan gagal, requestId adalah:" + errors.getRequestId() + ", penanganan gagal:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Penanganan:" + errorHandle + ", Kode Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", Pesan Kesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
} Go
// Komentar dalam bahasa Indonesia
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Tentukan titik akhir HTTP.
endpoint := "${HTTP_ENDPOINT}"
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
accessKey := "${ACCESS_KEY}"
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
secretKey := "${SECRET_KEY}"
// Topik tempat pesan milik.
topic := "${TOPIC}"
// ID instance tempat topik milik. Nilai default: null.
instanceId := "${INSTANCE_ID}"
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// Logika konsumsi pesan.
var handles []string
fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, WaktuPublikasi: %d, MessageTag: %s\n"+
"\tJumlahDikonsumsi: %d, WaktuKonsumsiPertama: %d, WaktuKonsumsiBerikutnya: %d\n"+
"\tIsi: %s\n"+
"\tProperti: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter NextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, KodeKesalahan:%s, PesanKesalahan:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// Tidak ada pesan.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nTidak ada pesan baru, lanjutkan!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Waktu habis saat mengonsumsi pesan ??")
endChan <- 1
}
}
}()
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3, // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
)
<-endChan
}
} PHP
// Komentar dalam bahasa Indonesia
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
$topic = "${TOPIC}";
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
$groupId = "${GROUP_ID}";
// ID instance tempat topik milik. Nilai default: null.
$instanceId = "${INSTANCE_ID}";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function run()
{
// Konsumsi pesan secara siklik dalam thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
while (True) {
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
$messages = $this->consumer->consumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3 // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
// Jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, mode polling panjang terus berlaku.
printf("Tidak ada pesan, lanjutkan polling panjang!RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "konsumsi selesai, pesan:\n";
// Logika konsumsi pesan.
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nWaktuPublikasi:%d, WaktuKonsumsiPertama:%d, \nJumlahDikonsumsi:%d, WaktuKonsumsiBerikutnya:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter $message->getNextConsumeTime() berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, KodeKesalahan:%s, PesanKesalahan:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack selesai\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?> Python
// Komentar dalam bahasa Indonesia
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien.
mq_client = MQClient(
# Tentukan titik akhir HTTP.
"",
# ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
# Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
)
# Topik tempat pesan milik.
topic_name = "${TOPIC}"
# ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ.
group_id = "GID_test"
# ID instance tempat topik milik. Nilai default: None.
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
# Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
wait_seconds = 3
# Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
batch = 3
print "%sKonsumsi Dan Ak Pesan Dari Topik%s\nNamaTopik:%s\nMQConsumer:%s\nDurasiTunggu:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
try:
# Konsumsi pesan dalam mode polling panjang.
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print "Terima, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nJumlahDikonsumsi: %s \
\nWaktuPublikasi: %s\nIsi: %s \
\nWaktuKonsumsiBerikutnya: %s \
\nReceiptHandle: %s" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle)
except MQExceptionBase, e:
if e.type == "MessageNotExist":
print "Tidak ada pesan baru! RequestId: %s" % e.req_id
continue
print "Konsumsi Pesan Gagal! Pengecualian:%s\n" % e
time.sleep(2)
continue
# Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter #msg.next_consume_time berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print "Ak %s Pesan Berhasil.\n\n" % len(receipt_handle_list)
except MQExceptionBase, e:
print "\nAk Pesan Gagal! Pengecualian:%s" % e
# Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
if e.sub_errors:
for sub_error in e.sub_errors:
print "\tErrorHandle:%s,KodeKesalahan:%s,PesanKesalahan:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])Node.js
// Komentar dalam bahasa Indonesia
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
// Tentukan titik akhir HTTP.
const endpoint = "${HTTP_ENDPOINT}";
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
const accessKeyId = "${ACCESS_KEY}";
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan milik.
const topic = "${TOPIC}";
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
const groupId = "${GROUP_ID}";
// ID instance tempat topik milik. Nilai default: null.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
// Konsumsi pesan secara siklik.
while(true) {
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
res = await consumer.consumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3 // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
if (res.code == 200) {
// Logika konsumsi pesan.
console.log("Konsumsi Pesan, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,WaktuPublikasi:%d,WaktuKonsumsiBerikutnya:%d,WaktuKonsumsiPertama:%d,JumlahDikonsumsi:%d,Isi:%s" +
",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter message.NextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
console.log("Ack Pesan Gagal:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Kode:%s, Alasan:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
// Dapatkan ACK dari konsumen.
console.log("Ack Pesan berhasil, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// Jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, mode polling panjang terus berlaku.
console.log("Konsumsi Pesan: tidak ada pesan baru, RequestId:%s, Kode:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})(); C++
// Komentar dalam bahasa Indonesia
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Tentukan titik akhir HTTP.
"${HTTP_ENDPOINT}",
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${ACCESS_KEY}",
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
"${SECRET_KEY}"
);
// Topik tempat pesan milik.
string topic = "${TOPIC}";
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
string groupId = "${GROUP_ID}";
// ID instance tempat topik milik. Nilai default: null.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
consumer->consumeMessage(
3,// Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3,// Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
messages
);
cout << "Konsumsi: " << messages.size() << " Pesan!" << endl;
// Logika konsumsi pesan.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " WaktuPublikasi: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Isi: " << iter->getMessageBody()
<< " WaktuKonsumsiPertama: " << iter->getFirstConsumeTime()
<< " WaktuKonsumsiBerikutnya: " << iter->getNextConsumeTime()
<< " JumlahDikonsumsi: " << iter->getConsumedTimes()
<< " Properti: " << iter->getPropertiesAsString()
<< " Kunci: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Dapatkan ACK dari konsumen.
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.NextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " pesan berhasil!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "Tidak ada pesan untuk dikonsumsi! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Permintaan Gagal: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
} C#
// Komentar dalam bahasa Indonesia
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
// Tentukan titik akhir HTTP.
private const string _endpoint = "${HTTP_ENDPOINT}";
// ID AccessKey. ID AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
private const string _accessKeyId = "${ACCESS_KEY}";
// Rahasia AccessKey. Rahasia AccessKey digunakan untuk otentikasi identitas. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey di bagian Prasyarat.
private const string _secretAccessKey = "${SECRET_KEY}";
// Topik tempat pesan milik.
private const string _topicName = "${TOPIC}";
// ID instance tempat topik milik. Nilai default: null.
private const string _instanceId = "${INSTANCE_ID}";
// ID grup yang Anda buat di konsol ApsaraMQ forRocketMQ. ID grup juga dikenal sebagai ID konsumen.
private const string _groupId = "${GROUP_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// Konsumsi pesan secara siklik dalam thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
while (true)
{
try
{
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, permintaan digantung di server selama 3 detik. Jika pesan tersedia untuk dikonsumsi dalam durasi tersebut, respons segera dikirim ke klien.
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3 // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilai diatur ke 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " Tidak ada pesan baru, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Terima Pesan:");
// Logika konsumsi pesan.
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Properti a adalah:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.nextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk penanganan pesan setiap kali pesan dikonsumsi.
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack pesan berhasil:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// Jika penanganan beberapa pesan kedaluwarsa, broker gagal menerima ACK dari konsumen untuk pesan tersebut.
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack pesan gagal, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",KodeKesalahan:" + errorItem.ErrorCode + ",PesanKesalahan:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
} Apa yang harus dilakukan selanjutnya
Anda dapat menanyakan pesan dan jejaknya untuk memverifikasi apakah pesan telah dikonsumsi. Untuk informasi lebih lanjut, lihat Kueri Pesan dan Kueri Jejak Pesan.