Simple Message Queue (SMQ) は、特定のトピックのメッセージを HTTP API サービスにプッシュできます。 このトピックでは、SMQ で特定のトピックのメッセージを受信して処理するための HTTP サブスクリプションを作成する方法について説明します。
前提条件
SMQ と HTTP サーバー間のネットワークが接続されているため、HTTP リクエストを HTTP サーバーにプッシュできます。
メッセージ本文のエンコード方式を選択する
SIMPLIFIED:メッセージ本文に特殊文字が含まれていない場合は、Base64 エンコードを使用しないことをお勧めします。
トピックにメッセージを送信するには、
RawTopicMessageメソッドを使用してメッセージオブジェクトを初期化します。キューからメッセージをコンシュームするには、
message.getMessageBodyAsRawString()メソッドを使用してメッセージ本文を取得します。
JSON または XML:文字列が JSON や XML などのテキスト形式で送信される場合は、Base64 エンコードを使用することをお勧めします。
トピックにメッセージを送信するには、TopicMessage メソッドを使用してメッセージオブジェクトを初期化します。この場合、メッセージ本文は Base64 エンコードされ、送信のために Message フィールドに格納されます。
キューからメッセージをコンシュームするには、
message.getMessageBodyAsRawString();メソッドを使用して Message フィールドの値を取得し、Base64 デコードを実行します。JSONObject object = new JSONObject(message.getMessageBodyAsRawString()); String jsonMessageData = String.valueOf(object.get("Message")); String messageBody = new String(Base64.decodeBase64(jsonMessageData));
サンプルコード
完全なサンプルコードについては、「HttpEndpointSubscription.java」をご参照ください。
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.net.ssl.SSLServerSocketFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpConnectionFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.impl.DefaultBHttpServerConnection;
import org.apache.http.impl.DefaultBHttpServerConnectionFactory;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.protocol.UriHttpRequestHandlerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
/**
* /notifications ディレクトリに送信されたリクエストを処理する HTTP/1.1 ファイルサーバー。
* サブスクリプションメッセージを受信するために使用されるサンプル HTTP サービス。
*/
public class HttpEndpointSubscription {
public static Logger logger = LoggerFactory.getLogger(HttpEndpointSubscription.class);
public static Thread t;
private int port;
/**
* 静的メソッドを使用して、ホストの IP アドレスを使用してエンドポイントを生成します。
*
* @return http エンドポイント
*/
public static String genEndpointLocal() {
return HttpEndpointSubscription.genEndpointLocal(80);
}
/**
* 静的メソッドを使用して、ホストの IP アドレスを使用してエンドポイントを生成します。
*
* @param port, http サーバーポート
* @return http エンドポイント
*/
public static String genEndpointLocal(int port) {
try {
InetAddress addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress().toString();
return "http://" + ip + ":" + port;
} catch (UnknownHostException e) {
e.printStackTrace();
logger.warn("ローカルホストの取得に失敗しました," + e.getMessage());
return "http://127.0.0.1:" + port;
}
}
/**
* コンストラクターを使用して HttpEndpoint オブジェクトを構築するポートを指定します。
*
* @param port HTTP サーバーのポート。
*/
public HttpEndpointSubscription(int port) {
init(port);
}
/**
* コンストラクターを使用して HttpEndpoint オブジェクトを構築するポートを指定します。デフォルトでは、ポート 80 が使用されます。
*/
public HttpEndpointSubscription() {
init(80);
}
private void init(int port) {
this.port = port;
t = null;
}
/**
* http サーバーを開始する
*
* @throws Exception 例外
*/
public void start() throws Exception {
// ポートが使用されているかどうかを確認する
try {
new Socket(InetAddress.getLocalHost(), this.port);
System.out.println("ポートは使用されています!");
logger.error("ポートは既に使用されています。http サーバーの起動に失敗しました");
throw new BindException("ポートは既に使用されています");
} catch (IOException e) {
//e.printStackTrace();
}
// HTTP プロトコルプロセッサを設定する
HttpProcessor httpproc = HttpProcessorBuilder.create()
.add(new ResponseDate())
.add(new ResponseServer("MNS-Endpoint/1.1"))
.add(new ResponseContent())
.add(new ResponseConnControl()).build();
// リクエストハンドラーを設定し、NSHandler クラスで /notifications リクエストをリッスンする
UriHttpRequestHandlerMapper reqistry = new UriHttpRequestHandlerMapper();
reqistry.register("/notifications", new NSHandler());
reqistry.register("/simplified", new SimplifiedNSHandler());
// HTTP サービスを設定する
HttpService httpService = new HttpService(httpproc, reqistry);
// http サーバーのスレッドを開始する
t = new RequestListenerThread(port, httpService, null);
t.setDaemon(false);
t.start();
}
/**
* http エンドポイントを停止する
*/
public void stop() {
if (t != null) {
t.interrupt();
try {
t.join(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("エンドポイント停止");
}
/**
* このリクエストが MNS サーバーからのものかどうかを確認する
*
* @param method, http メソッド
* @param uri, http uri
* @param headers, http ヘッダー
* @param cert, 証明書 URL
* @return 検証に合格した場合は true
*/
private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
String str2sign = getSignStr(method, uri, headers);
//System.out.println(str2sign);
String signature = headers.get("Authorization");
byte[] decodedSign = Base64.decodeBase64(signature);
// 証明書を取得し、この証明書でこのリクエストを検証する
try {
//String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
URL url = new URL(cert);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
DataInputStream in = new DataInputStream(conn.getInputStream());
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Certificate c = cf.generateCertificate(in);
PublicKey pk = c.getPublicKey();
java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
signetcheck.initVerify(pk);
signetcheck.update(str2sign.getBytes());
Boolean res = signetcheck.verify(decodedSign);
return res;
} catch (Exception e) {
e.printStackTrace();
logger.warn("認証に失敗しました、" + e.getMessage());
return false;
}
}
/**
* 署名用の文字列を作成する
*
* @param method, http メソッド
* @param uri, http uri
* @param headers, http ヘッダー
* @return 署名用文字列
*/
private String getSignStr(String method, String uri, Map<String, String> headers) {
StringBuilder sb = new StringBuilder();
sb.append(method);
sb.append("\n");
sb.append(safeGetHeader(headers, "Content-md5"));
sb.append("\n");
sb.append(safeGetHeader(headers, "Content-Type"));
sb.append("\n");
sb.append(safeGetHeader(headers, "Date"));
sb.append("\n");
List<String> tmp = new ArrayList<String>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
if (entry.getKey().startsWith("x-mns-")) {
tmp.add(entry.getKey() + ":" + entry.getValue());
}
}
Collections.sort(tmp);
for (String kv : tmp) {
sb.append(kv);
sb.append("\n");
}
sb.append(uri);
return sb.toString();
}
private String safeGetHeader(Map<String, String> headers, String name) {
if (headers.containsKey(name)) {
return headers.get(name);
} else {
return "";
}
}
public class SimplifiedNSHandler implements HttpRequestHandler {
/**
* NSHandler の処理メソッド
*
* @param request, http リクエスト
* @param response, http レスポンス
* @param context, http コンテキスト
* @throws HttpException 例外
* @throws IOException 例外
*/
@Override
public void handle(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " メソッドはサポートされていません");
}
Header[] headers = request.getAllHeaders();
Map<String, String> hm = new HashMap<String, String>();
for (Header h : headers) {
System.out.println(h.getName() + ":" + h.getValue());
hm.put(h.getName(), h.getValue());
}
String target = request.getRequestLine().getUri();
System.out.println(target);
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
// リクエストを検証する
Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
if (certHeader == null) {
System.out.println("SigningCerURL ヘッダーが見つかりません");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
String cert = certHeader.getValue();
if (cert.isEmpty()) {
System.out.println("SigningCertURL が空です");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
cert = new String(Base64.decodeBase64(cert));
System.out.println("SigningCertURL:\t" + cert);
logger.debug("SigningCertURL:\t" + cert);
if (!authenticate(method, target, hm, cert)) {
System.out.println("認証に失敗しました");
logger.warn("認証に失敗しました");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
// 簡略化された通知のコンテンツを解析する
InputStream is = entity.getContent();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
StringBuffer buffer = new StringBuffer();
String line = "";
while ((line = reader.readLine()) != null) {
buffer.append(line);
}
String content = buffer.toString();
System.out.println("簡略化された通知: \n" + content);
}
response.setStatusCode(HttpStatus.SC_NO_CONTENT);
}
}
/**
* /notifications リクエストを処理するための中核クラス
*/
public class NSHandler implements HttpRequestHandler {
public Logger logger = LoggerFactory.getLogger(HttpRequestHandler.class);
public NSHandler() {
super();
}
private String safeGetElementContent(Element element, String tag) {
NodeList nl = element.getElementsByTagName(tag);
if (nl != null && nl.getLength() > 0) {
return nl.item(0).getTextContent();
} else {
logger.warn(tag + " の xml からの取得に失敗しました");
return "";
}
}
/**
* /notifications メッセージコンテンツを解析する
*
* @param notify, xml 要素
*/
private void parserContent(Element notify) {
try {
String topicOwner = safeGetElementContent(notify, "TopicOwner");
System.out.println("TopicOwner:\t" + topicOwner);
logger.debug("TopicOwner:\t" + topicOwner);
String topicName = safeGetElementContent(notify, "TopicName");
System.out.println("TopicName:\t" + topicName);
logger.debug("TopicName:\t" + topicName);
String subscriber = safeGetElementContent(notify, "Subscriber");
System.out.println("Subscriber:\t" + subscriber);
logger.debug("Subscriber:\t" + subscriber);
String subscriptionName = safeGetElementContent(notify, "SubscriptionName");
System.out.println("SubscriptionName:\t" + subscriptionName);
logger.debug("SubscriptionName:\t" + subscriptionName);
String msgid = safeGetElementContent(notify, "MessageId");
System.out.println("MessageId:\t" + msgid);
logger.debug("MessageId:\t" + msgid);
// base64 メッセージを含む PublishMessage の場合
String msg = safeGetElementContent(notify, "Message");
System.out.println("Message:\t" + new String(Base64.decodeBase64(msg)));
logger.debug("Message:\t" + new String(Base64.decodeBase64(msg)));
// 文字列メッセージを含む PublishMessage の場合
//String msg = safeGetElementContent(notify, "Message");
//System.out.println("Message:\t" + msg);
//logger.debug("Message:\t" + msg);
String msgMD5 = safeGetElementContent(notify, "MessageMD5");
System.out.println("MessageMD5:\t" + msgMD5);
logger.debug("MessageMD5:\t" + msgMD5);
String msgPublishTime = safeGetElementContent(notify, "PublishTime");
Date d = new Date(Long.parseLong(msgPublishTime));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String strdate = sdf.format(d);
System.out.println("PublishTime:\t" + strdate);
logger.debug("MessagePublishTime:\t" + strdate);
String msgTag = safeGetElementContent(notify, "MessageTag");
if (!msgTag.equals("")) {
System.out.println("MessageTag:\t" + msgTag);
logger.debug("MessageTag:\t" + msgTag);
}
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
logger.warn(e.getMessage());
}
}
/**
* NSHandler の処理メソッド
*
* @param request, http リクエスト
* @param response, http レスポンス
* @param context, http コンテキスト
* @throws HttpException 例外
* @throws IOException 例外
*/
@Override
public void handle(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " メソッドはサポートされていません");
}
Header[] headers = request.getAllHeaders();
Map<String, String> hm = new HashMap<String, String>();
for (Header h : headers) {
System.out.println(h.getName() + ":" + h.getValue());
hm.put(h.getName(), h.getValue());
}
String target = request.getRequestLine().getUri();
System.out.println(target);
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
// xml コンテンツを解析する
InputStream content = entity.getContent();
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
Element notify = null;
try {
DocumentBuilder db = dbf.newDocumentBuilder();
Document document = db.parse(content);
NodeList nl = document.getElementsByTagName("Notification");
if (nl == null || nl.getLength() == 0) {
System.out.println("xml タグエラー");
logger.warn("xml タグエラー");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
notify = (Element) nl.item(0);
} catch (ParserConfigurationException e) {
e.printStackTrace();
logger.warn("xml 解析に失敗しました! " + e.getMessage());
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
} catch (SAXException e) {
e.printStackTrace();
logger.warn("xml 解析に失敗しました! " + e.getMessage());
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
// リクエストを検証する
Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
if (certHeader == null) {
System.out.println("SigningCerURL ヘッダーが見つかりません");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
String cert = certHeader.getValue();
if (cert.isEmpty()) {
System.out.println("SigningCertURL が空です");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
cert = new String(Base64.decodeBase64(cert));
System.out.println("SigningCertURL:\t" + cert);
logger.debug("SigningCertURL:\t" + cert);
if (!authenticate(method, target, hm, cert)) {
System.out.println("認証に失敗しました");
logger.warn("認証に失敗しました");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
parserContent(notify);
}
response.setStatusCode(HttpStatus.SC_NO_CONTENT);
}
}
/**
* http リッスンワーカースレッド
*/
public class RequestListenerThread extends Thread {
private final HttpConnectionFactory<DefaultBHttpServerConnection> connFactory;
private final ServerSocket serversocket;
private final HttpService httpService;
public RequestListenerThread(
final int port,
final HttpService httpService,
final SSLServerSocketFactory sf) throws IOException {
this.connFactory = DefaultBHttpServerConnectionFactory.INSTANCE;
this.serversocket = sf != null ? sf.createServerSocket(port) : new ServerSocket(port);
this.httpService = httpService;
}
@Override
public void run() {
System.out.println("ポート " + this.serversocket.getLocalPort() + " でリッスンしています");
Thread t = null;
while (!Thread.interrupted()) {
try {
// HTTP 接続を設定する
Socket socket = this.serversocket.accept();
System.out.println(socket.getInetAddress() + " からの着信接続");
HttpServerConnection conn = this.connFactory.createConnection(socket);
// ワーカースレッドを開始する
t = new WorkerThread(this.httpService, conn);
t.setDaemon(true);
t.start();
} catch (IOException e) {
System.err.println("エンドポイント http サーバーの停止または IO エラー: "
+ e.getMessage());
try {
if (t != null) {
t.join(5 * 1000);
}
} catch (InterruptedException e1) {
//e1.printStackTrace();
}
break;
}
}
}
@Override
public void interrupt() {
super.interrupt();
try {
this.serversocket.close();
} catch (IOException e) {
//e.printStackTrace();
}
}
}
/**
* http ワーカースレッド。/notifications を NSHandler にディスパッチします。
*/
public class WorkerThread extends Thread {
private final HttpService httpservice;
private final HttpServerConnection conn;
public WorkerThread(
final HttpService httpservice,
final HttpServerConnection conn) {
super();
this.httpservice = httpservice;
this.conn = conn;
}
@Override
public void run() {
System.out.println("新しい接続スレッド");
HttpContext context = new BasicHttpContext(null);
try {
while (!Thread.interrupted() && this.conn.isOpen()) {
this.httpservice.handleRequest(this.conn, context);
}
} catch (ConnectionClosedException ex) {
System.err.println("クライアントが接続を閉じました");
} catch (IOException ex) {
System.err.println("I/O エラー: " + ex.getMessage());
} catch (HttpException ex) {
System.err.println("回復不能な HTTP プロトコル違反: " + ex.getMessage());
} finally {
try {
this.conn.shutdown();
} catch (IOException ignore) {
}
}
}
}
/**
* main 関数を使用します。
*
* @param args 引数
*/
public static void main(String[] args) {
int port = 8080;
HttpEndpointSubscription httpEndpointSubscription = null;
try {
httpEndpointSubscription = new HttpEndpointSubscription(port);
httpEndpointSubscription.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (httpEndpointSubscription != null) {
httpEndpointSubscription.stop();
}
}
}
}