ApsaraMQ for RocketMQ: Send and subscribe to normal messages with the HTTP client SDK

Last Updated:Mar 12, 2026

ApsaraMQ for RocketMQ provides HTTP client SDKs for seven programming languages. Use these SDKs to send normal messages to a topic, consume them from a consumer group, and acknowledge their delivery.

Important

A topic created for normal messages cannot be used for other message types, such as scheduled, delayed, ordered, or transactional messages. Create a separate topic for each message type.

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for RocketMQ instance, a topic (message type: normal), and a group ID. For more information, see Create resources.

  • An AccessKey pair (AccessKey ID and AccessKey secret) for authentication. For more information, see Create an AccessKey pair.

Placeholders

Replace the following placeholders in all code samples with your actual values:

| Placeholder | Description | Example |
| --- | --- | --- |
| <your-http-endpoint> | HTTP endpoint of your instance | http://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com |
| <your-access-key-id> | AccessKey ID | LTAI5tXxx |
| <your-access-key-secret> | AccessKey secret | xXxXxXx |
| <your-topic> | Topic name | normal-topic-http |
| <your-instance-id> | Instance ID | MQ_INST_1380xxx_BbXbx0Y4 |
| <your-group-id> | Group ID (consumer ID) | GID_http_test |
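
To keep the AccessKey pair out of source code, the placeholder values can be read from environment variables instead of being hard-coded. The following is a minimal Python sketch of that idea. The environment variable names (MQ_HTTP_ENDPOINT and so on) and the load_config helper are hypothetical conventions chosen for illustration; they are not defined by the SDK.

```python
import os

# Hypothetical environment variable names, one per placeholder.
REQUIRED_VARS = {
    "endpoint": "MQ_HTTP_ENDPOINT",
    "access_key_id": "MQ_ACCESS_KEY_ID",
    "access_key_secret": "MQ_ACCESS_KEY_SECRET",
    "topic": "MQ_TOPIC",
    "instance_id": "MQ_INSTANCE_ID",
    "group_id": "MQ_GROUP_ID",
}


def load_config(env=None):
    """Return a dict of settings, raising ValueError if any value is missing."""
    env = os.environ if env is None else env
    config = {key: env.get(var, "").strip() for key, var in REQUIRED_VARS.items()}
    missing = [REQUIRED_VARS[key] for key, value in config.items() if not value]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(sorted(missing)))
    return config
```

The returned values can then be passed to the client constructor in place of the literal placeholder strings.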

Step 1: Install the SDK

Choose the SDK for your programming language and install it.

Java

For more information, see the Java SDK description and release notes.

Go

go get github.com/aliyunmq/mq-http-go-sdk

For more information, see the Go SDK description and release notes.

Python

pip install mq_http_sdk

For more information, see the Python SDK description and release notes.

PHP

composer require aliyunmq/mq-http-sdk

For more information, see the PHP SDK description and release notes.

Node.js

npm install @aliyunmq/mq-http-sdk --save

For more information, see the Node.js SDK description and release notes.

C#

For more information, see the C# SDK description and release notes.

C++

Download the SDK source code and build it with CMake. For more information, see the C++ SDK description and release notes.

Step 2: Send normal messages

Each producer sample follows this pattern:

  1. Create an MQClient with your HTTP endpoint and AccessKey pair.

  2. Get a producer for your instance and topic.

  3. Publish messages in a loop, with optional delayed delivery.
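
In the samples for this step, the delayed delivery time is passed as an absolute UNIX timestamp in milliseconds, computed as the current time plus the delay. The following minimal Python sketch isolates that calculation. The start_deliver_time_ms helper and its now parameter are hypothetical names used only for illustration.

```python
import time


def start_deliver_time_ms(delay_seconds, now=None):
    """Return the absolute delivery time as a UNIX timestamp in milliseconds."""
    if delay_seconds < 0:
        raise ValueError("delay_seconds must not be negative")
    # `now` is in seconds since the epoch; it is injectable to make testing easy.
    now = time.time() if now is None else now
    return int(round(now * 1000)) + int(delay_seconds * 1000)
```

Passing a relative value such as 10000 instead of an absolute timestamp would point to a moment in 1970, so computing the value in one helper avoids that mistake.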

The publishMessage call is synchronous. If no exception is thrown, the message was sent successfully.
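
Because a failed send surfaces as an exception, the caller decides whether to retry or persist the message. The following is a minimal Python sketch of a retry wrapper with exponential backoff. The publish_with_retry function and its parameters are hypothetical, and publish stands in for whichever synchronous SDK call you use; none of these names come from the SDK.

```python
import time


def publish_with_retry(publish, message, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call publish(message) and retry with exponential backoff if it raises.

    `publish` is a stand-in for a synchronous publish call: returning
    normally is treated as success, raising is treated as failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return publish(message)
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: let the caller log or persist the message.
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
```

A retry can produce a duplicate if the earlier attempt actually reached the broker before the error was reported, so consumers should be prepared to see the same message more than once.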

Java

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "<your-http-endpoint>",
                "<your-access-key-id>",
                "<your-access-key-secret>"
        );

        final String topic = "<your-topic>";
        final String instanceId = "<your-instance-id>";

        // Get a producer for the specified instance and topic
        MQProducer producer;
        if (instanceId != null && !instanceId.isEmpty()) {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // Send a normal message with a tag, a property, and a message key
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"  // Message tag
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    // Send a delayed message (delivered 10 seconds later)
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // publishMessage is synchronous. No exception means success.
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                System.out.println(new Date() + " Send mq message success. Topic:" + topic
                        + ", msgId:" + pubResultMsg.getMessageId()
                        + ", bodyMD5:" + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // Handle the send failure: retry or persist the message
            System.out.println(new Date() + " Send mq message failed. Topic:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}

Go

package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqProducer := client.GetProducer(instanceId, topic)

    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.MessageKey = "MessageKey"
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            // Delayed message: delivered 10 seconds later
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.Properties["a"] = strconv.Itoa(i)
            // StartDeliverTime is a UNIX timestamp in milliseconds
            msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
        }
        ret, err := mqProducer.PublishMessage(msg)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
            ret.MessageId, ret.MessageBodyMD5)
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}

Python

#!/usr/bin/env python
# coding=utf8
import sys
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *

# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"

producer = mq_client.get_producer(instance_id, topic_name)

# Send 4 messages
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
      % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            # Normal message with a tag, a property, and a message key
            msg = TopicMessage(
                "I am test message %s. Hello" % i,
                ""  # Message tag
            )
            msg.put_property("a", str(i))
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        else:
            # Delayed message: delivered 5 seconds later
            msg = TopicMessage(
                "I am test message %s." % i,
                ""
            )
            msg.put_property("a", str(i))
            # Absolute time in milliseconds
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic does not exist, please create it first.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $instanceId = "<your-instance-id>";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx" // Message body
                );
                $publishMessage->putProperty("a", $i);
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // Delayed message: delivered 10 seconds later
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId:" . $result->getMessageId()
                    . ", bodyMD5:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}

$instance = new ProducerTest();
$instance->run();

?>

Node.js

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const instanceId = "<your-instance-id>";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        // Normal message with a property and a message key
        const msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // Delayed message: delivered 10 seconds later
        const msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s",
          res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // Handle the send failure: retry or persist the message
    console.log(e)
  }
})();

C++

#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string instanceId = "<your-instance-id>";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // Message with a body only
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // Message with a body and a tag
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // Message with a body, a property, and a key
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // Delayed message: delivered 10 seconds later
                // StartDeliverTime is an absolute time in milliseconds
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Send mq message success. Topic:" << topic
                << ", msgId:" << pmResp.getMessageId()
                << ", bodyMD5:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode()
            << ", requestId:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        // Normal message with a property and a message key
                        sendMsg = new TopicMessage("dfadfadfadf");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        // Delayed message: delivered 10 seconds later
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
                            + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publish message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}

Note

To send messages from the console, log on to the ApsaraMQ for RocketMQ console, find your instance, and choose More > Quick Start in the Actions column.

Step 3: Consume normal messages

After the messages are sent, start a consumer to receive and process them. Each consumer sample follows this pattern:

  1. Create an MQClient with your HTTP endpoint and AccessKey pair.

  2. Get a consumer for your instance, topic, and group ID.

  3. Pull messages in a loop by using long polling.

  4. Process each message, then acknowledge it by sending its receipt handle back to the broker.

Long polling keeps the connection open for a specified duration (up to 30 seconds). If a message arrives during that period, the broker responds immediately instead of waiting for the timeout.

Important

If the broker does not receive an acknowledgment (ACK) for a message before its NextConsumeTime, the message is delivered again. Each delivery produces a new receipt handle.
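
Because an unacknowledged message is delivered again with a new receipt handle, a consumer can see the same message more than once. The following minimal Python sketch shows one way to cope with that: process each message ID only once, but always acknowledge with the handle from the current delivery. The handle_batch function, the tuple layout, and the in-memory processed_ids set are illustrative assumptions, not part of the SDK.

```python
def handle_batch(messages, processed_ids, process):
    """Process each message ID once and return the handles to acknowledge.

    `messages` is a list of (message_id, receipt_handle, body) tuples, a
    simplified stand-in for the SDK's message objects.
    """
    handles = []
    for message_id, receipt_handle, body in messages:
        if message_id not in processed_ids:
            process(body)
            processed_ids.add(message_id)
        # Acknowledge with the handle that came with this delivery, since
        # every delivery of a message carries its own receipt handle.
        handles.append(receipt_handle)
    return handles
```

A set held in memory is lost on restart; a real deployment would likely keep the processed IDs in a durable store, or make the processing step itself idempotent.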

| Parameter | Description | Limit |
| --- | --- | --- |
| Batch size | Maximum number of messages per request | Up to 16 |
| Wait seconds | Long polling timeout | Up to 30 seconds |
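
The long polling behavior and the two limits can be modeled locally with a standard-library queue. The following Python sketch only illustrates the semantics (return as soon as something is available, otherwise give up after the timeout); it is not the broker's implementation. The long_poll function is a hypothetical name, and clamping out-of-range values is an assumption about how a caller might enforce the limits.

```python
import queue

MAX_BATCH_SIZE = 16    # Upper bound on messages per request
MAX_WAIT_SECONDS = 30  # Upper bound on the long polling timeout


def long_poll(q, batch_size, wait_seconds):
    """Return up to batch_size items, waiting at most wait_seconds for the first."""
    batch_size = min(batch_size, MAX_BATCH_SIZE)
    wait_seconds = min(wait_seconds, MAX_WAIT_SECONDS)
    try:
        # Blocks until an item arrives or the timeout expires.
        items = [q.get(timeout=wait_seconds)]
    except queue.Empty:
        return []  # Timed out with nothing to deliver
    # Drain whatever else is already available, without waiting again.
    while len(items) < batch_size:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items
```

An empty result corresponds to the "no new message" case that the consumer samples handle before polling again.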

Java

 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;

 import java.util.ArrayList;
 import java.util.List;

 public class Consumer {

     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 "<your-http-endpoint>",
                 "<your-access-key-id>",
                 "<your-access-key-secret>"
         );

         final String topic = "<your-topic>";
         final String groupId = "<your-group-id>";
         final String instanceId = "<your-instance-id>";

         final MQConsumer consumer;
         if (instanceId != null && !instanceId.isEmpty()) {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }

         // Use multiple threads for concurrent consumption in production
         do {
             List<Message> messages = null;

             try {
                 // Long polling: wait up to 3 seconds for messages
                 messages = consumer.consumeMessage(
                         3, // Maximum number of messages per batch (up to 16)
                         3  // Long polling timeout in seconds (up to 30)
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName()
                         + ": no new message, continue!");
                 continue;
             }

             // Process the messages
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }

             // Ack the messages to prevent redelivery
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }

                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Failed to ack message, requestId:"
                                 + errors.getRequestId() + ", failed handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :
                                     errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle
                                     + ", ErrorCode:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorCode()
                                     + ", ErrorMsg:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
}

Go

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"
    groupId := "<your-group-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
                            v.Properties)
                    }

                    // Ack the processed messages
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        fmt.Println(ackerr)
                        for _, errAckItem := range ackerr.(errors.ErrCode).
                                Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode,
                                errAckItem.ErrorMsg)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.(errors.ErrCode).Error(),
                            "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout of consumer message ??")
                    endChan <- 1
                }
            }
        }()

        // Long polling: wait up to 3 seconds for messages
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // Maximum number of messages per batch (up to 16)
            3, // Long polling timeout in seconds (up to 30)
        )
        <-endChan
    }
}

Python

#!/usr/bin/env python
# coding=utf8

import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# Long polling: wait up to 3 seconds for messages
wait_seconds = 3
# Maximum number of messages per batch (up to 16)
batch = 3

print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))

while True:
    try:
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s"
                             % (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue

        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # Ack the processed messages to prevent redelivery
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"],
                         sub_error["ErrorCode"],
                         sub_error["ErrorMessage"]))

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $groupId = "<your-group-id>";
        $instanceId = "<your-instance-id>";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // Use multiple threads for concurrent consumption in production
        while (True) {
            try {
                // Long polling: wait up to 3 seconds for messages
                $messages = $this->consumer->consumeMessage(
                    3, // Maximum number of messages per batch (up to 16)
                    3  // Long polling timeout in seconds (up to 30)
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    printf("No message, continue long polling!RequestId:%s\n",
                        $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Process the messages
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
                    . "FirstConsumeTime:%d, \nConsumedTimes:%d, "
                    . "NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(),
                    $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(),
                    $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // Ack the processed messages
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                            $errorItem->getReceiptHandle(),
                            $errorItem->getErrorCode(),
                            $errorItem->getErrorMessage());
                    }
                }
            }
            print "ack finish\n";
        }

    }
}

$instance = new ConsumerTest();
$instance->run();

?>

Node.js

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  while(true) {
    try {
      // Long polling: wait up to 3 seconds for messages
      let res = await consumer.consumeMessage(
          3, // Maximum number of messages per batch (up to 16)
          3  // Long polling timeout in seconds (up to 30)
          );

      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
            + "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
            + ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime,
              message.NextConsumeTime, message.FirstConsumeTime,
              message.ConsumedTimes,
              message.MessageBody, message.Properties, message.MessageKey,
              message.Properties.a);
          return message.ReceiptHandle;
        });

        // Ack the processed messages
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
                error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t",
              res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        console.log("Consume Message: no new message, RequestId:%s, Code:%s",
            e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

C++

#include <iostream>
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string groupId = "<your-group-id>";
    string instanceId = "<your-instance-id>";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Long polling: wait up to 3 seconds for messages
            consumer->consumeMessage(
                    3, // Maximum number of messages per batch (up to 16)
                    3, // Long polling timeout in seconds (up to 30)
                    messages
            );
            cout << "Consumed: " << messages.size() << " message(s)!" << endl;

            // Process the messages
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes()
                    << " Properties: " << iter->getPropertiesAsString()
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // Acknowledge the processed messages
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter =
                        failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " message(s) succeeded!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: "
                    + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode()
                + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";
        private const string _groupId = "<your-group-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(
            _instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Use multiple threads for concurrent consumption in production environments
            while (true)
            {
                try
                {
                    List<Message> messages = null;

                    try
                    {
                        // Long polling: wait up to 3 seconds for messages
                        messages = consumer.ConsumeMessage(
                            3, // Maximum number of messages per batch (up to 16)
                            3  // Long polling timeout in seconds (up to 30)
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name
                                + " No new message, "
                                + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name
                        + " Received messages:");
                    // Process the messages
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:"
                            + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }

                    // Acknowledge the processed messages
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Messages acknowledged:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp =
                                (AckMessageException)exp2;
                            Console.WriteLine("Failed to acknowledge messages, RequestId:"
                                + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem
                                    in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:"
                                    + errorItem.ReceiptHandle
                                    + ",ErrorCode:" + errorItem.ErrorCode
                                    + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}

Verify the result

After you run the producer and the consumer, verify message delivery:

  1. Log on to the ApsaraMQ for RocketMQ console.

  2. Find your instance and open the message query page.

  3. Search by topic, message ID, or message key to confirm that the message was sent.

  4. Check the message trace to verify that the consumer received and acknowledged each message.

For more information, see Query messages and Query message traces.