ApsaraMQ for RocketMQ は、7 つのプログラミング言語に対応した HTTP クライアント SDK を提供しています。これらの SDK を使用して、トピックに通常メッセージを送信し、コンシューマーグループからメッセージを消費し、配信を承認できます。
通常メッセージ用に作成されたトピックは、スケジュールされたメッセージ、遅延メッセージ、順序付きメッセージ、トランザクションメッセージなど、他のメッセージタイプには使用できません。メッセージタイプごとに個別のトピックを作成してください。
前提条件
開始する前に、次のものが揃っていることを確認してください:
ApsaraMQ for RocketMQ インスタンス、トピック (メッセージタイプ:通常)、およびグループ ID。詳細については、「リソースの作成」をご参照ください。
認証用の AccessKey ペア (AccessKey ID と AccessKey Secret)。詳細については、「AccessKey ペアの作成」をご参照ください。
プレースホルダー
すべてのコード例で、次のプレースホルダーを実際の値に置き換えてください:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-http-endpoint> | ご利用のインスタンスの HTTP エンドポイント | http://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com |
<your-access-key-id> | AccessKey ID | LTAI5tXxx |
<your-access-key-secret> | AccessKey Secret | xXxXxXx |
<your-topic> | トピック名 | normal-topic-http |
<your-instance-id> | インスタンス ID | MQ_INST_1380xxx_BbXbx0Y4 |
<your-group-id> | グループ ID (コンシューマー ID) | GID_http_test |
ステップ 1: SDK のインストール
ご利用のプログラミング言語に対応する SDK を選択し、インストールします。
Java SDK
PHP SDK
Go SDK
Python SDK
Node.js SDK
C# SDK
C++ SDK
Java
詳細については、「Java SDK の説明」および「リリースノート」をご参照ください。
Go
go get github.com/aliyunmq/mq-http-go-sdk詳細については、「Go SDK の説明」および「リリースノート」をご参照ください。
Python
pip install mq_http_sdk詳細については、「Python SDK の説明」と「リリースノート」をご参照ください。
PHP
composer require aliyunmq/mq-http-sdk詳細については、「PHP SDK の説明」と「リリースノート」をご参照ください。
Node.js
npm install @aliyunmq/mq-http-sdk --save詳細については、「Node.js SDK の説明」と「リリースノート」をご参照ください。
C#
詳細については、「C# SDK の説明」および「リリースノート」をご参照ください。
C++
SDK ソースをダウンロードし、CMake でビルドします。詳細については、「C++ SDK の説明」と「リリースノート」をご参照ください。
ステップ 2: 通常メッセージの送信
各プロデューサーの例は、次のパターンに従います:
ご利用の HTTP エンドポイントと AccessKey ペアで
MQClientを作成します。ご利用のインスタンスとトピックのプロデューサーを取得します。
ループ内でメッセージをパブリッシュします。オプションで遅延配信も可能です。
publishMessage の呼び出しは同期的です。例外がスローされなければ、メッセージは送信されています。
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
final String topic = "<your-topic>";
final String instanceId = "<your-instance-id>";
// 指定されたインスタンスとトピックのプロデューサーを取得します
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// タグ、プロパティ、メッセージキーを持つ通常メッセージを送信します
pubMsg = new TopicMessage(
"hello mq!".getBytes(),
"A" // メッセージタグ
);
pubMsg.getProperties().put("a", String.valueOf(i));
pubMsg.setMessageKey("MessageKey");
} else {
// 遅延メッセージを送信します (10 秒後に配信)
pubMsg = new TopicMessage(
"hello mq!".getBytes(),
"A"
);
pubMsg.getProperties().put("a", String.valueOf(i));
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// publishMessage は同期的です。例外が発生しなければ成功です。
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println(new Date() + " Send mq message success. Topic is:" + topic
+ ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 送信失敗の処理:メッセージをリトライまたは永続化します
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}Go
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!",
MessageTag: "",
Properties: map[string]string{},
}
msg.MessageKey = "MessageKey"
msg.Properties["a"] = strconv.Itoa(i)
} else {
// 遅延メッセージ:10 秒後に配信されます
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!",
MessageTag: "",
Properties: map[string]string{},
}
msg.Properties["a"] = strconv.Itoa(i)
// StartDeliverTime はミリ秒単位の UNIX タイムスタンプです
msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
ret.MessageId, ret.MessageBodyMD5)
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Python
#!/usr/bin/env python
# coding=utf8
import sys
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# クライアントを初期化します
mq_client = MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"
producer = mq_client.get_producer(instance_id, topic_name)
# 4 つのメッセージを送信します
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
% (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
if i % 2 == 0:
# タグ、プロパティ、メッセージキーを持つ通常メッセージ
msg = TopicMessage(
"I am test message %s. Hello" % i,
"" # メッセージタグ
)
msg.put_property("a", "i")
msg.set_message_key("MessageKey")
re_msg = producer.publish_message(msg)
print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
% (re_msg.message_id, re_msg.message_body_md5))
else:
# 遅延メッセージ:5 秒後に配信されます
msg = TopicMessage(
"I am test message %s." % i,
""
)
msg.put_property("a", i)
# ミリ秒単位の絶対時間
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
% (re_msg.message_id, re_msg.message_body_md5))
time.sleep(1)
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topic not exist, please create it.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
$topic = "<your-topic>";
$instanceId = "<your-instance-id>";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
"xxxxxxxx" // メッセージ本文
);
$publishMessage->putProperty("a", $i);
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// 遅延メッセージ:10 秒後に配信されます
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId()
. ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>Node.js
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const instanceId = "<your-instance-id>";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
for(var i = 0; i < 4; i++) {
let res;
if (i % 2 == 0) {
// プロパティとメッセージキーを持つ通常メッセージ
msgProps = new MessageProperties();
msgProps.putProperty("a", i);
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "", msgProps);
} else {
// 遅延メッセージ:10 秒後に配信されます
msgProps = new MessageProperties();
msgProps.putProperty("a", i);
msgProps.startDeliverTime(Date.now() + 10 * 1000);
res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
}
console.log("Publish message: MessageID:%s,BodyMD5:%s",
res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// 送信失敗の処理:メッセージをリトライまたは永続化します
console.log(e)
}
})();C++
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string instanceId = "<your-instance-id>";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// 本文のみのメッセージ
producer->publishMessage("Hello, mq!", pmResp);
} else if (i % 4 == 1) {
// 本文とタグを持つメッセージ
producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// 本文、タグ、プロパティ、キーを持つメッセージ
TopicMessage pubMsg("Hello, mq!have key!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// 遅延メッセージ:10 秒後に配信されます
// StartDeliverTime はミリ秒単位の絶対時間です
TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode()
<< ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
private const string _endpoint = "<your-http-endpoint>";
private const string _accessKeyId = "<your-access-key-id>";
private const string _secretAccessKey = "<your-access-key-secret>";
private const string _topicName = "<your-topic>";
private const string _instanceId = "<your-instance-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(
_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
if (i % 2 == 0)
{
// プロパティとメッセージキーを持つ通常メッセージ
sendMsg = new TopicMessage("dfadfadfadf");
sendMsg.PutProperty("a", i.ToString());
sendMsg.MessageKey = "MessageKey";
}
else
{
// 遅延メッセージ:10 秒後に配信されます
sendMsg = new TopicMessage("dfadfadfadf", "tag");
sendMsg.PutProperty("a", i.ToString());
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
+ 10 * 1000;
}
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}コンソールからメッセージを送信するには、ApsaraMQ for RocketMQ コンソールにログインし、ご利用のインスタンスを見つけ、[操作] 列で [その他] > [クイックスタート] を選択します。
ステップ 3: 通常メッセージの消費
メッセージが送信された後、コンシューマーを起動してメッセージを受信し、処理します。各コンシューマーの例は、次のパターンに従います:
ご利用の HTTP エンドポイントと AccessKey ペアで
MQClientを作成します。ご利用のインスタンス、トピック、グループ ID のコンシューマーを取得します。
ロングポーリングを使用してループ内でメッセージをポーリングします。
各メッセージを処理した後、受信ハンドルをブローカーに送信して承認します。
ロングポーリングは、指定された期間 (最大 30 秒) 接続を開いたままにします。そのウィンドウ内にメッセージが到着した場合、ブローカーはタイムアウトを待たずに即座に応答します。
ブローカーがメッセージの NextConsumeTime までに確認応答 (ACK) を受信しない場合、メッセージは再配信されます。配信ごとに新しい受信ハンドルが生成されます。
| パラメーター | 説明 | 制限 |
|---|---|---|
| バッチサイズ | リクエストあたりの最大メッセージ数 | 最大 16 |
| 待機秒数 | ロングポーリングのタイムアウト | 最大 30 秒 |
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
final String topic = "<your-topic>";
final String groupId = "<your-group-id>";
final String instanceId = "<your-instance-id>";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// 本番環境では、複数のスレッドを使用して同時消費を行います
do {
List<Message> messages = null;
try {
// ロングポーリング:最大 3 秒間メッセージを待機します
messages = consumer.consumeMessage(
3, // バッチあたりの最大メッセージ数 (最大 16)
3 // ロングポーリングのタイムアウト (秒単位、最大 30)
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName()
+ ": no new message, continue!");
continue;
}
// メッセージを処理します
for (Message message : messages) {
System.out.println("Receive message: " + message);
}
// 再配信を防ぐためにメッセージを承認します
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:"
+ errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :
errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle
+ ", ErrorCode:" + errors.getErrorMessages()
.get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages()
.get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}Go
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
groupId := "<your-group-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
v.Properties)
}
// 処理済みのメッセージを承認します
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).
Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode,
errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(),
"MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// ロングポーリング:最大 3 秒間メッセージを待機します
mqConsumer.ConsumeMessage(respChan, errChan,
3, // バッチあたりの最大メッセージ数 (最大 16)
3, // ロングポーリングのタイムアウト (秒単位、最大 30)
)
<-endChan
}
}Python
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# クライアントを初期化します
mq_client = MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# ロングポーリング:最大 3 秒間メッセージを待機します
wait_seconds = 3
# バッチあたりの最大メッセージ数 (最大 16)
batch = 3
print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
try:
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print("Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s"
% (msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle))
except MQExceptionBase as e:
if e.type == "MessageNotExist":
print("No new message! RequestId: %s" % e.req_id)
continue
print("Consume Message Fail! Exception:%s\n" % e)
time.sleep(2)
continue
# 再配信を防ぐために処理済みのメッセージを承認します
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
except MQExceptionBase as e:
print("\nAck Message Fail! Exception:%s" % e)
if e.sub_errors:
for sub_error in e.sub_errors:
print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
% (sub_error["ReceiptHandle"],
sub_error["ErrorCode"],
sub_error["ErrorMessage"]))PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
$topic = "<your-topic>";
$groupId = "<your-group-id>";
$instanceId = "<your-instance-id>";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function run()
{
// 本番環境では、複数のスレッドを使用して同時消費を行います
while (True) {
try {
// ロングポーリング:最大 3 秒間メッセージを待機します
$messages = $this->consumer->consumeMessage(
3, // バッチあたりの最大メッセージ数 (最大 16)
3 // ロングポーリングのタイムアウト (秒単位、最大 30)
);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
printf("No message, continue long polling!RequestId:%s\n",
$e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "consume finish, messages:\n";
// メッセージを処理します
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
. "FirstConsumeTime:%d, \nConsumedTimes:%d, "
. "NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(),
$message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(),
$message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// 処理済みのメッセージを承認します
print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
$errorItem->getReceiptHandle(),
$errorItem->getErrorCode(),
$errorItem->getErrorCode());
}
}
}
print "ack finish\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>Node.js
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
while(true) {
try {
// ロングポーリング:最大 3 秒間メッセージを待機します
res = await consumer.consumeMessage(
3, // バッチあたりの最大メッセージ数 (最大 16)
3 // ロングポーリングのタイムアウト (秒単位、最大 30)
);
if (res.code == 200) {
console.log("Consume Messages, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
+ "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
+ ",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime,
message.NextConsumeTime, message.FirstConsumeTime,
message.ConsumedTimes,
message.MessageBody, message.Properties, message.MessageKey,
message.Properties.a);
return message.ReceiptHandle;
});
// 処理済みのメッセージを承認します
res = await consumer.ackMessage(handles);
if (res.code != 204) {
console.log("Ack Message Fail:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
console.log("Ack Message suc, RequestId:%s\n\t",
res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
console.log("Consume Message: no new message, RequestId:%s, Code:%s",
e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();C++
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string groupId = "<your-group-id>";
string instanceId = "<your-instance-id>";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// ロングポーリング:最大 3 秒間メッセージを待機します
consumer->consumeMessage(
3, // バッチあたりの最大メッセージ数 (最大 16)
3, // ロングポーリングのタイムアウト (秒単位、最大 30)
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// メッセージを処理します
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// 処理済みのメッセージを承認します
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter =
failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: "
+ me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode()
+ ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
private const string _endpoint = "<your-http-endpoint>";
private const string _accessKeyId = "<your-access-key-id>";
private const string _secretAccessKey = "<your-access-key-secret>";
private const string _topicName = "<your-topic>";
private const string _instanceId = "<your-instance-id>";
private const string _groupId = "<your-group-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(
_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(
_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// 本番環境では、複数のスレッドを使用して同時消費を行います
while (true)
{
try
{
List<Message> messages = null;
try
{
// ロングポーリング:最大 3 秒間メッセージを待機します
messages = consumer.ConsumeMessage(
3, // バッチあたりの最大メッセージ数 (最大 16)
3 // ロングポーリングのタイムアウト (秒単位、最大 30)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name
+ " No new message, "
+ ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name
+ " Receive Messages:");
// メッセージを処理します
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:"
+ message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// 処理済みのメッセージを承認します
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
if (exp2 is AckMessageException)
{
AckMessageException ackExp =
(AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:"
+ ackExp.RequestId);
foreach (AckMessageErrorItem errorItem
in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:"
+ errorItem.ReceiptHandle
+ ",ErrorCode:" + errorItem.ErrorCode
+ ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}結果の確認
プロデューサーとコンシューマーを実行した後、メッセージの配信を確認します:
ApsaraMQ for RocketMQ コンソールにログインします。
ご利用のインスタンスを見つけ、メッセージクエリページに移動します。
トピック、メッセージ ID、またはメッセージキーで検索し、メッセージが送信されたことを確認します。
メッセージトレースをチェックして、コンシューマーが各メッセージを受信し、承認したことを確認します。
詳細については、「メッセージのクエリ」および「メッセージトレースのクエリ」をご参照ください。