すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:HTTP クライアント SDK を使用した通常メッセージの送受信とサブスクライブ

最終更新日:Mar 12, 2026

ApsaraMQ for RocketMQ は、7 つのプログラミング言語に対応した HTTP クライアント SDK を提供しています。これらの SDK を使用して、トピックに通常メッセージを送信し、コンシューマーグループからメッセージを消費し、配信を承認できます。

重要

通常メッセージ用に作成されたトピックは、スケジュールされたメッセージ、遅延メッセージ、順序付きメッセージ、トランザクションメッセージなど、他のメッセージタイプには使用できません。メッセージタイプごとに個別のトピックを作成してください。

前提条件

開始する前に、次のものが揃っていることを確認してください:

  • ApsaraMQ for RocketMQ インスタンス、トピック (メッセージタイプ:通常)、およびグループ ID。詳細については、「リソースの作成」をご参照ください。

  • 認証用の AccessKey ペア (AccessKey ID と AccessKey Secret)。詳細については、「AccessKey ペアの作成」をご参照ください。

プレースホルダー

すべてのコード例で、次のプレースホルダーを実際の値に置き換えてください:

プレースホルダー説明
<your-http-endpoint>ご利用のインスタンスの HTTP エンドポイントhttp://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com
<your-access-key-id>AccessKey IDLTAI5tXxx
<your-access-key-secret>AccessKey SecretxXxXxXx
<your-topic>トピック名normal-topic-http
<your-instance-id>インスタンス IDMQ_INST_1380xxx_BbXbx0Y4
<your-group-id>グループ ID (コンシューマー ID)GID_http_test

ステップ 1: SDK のインストール

ご利用のプログラミング言語に対応する SDK を選択し、インストールします。

Java

詳細については、「Java SDK の説明」および「リリースノート」をご参照ください。

Go

go get github.com/aliyunmq/mq-http-go-sdk

詳細については、「Go SDK の説明」および「リリースノート」をご参照ください。

Python

pip install mq_http_sdk

詳細については、「Python SDK の説明」と「リリースノート」をご参照ください。

PHP

composer require aliyunmq/mq-http-sdk

詳細については、「PHP SDK の説明」と「リリースノート」をご参照ください。

Node.js

npm install @aliyunmq/mq-http-sdk --save

詳細については、「Node.js SDK の説明」と「リリースノート」をご参照ください。

C#

詳細については、「C# SDK の説明」および「リリースノート」をご参照ください。

C++

SDK ソースをダウンロードし、CMake でビルドします。詳細については、「C++ SDK の説明」と「リリースノート」をご参照ください。

ステップ 2: 通常メッセージの送信

各プロデューサーの例は、次のパターンに従います:

  1. ご利用の HTTP エンドポイントと AccessKey ペアで MQClient を作成します。

  2. ご利用のインスタンスとトピックのプロデューサーを取得します。

  3. ループ内でメッセージをパブリッシュします。オプションで遅延配信も可能です。

publishMessage の呼び出しは同期的です。例外がスローされなければ、メッセージは送信されています。

Java

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "<your-http-endpoint>",
                "<your-access-key-id>",
                "<your-access-key-secret>"
        );

        final String topic = "<your-topic>";
        final String instanceId = "<your-instance-id>";

        // 指定されたインスタンスとトピックのプロデューサーを取得します
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // タグ、プロパティ、メッセージキーを持つ通常メッセージを送信します
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"  // メッセージタグ
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    // 遅延メッセージを送信します (10 秒後に配信)
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // publishMessage は同期的です。例外が発生しなければ成功です。
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                System.out.println(new Date() + " Send mq message success. Topic is:" + topic
                        + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 送信失敗の処理:メッセージをリトライまたは永続化します
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}

Go

package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqProducer := client.GetProducer(instanceId, topic)

    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.MessageKey = "MessageKey"
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            // 遅延メッセージ:10 秒後に配信されます
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.Properties["a"] = strconv.Itoa(i)
            // StartDeliverTime はミリ秒単位の UNIX タイムスタンプです
            msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
        }
        ret, err := mqProducer.PublishMessage(msg)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
            ret.MessageId, ret.MessageBodyMD5)
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}

Python

#!/usr/bin/env python
# coding=utf8
import sys
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *

# クライアントを初期化します
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"

producer = mq_client.get_producer(instance_id, topic_name)

# 4 つのメッセージを送信します
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
      % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            # タグ、プロパティ、メッセージキーを持つ通常メッセージ
            msg = TopicMessage(
                "I am test message %s. Hello" % i,
                ""  # メッセージタグ
            )
            msg.put_property("a", "i")
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        else:
            # 遅延メッセージ:5 秒後に配信されます
            msg = TopicMessage(
                "I am test message %s." % i,
                ""
            )
            msg.put_property("a", i)
            # ミリ秒単位の絶対時間
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $instanceId = "<your-instance-id>";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx" // メッセージ本文
                );
                $publishMessage->putProperty("a", $i);
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // 遅延メッセージ:10 秒後に配信されます
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId()
                    . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}

$instance = new ProducerTest();
$instance->run();

?>

Node.js

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const instanceId = "<your-instance-id>";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        // プロパティとメッセージキーを持つ通常メッセージ
        msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // 遅延メッセージ:10 秒後に配信されます
        msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s",
          res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // 送信失敗の処理:メッセージをリトライまたは永続化します
    console.log(e)
  }
})();

C++

//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string instanceId = "<your-instance-id>";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // 本文のみのメッセージ
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // 本文とタグを持つメッセージ
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // 本文、タグ、プロパティ、キーを持つメッセージ
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // 遅延メッセージ:10 秒後に配信されます
                // StartDeliverTime はミリ秒単位の絶対時間です
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic
                << ", msgId is:" << pmResp.getMessageId()
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode()
            << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        // プロパティとメッセージキーを持つ通常メッセージ
                        sendMsg = new TopicMessage("dfadfadfadf");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        // 遅延メッセージ:10 秒後に配信されます
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
                            + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}
説明

コンソールからメッセージを送信するには、ApsaraMQ for RocketMQ コンソールにログインし、ご利用のインスタンスを見つけ、[操作] 列で [その他] > [クイックスタート] を選択します。

ステップ 3: 通常メッセージの消費

メッセージが送信された後、コンシューマーを起動してメッセージを受信し、処理します。各コンシューマーの例は、次のパターンに従います:

  1. ご利用の HTTP エンドポイントと AccessKey ペアで MQClient を作成します。

  2. ご利用のインスタンス、トピック、グループ ID のコンシューマーを取得します。

  3. ロングポーリングを使用してループ内でメッセージをポーリングします。

  4. 各メッセージを処理した後、受信ハンドルをブローカーに送信して承認します。

ロングポーリングは、指定された期間 (最大 30 秒) 接続を開いたままにします。そのウィンドウ内にメッセージが到着した場合、ブローカーはタイムアウトを待たずに即座に応答します。

重要

ブローカーがメッセージの NextConsumeTime までに確認応答 (ACK) を受信しない場合、メッセージは再配信されます。配信ごとに新しい受信ハンドルが生成されます。

パラメーター説明制限
バッチサイズリクエストあたりの最大メッセージ数最大 16
待機秒数ロングポーリングのタイムアウト最大 30 秒

Java

 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;

 import java.util.ArrayList;
 import java.util.List;

 public class Consumer {

     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 "<your-http-endpoint>",
                 "<your-access-key-id>",
                 "<your-access-key-secret>"
         );

         final String topic = "<your-topic>";
         final String groupId = "<your-group-id>";
         final String instanceId = "<your-instance-id>";

         final MQConsumer consumer;
         if (instanceId != null && instanceId != "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }

         // 本番環境では、複数のスレッドを使用して同時消費を行います
         do {
             List<Message> messages = null;

             try {
                 // ロングポーリング:最大 3 秒間メッセージを待機します
                 messages = consumer.consumeMessage(
                         3, // バッチあたりの最大メッセージ数 (最大 16)
                         3  // ロングポーリングのタイムアウト (秒単位、最大 30)
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName()
                         + ": no new message, continue!");
                 continue;
             }

             // メッセージを処理します
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }

             // 再配信を防ぐためにメッセージを承認します
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }

                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:"
                                 + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :
                                     errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle
                                     + ", ErrorCode:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorCode()
                                     + ", ErrorMsg:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
}

Go

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"
    groupId := "<your-group-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
                            v.Properties)
                    }

                    // 処理済みのメッセージを承認します
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        fmt.Println(ackerr)
                        for _, errAckItem := range ackerr.(errors.ErrCode).
                                Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode,
                                errAckItem.ErrorMsg)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.(errors.ErrCode).Error(),
                            "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout of consumer message ??")
                    endChan <- 1
                }
            }
        }()

        // ロングポーリング:最大 3 秒間メッセージを待機します
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // バッチあたりの最大メッセージ数 (最大 16)
            3, // ロングポーリングのタイムアウト (秒単位、最大 30)
        )
        <-endChan
    }
}

Python

#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# クライアントを初期化します
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# ロングポーリング:最大 3 秒間メッセージを待機します
wait_seconds = 3
# バッチあたりの最大メッセージ数 (最大 16)
batch = 3

print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))

while True:
    try:
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s"
                             % (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue

        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # 再配信を防ぐために処理済みのメッセージを承認します
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"],
                         sub_error["ErrorCode"],
                         sub_error["ErrorMessage"]))

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $groupId = "<your-group-id>";
        $instanceId = "<your-instance-id>";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // 本番環境では、複数のスレッドを使用して同時消費を行います
        while (True) {
            try {
                // ロングポーリング:最大 3 秒間メッセージを待機します
                $messages = $this->consumer->consumeMessage(
                    3, // バッチあたりの最大メッセージ数 (最大 16)
                    3  // ロングポーリングのタイムアウト (秒単位、最大 30)
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    printf("No message, continue long polling!RequestId:%s\n",
                        $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // メッセージを処理します
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
                    . "FirstConsumeTime:%d, \nConsumedTimes:%d, "
                    . "NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(),
                    $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(),
                    $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // 処理済みのメッセージを承認します
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                            $errorItem->getReceiptHandle(),
                            $errorItem->getErrorCode(),
                            $errorItem->getErrorCode());
                    }
                }
            }
            print "ack finish\n";
        }

    }
}

$instance = new ConsumerTest();
$instance->run();

?>

Node.js

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  while(true) {
    try {
      // ロングポーリング:最大 3 秒間メッセージを待機します
      res = await consumer.consumeMessage(
          3, // バッチあたりの最大メッセージ数 (最大 16)
          3  // ロングポーリングのタイムアウト (秒単位、最大 30)
          );

      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
            + "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
            + ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime,
              message.NextConsumeTime, message.FirstConsumeTime,
              message.ConsumedTimes,
              message.MessageBody, message.Properties, message.MessageKey,
              message.Properties.a);
          return message.ReceiptHandle;
        });

        // 処理済みのメッセージを承認します
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
                error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t",
              res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        console.log("Consume Message: no new message, RequestId:%s, Code:%s",
            e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

C++

#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string groupId = "<your-group-id>";
    string instanceId = "<your-instance-id>";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // ロングポーリング:最大 3 秒間メッセージを待機します
            consumer->consumeMessage(
                    3, // バッチあたりの最大メッセージ数 (最大 16)
                    3, // ロングポーリングのタイムアウト (秒単位、最大 30)
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // メッセージを処理します
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes()
                    << " Properties: " << iter->getPropertiesAsString()
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // 処理済みのメッセージを承認します
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter =
                        failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: "
                    + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode()
                + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";
        private const string _groupId = "<your-group-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(
            _instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // 本番環境では、複数のスレッドを使用して同時消費を行います
            while (true)
            {
                try
                {
                    List<Message> messages = null;

                    try
                    {
                        // ロングポーリング:最大 3 秒間メッセージを待機します
                        messages = consumer.ConsumeMessage(
                            3, // バッチあたりの最大メッセージ数 (最大 16)
                            3  // ロングポーリングのタイムアウト (秒単位、最大 30)
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name
                                + " No new message, "
                                + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name
                        + " Receive Messages:");
                    // メッセージを処理します
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:"
                            + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }

                    // 処理済みのメッセージを承認します
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp =
                                (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:"
                                + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem
                                    in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:"
                                    + errorItem.ReceiptHandle
                                    + ",ErrorCode:" + errorItem.ErrorCode
                                    + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}

結果の確認

プロデューサーとコンシューマーを実行した後、メッセージの配信を確認します:

  1. ApsaraMQ for RocketMQ コンソールにログインします。

  2. ご利用のインスタンスを見つけ、メッセージクエリページに移動します。

  3. トピック、メッセージ ID、またはメッセージキーで検索し、メッセージが送信されたことを確認します。

  4. メッセージトレースをチェックして、コンシューマーが各メッセージを受信し、承認したことを確認します。

詳細については、「メッセージのクエリ」および「メッセージトレースのクエリ」をご参照ください。