ApsaraMQ for RocketMQ menyediakan SDK klien HTTP untuk tujuh bahasa pemrograman. Gunakan SDK ini untuk mengirim pesan normal ke topik, mengonsumsinya dari kelompok konsumen, dan mengakui pengiriman tersebut.
Topik yang dibuat untuk pesan normal tidak dapat digunakan untuk jenis pesan lain seperti pesan terjadwal, pesan tertunda, pesan terurut, atau pesan transaksional. Buat topik terpisah untuk setiap jenis pesan.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraMQ for RocketMQ, topik (jenis pesan: normal), dan ID grup. Untuk informasi selengkapnya, lihat Buat sumber daya
Pasangan AccessKey (ID AccessKey dan Rahasia AccessKey) untuk autentikasi. Untuk informasi selengkapnya, lihat Buat pasangan AccessKey
Placeholder
Ganti placeholder berikut dalam semua contoh kode dengan nilai aktual Anda:
| Placeholder | Deskripsi | Contoh |
|---|---|---|
<your-http-endpoint> | Titik akhir HTTP instans Anda | http://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com |
<your-access-key-id> | ID AccessKey | LTAI5tXxx |
<your-access-key-secret> | Rahasia AccessKey | xXxXxXx |
<your-topic> | Nama topik | normal-topic-http |
<your-instance-id> | ID instans | MQ_INST_1380xxx_BbXbx0Y4 |
<your-group-id> | ID grup (ID konsumen) | GID_http_test |
Langkah 1: Instal SDK
Pilih SDK sesuai bahasa pemrograman Anda dan instal SDK tersebut.
Java SDK
PHP SDK
Go SDK
Python SDK
Node.js SDK
C# SDK
C++ SDK
Java
Untuk informasi selengkapnya, lihat deskripsi SDK Java dan catatan rilis.
Go
go get github.com/aliyunmq/mq-http-go-sdkUntuk informasi selengkapnya, lihat deskripsi SDK Go dan catatan rilis.
Python
pip install mq_http_sdkUntuk informasi selengkapnya, lihat deskripsi SDK Python dan catatan rilis.
PHP
composer require aliyunmq/mq-http-sdkUntuk informasi selengkapnya, lihat deskripsi SDK PHP dan catatan rilis.
Node.js
npm install @aliyunmq/mq-http-sdk --saveUntuk informasi selengkapnya, lihat deskripsi SDK Node.js dan catatan rilis.
C\#
Untuk informasi selengkapnya, lihat deskripsi SDK C# dan catatan rilis.
C++
Unduh kode sumber SDK dan build menggunakan CMake. Untuk informasi selengkapnya, lihat deskripsi SDK C++ dan catatan rilis.
Langkah 2: Kirim pesan normal
Setiap contoh produsen mengikuti pola berikut:
Buat
MQClientdengan titik akhir HTTP dan pasangan AccessKey Anda.Dapatkan produsen untuk instans dan topik Anda.
Publikasikan pesan dalam loop, dengan pengiriman tertunda opsional.
Pemanggilan publishMessage bersifat sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan berhasil dikirim.
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
final String topic = "<your-topic>";
final String instanceId = "<your-instance-id>";
// Dapatkan produsen untuk instans dan topik yang ditentukan
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// Kirim pesan normal dengan tag, properti, dan kunci pesan
pubMsg = new TopicMessage(
"hello mq!".getBytes(),
"A" // Tag pesan
);
pubMsg.getProperties().put("a", String.valueOf(i));
pubMsg.setMessageKey("MessageKey");
} else {
// Kirim pesan tertunda (dikirimkan 10 detik kemudian)
pubMsg = new TopicMessage(
"hello mq!".getBytes(),
"A"
);
pubMsg.getProperties().put("a", String.valueOf(i));
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// publishMessage bersifat sinkron. Tidak ada pengecualian berarti sukses.
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println(new Date() + " Kirim pesan mq berhasil. Topik:" + topic
+ ", msgId:" + pubResultMsg.getMessageId()
+ ", bodyMD5:" + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// Tangani kegagalan pengiriman: coba ulang atau simpan pesan
System.out.println(new Date() + " Kirim pesan mq gagal. Topik:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}Go
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!",
MessageTag: "",
Properties: map[string]string{},
}
msg.MessageKey = "MessageKey"
msg.Properties["a"] = strconv.Itoa(i)
} else {
// Pesan tertunda: dikirimkan 10 detik kemudian
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!",
MessageTag: "",
Properties: map[string]string{},
}
msg.Properties["a"] = strconv.Itoa(i)
// StartDeliverTime adalah stempel waktu UNIX dalam milidetik
msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
ret.MessageId, ret.MessageBodyMD5)
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Python
#!/usr/bin/env python
# coding=utf8
import sys
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien
mq_client = MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"
producer = mq_client.get_producer(instance_id, topic_name)
# Kirim 4 pesan
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
% (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
if i % 2 == 0:
# Pesan normal dengan tag, properti, dan kunci pesan
msg = TopicMessage(
"I am test message %s. Hello" % i,
"" # Tag pesan
)
msg.put_property("a", "i")
msg.set_message_key("MessageKey")
re_msg = producer.publish_message(msg)
print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
% (re_msg.message_id, re_msg.message_body_md5))
else:
# Pesan tertunda: dikirimkan 5 detik kemudian
msg = TopicMessage(
"I am test message %s." % i,
""
)
msg.put_property("a", i)
# Waktu absolut dalam milidetik
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
% (re_msg.message_id, re_msg.message_body_md5))
time.sleep(1)
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topik tidak ada, harap buat terlebih dahulu.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
$topic = "<your-topic>";
$instanceId = "<your-instance-id>";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
"xxxxxxxx" // Isi pesan
);
$publishMessage->putProperty("a", $i);
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// Pesan tertunda: dikirimkan 10 detik kemudian
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->producer->publishMessage($publishMessage);
print "Kirim pesan mq berhasil. msgId:" . $result->getMessageId()
. ", bodyMD5:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>Node.js
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const instanceId = "<your-instance-id>";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
for(var i = 0; i < 4; i++) {
let res;
if (i % 2 == 0) {
// Pesan normal dengan properti dan kunci pesan
msgProps = new MessageProperties();
msgProps.putProperty("a", i);
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "", msgProps);
} else {
// Pesan tertunda: dikirimkan 10 detik kemudian
msgProps = new MessageProperties();
msgProps.putProperty("a", i);
msgProps.startDeliverTime(Date.now() + 10 * 1000);
res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
}
console.log("Publish message: MessageID:%s,BodyMD5:%s",
res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// Tangani kegagalan pengiriman: coba ulang atau simpan pesan
console.log(e)
}
})();C++
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string instanceId = "<your-instance-id>";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// Pesan hanya dengan isi
producer->publishMessage("Hello, mq!", pmResp);
} else if (i % 4 == 1) {
// Pesan dengan isi dan tag
producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// Pesan dengan isi, tag, properti, dan kunci
TopicMessage pubMsg("Hello, mq!have key!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// Pesan tertunda: dikirimkan 10 detik kemudian
// StartDeliverTime adalah waktu absolut dalam milidetik
TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Kirim pesan mq berhasil. Topik:" << topic
<< ", msgId:" << pmResp.getMessageId()
<< ", bodyMD5:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Permintaan Gagal: " + me.GetErrorCode()
<< ", requestId:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
return -2;
}
return 0;
}C\#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
private const string _endpoint = "<your-http-endpoint>";
private const string _accessKeyId = "<your-access-key-id>";
private const string _secretAccessKey = "<your-access-key-secret>";
private const string _topicName = "<your-topic>";
private const string _instanceId = "<your-instance-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(
_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
if (i % 2 == 0)
{
// Pesan normal dengan properti dan kunci pesan
sendMsg = new TopicMessage("dfadfadfadf");
sendMsg.PutProperty("a", i.ToString());
sendMsg.MessageKey = "MessageKey";
}
else
{
// Pesan tertunda: dikirimkan 10 detik kemudian
sendMsg = new TopicMessage("dfadfadfadf", "tag");
sendMsg.PutProperty("a", i.ToString());
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
+ 10 * 1000;
}
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}Untuk mengirim pesan dari konsol, login ke Konsol ApsaraMQ for RocketMQ, temukan instans Anda, lalu pilih More > Quick Start di kolom Actions.
Langkah 3: Konsumsi pesan normal
Setelah pesan dikirim, mulai konsumen untuk menerima dan memprosesnya. Setiap contoh konsumen mengikuti pola berikut:
Buat
MQClientdengan titik akhir HTTP dan pasangan AccessKey Anda.Dapatkan konsumen untuk instans, topik, dan ID grup Anda.
Ambil pesan dalam loop menggunakan long polling.
Proses setiap pesan, lalu akui dengan mengirimkan receipt handle kembali ke broker.
Long polling menjaga koneksi tetap terbuka selama durasi tertentu (maksimal 30 detik). Jika pesan tiba selama periode tersebut, broker akan segera merespons tanpa menunggu timeout.
Jika broker tidak menerima acknowledgment (ACK) sebelum NextConsumeTime untuk suatu pesan, pesan tersebut akan dikirimkan kembali. Setiap pengiriman menghasilkan receipt handle baru.
| Parameter | Deskripsi | Batas |
|---|---|---|
| Ukuran batch | Jumlah maksimum pesan per permintaan | Maksimal 16 |
| Detik tunggu | Timeout long polling | Maksimal 30 detik |
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
final String topic = "<your-topic>";
final String groupId = "<your-group-id>";
final String instanceId = "<your-instance-id>";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Gunakan beberapa thread untuk konsumsi konkuren di lingkungan produksi
do {
List<Message> messages = null;
try {
// Long polling: tunggu hingga 3 detik untuk pesan
messages = consumer.consumeMessage(
3, // Jumlah maksimum pesan per batch (maksimal 16)
3 // Timeout long polling dalam detik (maksimal 30)
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName()
+ ": tidak ada pesan baru, lanjutkan!");
continue;
}
// Proses pesan
for (Message message : messages) {
System.out.println("Terima pesan: " + message);
}
// Akui pesan untuk mencegah pengiriman ulang
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Gagal mengakui pesan, requestId:"
+ errors.getRequestId() + ", handle gagal:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :
errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle
+ ", KodeError:" + errors.getErrorMessages()
.get(errorHandle).getErrorCode()
+ ", PesanError:" + errors.getErrorMessages()
.get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}Go
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
groupId := "<your-group-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Konsumsi %d pesan---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
v.Properties)
}
// Akui pesan yang telah diproses
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).
Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode,
errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(),
"MessageNotExist") {
fmt.Println("\nTidak ada pesan baru, lanjutkan!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout konsumsi pesan ??")
endChan <- 1
}
}
}()
// Long polling: tunggu hingga 3 detik untuk pesan
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Jumlah maksimum pesan per batch (maksimal 16)
3, // Timeout long polling dalam detik (maksimal 30)
)
<-endChan
}
}Python
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien
mq_client = MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Long polling: tunggu hingga 3 detik untuk pesan
wait_seconds = 3
# Jumlah maksimum pesan per batch (maksimal 16)
batch = 3
print("%sKonsumsi Dan Akui Pesan Dari Topik%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
try:
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print("Terima, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s"
% (msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle))
except MQExceptionBase as e:
if e.type == "MessageNotExist":
print("Tidak ada pesan baru! RequestId: %s" % e.req_id)
continue
print("Gagal Konsumsi Pesan! Exception:%s\n" % e)
time.sleep(2)
continue
// Akui pesan yang telah diproses untuk mencegah pengiriman ulang
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print("Akui %s Pesan Berhasil.\n\n" % len(receipt_handle_list))
except MQExceptionBase as e:
print("\nGagal Akui Pesan! Exception:%s" % e)
if e.sub_errors:
for sub_error in e.sub_errors:
print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
% (sub_error["ReceiptHandle"],
sub_error["ErrorCode"],
sub_error["ErrorMessage"]))PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
$topic = "<your-topic>";
$groupId = "<your-group-id>";
$instanceId = "<your-instance-id>";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function run()
{
// Gunakan beberapa thread untuk konsumsi konkuren di lingkungan produksi
while (True) {
try {
// Long polling: tunggu hingga 3 detik untuk pesan
$messages = $this->consumer->consumeMessage(
3, // Jumlah maksimum pesan per batch (maksimal 16)
3 // Timeout long polling dalam detik (maksimal 30)
);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
printf("Tidak ada pesan, lanjutkan long polling!RequestId:%s\n",
$e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "konsumsi selesai, pesan:\n";
// Proses pesan
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
. "FirstConsumeTime:%d, \nConsumedTimes:%d, "
. "NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(),
$message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(),
$message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// Akui pesan yang telah diproses
print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
printf("Kesalahan Ack, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
$errorItem->getReceiptHandle(),
$errorItem->getErrorCode(),
$errorItem->getErrorCode());
}
}
}
print "ack selesai\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>Node.js
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
while(true) {
try {
// Long polling: tunggu hingga 3 detik untuk pesan
res = await consumer.consumeMessage(
3, // Jumlah maksimum pesan per batch (maksimal 16)
3 // Timeout long polling dalam detik (maksimal 30)
);
if (res.code == 200) {
console.log("Konsumsi Pesan, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
+ "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
+ ",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime,
message.NextConsumeTime, message.FirstConsumeTime,
message.ConsumedTimes,
message.MessageBody, message.Properties, message.MessageKey,
message.Properties.a);
return message.ReceiptHandle;
});
// Akui pesan yang telah diproses
res = await consumer.ackMessage(handles);
if (res.code != 204) {
console.log("Gagal Akui Pesan:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
console.log("Akui Pesan berhasil, RequestId:%s\n\t",
res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
console.log("Konsumsi Pesan: tidak ada pesan baru, RequestId:%s, Code:%s",
e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();C++
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string groupId = "<your-group-id>";
string instanceId = "<your-instance-id>";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Long polling: tunggu hingga 3 detik untuk pesan
consumer->consumeMessage(
3, // Jumlah maksimum pesan per batch (maksimal 16)
3, // Timeout long polling dalam detik (maksimal 30)
messages
);
cout << "Konsumsi: " << messages.size() << " Pesan!" << endl;
// Proses pesan
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Akui pesan yang telah diproses
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter =
failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " pesan berhasil!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "Tidak ada pesan untuk dikonsumsi! RequestId: "
+ me.GetRequestId() << endl;
continue;
}
cout << "Permintaan Gagal: " + me.GetErrorCode()
+ ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}C\#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
private const string _endpoint = "<your-http-endpoint>";
private const string _accessKeyId = "<your-access-key-id>";
private const string _secretAccessKey = "<your-access-key-secret>";
private const string _topicName = "<your-topic>";
private const string _instanceId = "<your-instance-id>";
private const string _groupId = "<your-group-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(
_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(
_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// Gunakan beberapa thread untuk konsumsi konkuren di lingkungan produksi
while (true)
{
try
{
List<Message> messages = null;
try
{
// Long polling: tunggu hingga 3 detik untuk pesan
messages = consumer.ConsumeMessage(
3, // Jumlah maksimum pesan per batch (maksimal 16)
3 // Timeout long polling dalam detik (maksimal 30)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name
+ " Tidak ada pesan baru, "
+ ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name
+ " Terima Pesan:");
// Proses pesan
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Properti a adalah:"
+ message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Akui pesan yang telah diproses
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Akui pesan berhasil:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
if (exp2 is AckMessageException)
{
AckMessageException ackExp =
(AckMessageException)exp2;
Console.WriteLine("Gagal akui pesan, RequestId:"
+ ackExp.RequestId);
foreach (AckMessageErrorItem errorItem
in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:"
+ errorItem.ReceiptHandle
+ ",ErrorCode:" + errorItem.ErrorCode
+ ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}Verifikasi hasil
Setelah menjalankan produsen dan konsumen, verifikasi pengiriman pesan:
Login ke Konsol ApsaraMQ for RocketMQ.
Temukan instans Anda dan buka halaman kueri pesan.
Cari berdasarkan topik, ID pesan, atau kunci pesan untuk memastikan pesan telah dikirim.
Periksa jejak pesan untuk memverifikasi bahwa konsumen menerima dan mengakui setiap pesan.
Untuk informasi selengkapnya, lihat Kueri pesan dan Kueri jejak pesan.