このトピックでは、Message Service (MNS) (SMQ) SDK for Javaを使用してトピックにサブスクライブし、メッセージを受信する方法について説明します。
前提条件
SMQ SDK for Javaがインストールされていること。詳細については、SDK for Javaのインストールをご参照ください。
エンドポイントとアクセス認証情報が構成されていること。詳細については、エンドポイントとアクセス認証情報の構成をご参照ください。
メッセージ本文のエンコード方式
SIMPLIFIED:メッセージ本文に特殊文字が含まれていない場合は、Base64エンコードを使用しないことをお勧めします。
トピックにメッセージを送信するには、
RawTopicMessage
メソッドを使用してメッセージオブジェクトを初期化します。キューからメッセージをコンシュームするには、
message.getMessageBodyAsRawString()
メソッドを使用してメッセージ本文を取得します。
JSONまたはXML:文字列がJSONやXMLなどのテキスト形式で送信される場合は、Base64エンコードを使用することをお勧めします。
トピックにメッセージを送信するには、TopicMessageメソッドを使用してメッセージオブジェクトを初期化します。この場合、メッセージ本文はBase64エンコードされ、送信のためにMessageフィールドに格納されます。
キューからメッセージをコンシュームするには、
message.getMessageBodyAsRawString();
メソッドを使用してMessageフィールドの値を取得し、Base64デコードを実行します。JSONObject object = new JSONObject(message.getMessageBodyAsRawString()); String jsonMessageData = String.valueOf(object.get("Message")); String messageBody = new String(Base64.decodeBase64(jsonMessageData));
サンプルコード
サンプルコードのダウンロード方法については、ConsumerQueueForTopicDemo.java をご参照ください。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.io.StringReader;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.codec.binary.Base64;
import org.json.JSONException;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
/**
* トピックベースのメッセージングでは、キューはXML、JSON、およびシンプルの3つのタイプに分類され、Base64暗号化の有無が異なります。詳細については、以下のセクションを参照してください。
* 1. Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey Secret を構成します。
* 2. 次の内容に基づいて、${"user.home"}/.aliyun-mns.properties ファイルを構成します。
* mns.endpoint=http://xxxxxxx
* mns.msgBodyBase64Switch=true/false
*/
public class ConsumerQueueForTopicDemo {
/**
* メッセージ本文を Base64 でエンコードするかどうかを指定します。
*/
private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
public static void main(String[] args) {
String QUEUE_NAME = "TestQueue";
// Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey Secret を構成します。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
//this client need only initialize once // このクライアントは一度だけ初期化する必要があります
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(QUEUE_NAME);
try {
longPollingBatchReceive(queue);
} catch (ClientException ce) {
System.out.println("Something wrong with the network connection between client and MNS service." // クライアントと MNS サービス間のネットワーク接続に問題があります。
+ "Please check your network and DNS availablity."); // ネットワークと DNS の可用性をご確認ください。
ce.printStackTrace();
} catch (ServiceException se) {
if (se.getErrorCode().equals("QueueNotExist")) {
System.out.println("Queue is not exist. Please create queue before use"); // キューが存在しません。使用する前にキューを作成してください
} else if (se.getErrorCode().equals("TimeExpired")) {
System.out.println("The request is time expired. Please check your local machine timeclock"); // 要求がタイムアウトしました。ローカルマシンの時計をご確認ください
}
se.printStackTrace();
} catch (Exception e) {
System.out.println("Unknown exception happened!"); // 不明な例外が発生しました!
e.printStackTrace();
}
client.close();
}
private static void longPollingBatchReceive(CloudQueue queue) {
System.out.println("=============start longPollingBatchReceive============="); // =============longPollingBatchReceive 開始=============
// The maximum number of messages that can be received at a time. // 一度に受信できるメッセージの最大数。
int batchSize = 15;
// The long polling period. Unit: seconds. // ロングポーリング期間。単位:秒。
int waitSeconds = 15;
List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
if (messages != null && messages.size() > 0) {
for (Message message : messages) {
System.out.println("message handle: " + message.getReceiptHandle()); // メッセージハンドル:
System.out.println("message body: " + message.getOriginalMessageBody()); // メッセージ本文:
System.out.println("message body real data: " + getMessageBodyData(message)); // メッセージ本文の実データ:
System.out.println("message id: " + message.getMessageId()); // メッセージID:
System.out.println("message dequeue count:" + message.getDequeueCount()); // メッセージデキューカウント:
//<<to add your special logic.>> // <<特別なロジックを追加します。>>
//remember to delete message when consume message successfully. // メッセージの消費に成功したら、メッセージを削除することを忘れないでください。
queue.deleteMessage(message.getReceiptHandle());
System.out.println("delete message successfully.\n"); // メッセージの削除に成功しました。\n
}
}
System.out.println("=============end longPollingBatchReceive============="); // =============longPollingBatchReceive 終了=============
}
private static String getMessageBodyData(Message message){
if (message == null){
return null;
}
String originalMessageBody = message.getOriginalMessageBody();
// 1. Parse the message body into a JSON object. // 1. メッセージ本文を JSON オブジェクトに解析します。
try {
JSONObject object = new JSONObject(originalMessageBody);
String jsonMessageData = String.valueOf(object.get("Message"));
System.out.println("message body type: JSON,value:"+jsonMessageData ); // メッセージ本文タイプ:JSON、値:
return IS_BASE64? new String(Base64.decodeBase64(jsonMessageData)): jsonMessageData;
} catch (JSONException ex1) {
// If the message body is not parsed into a JSON object, check whether the message body can be parsed into an XML file. // メッセージ本文が JSON オブジェクトに解析されない場合は、メッセージ本文が XML ファイルに解析できるかどうかを確認します。
}
// 2. Parse the message body into an XML file. // 2. メッセージ本文を XML ファイルに解析します。
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Document doc = builder.parse(new InputSource(new StringReader(originalMessageBody)));
Element root = doc.getDocumentElement();
NodeList nodeList = root.getElementsByTagName("Message");
String content = nodeList.item(0).getTextContent();
System.out.println("message body type: XML,value:"+content ); // メッセージ本文タイプ:XML、値:
return IS_BASE64? new String(Base64.decodeBase64(content)): content;
} catch (Exception ex) {
// The XML file is invalid. // XML ファイルが無効です。
}
// If the message body is not parsed into a JSON object or an XML file, the parsed message body is considered a simple text. // メッセージ本文が JSON オブジェクトまたは XML ファイルに解析されない場合、解析されたメッセージ本文は単純なテキストと見なされます。
System.out.println("message body type: SIMPLE" ); // メッセージ本文タイプ:SIMPLE
return IS_BASE64 ? message.getMessageBody() : message.getMessageBodyAsRawString();
}
}