すべてのプロダクト
Search
ドキュメントセンター

Simple Message Queue (formerly MNS):Java 用 SMQ SDK を使用して HTTP サブスクリプションを作成する

最終更新日:Jan 13, 2025

Simple Message Queue (SMQ) は、特定のトピックのメッセージを HTTP API サービスにプッシュできます。 このトピックでは、SMQ で特定のトピックのメッセージを受信して処理するための HTTP サブスクリプションを作成する方法について説明します。

前提条件

SMQ と HTTP サーバー間のネットワークが接続されているため、HTTP リクエストを HTTP サーバーにプッシュできます。

メッセージ本文のエンコード方式を選択する

  • SIMPLIFIED:メッセージ本文に特殊文字が含まれていない場合は、Base64 エンコードを使用しないことをお勧めします。

    • トピックにメッセージを送信するには、RawTopicMessage メソッドを使用してメッセージオブジェクトを初期化します。

    • キューからメッセージをコンシュームするには、message.getMessageBodyAsRawString() メソッドを使用してメッセージ本文を取得します。

  • JSON または XML:文字列が JSON や XML などのテキスト形式で送信される場合は、Base64 エンコードを使用することをお勧めします。

    • トピックにメッセージを送信するには、TopicMessage メソッドを使用してメッセージオブジェクトを初期化します。この場合、メッセージ本文は Base64 エンコードされ、送信のために Message フィールドに格納されます。

    • キューからメッセージをコンシュームするには、message.getMessageBodyAsRawString(); メソッドを使用して Message フィールドの値を取得し、Base64 デコードを実行します。

      JSONObject object = new JSONObject(message.getMessageBodyAsRawString());
      String jsonMessageData = String.valueOf(object.get("Message"));
      String messageBody = new String(Base64.decodeBase64(jsonMessageData));

サンプルコード

完全なサンプルコードについては、「HttpEndpointSubscription.java」をご参照ください。

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.net.ssl.SSLServerSocketFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpConnectionFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.impl.DefaultBHttpServerConnection;
import org.apache.http.impl.DefaultBHttpServerConnectionFactory;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.protocol.UriHttpRequestHandlerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
 * /notifications ディレクトリに送信されたリクエストを処理する HTTP/1.1 ファイルサーバー。
 * サブスクリプションメッセージを受信するために使用されるサンプル HTTP サービス。
 */
public class HttpEndpointSubscription {
    public static Logger logger = LoggerFactory.getLogger(HttpEndpointSubscription.class);
    public static Thread t;
    private int port;

    /**
     * 静的メソッドを使用して、ホストの IP アドレスを使用してエンドポイントを生成します。
     *
     * @return http エンドポイント
     */
    public static String genEndpointLocal() {
        return HttpEndpointSubscription.genEndpointLocal(80);
    }

    /**
     * 静的メソッドを使用して、ホストの IP アドレスを使用してエンドポイントを生成します。
     *
     * @param port, http サーバーポート
     * @return http エンドポイント
     */
    public static String genEndpointLocal(int port) {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress().toString();
            return "http://" + ip + ":" + port;
        } catch (UnknownHostException e) {
            e.printStackTrace();
            logger.warn("ローカルホストの取得に失敗しました," + e.getMessage());
            return "http://127.0.0.1:" + port;
        }

    }

    /**
     * コンストラクターを使用して HttpEndpoint オブジェクトを構築するポートを指定します。
     *
     * @param port HTTP サーバーのポート。
     */
    public HttpEndpointSubscription(int port) {
        init(port);
    }

    /**
     * コンストラクターを使用して HttpEndpoint オブジェクトを構築するポートを指定します。デフォルトでは、ポート 80 が使用されます。
     */
    public HttpEndpointSubscription() {
        init(80);
    }

    private void init(int port) {
        this.port = port;
        t = null;
    }

    /**
     * http サーバーを開始する
     *
     * @throws Exception 例外
     */
    public void start() throws Exception {
        // ポートが使用されているかどうかを確認する
        try {
            new Socket(InetAddress.getLocalHost(), this.port);
            System.out.println("ポートは使用されています!");
            logger.error("ポートは既に使用されています。http サーバーの起動に失敗しました");
            throw new BindException("ポートは既に使用されています");
        } catch (IOException e) {
            //e.printStackTrace();

        }

        // HTTP プロトコルプロセッサを設定する
        HttpProcessor httpproc = HttpProcessorBuilder.create()
            .add(new ResponseDate())
            .add(new ResponseServer("MNS-Endpoint/1.1"))
            .add(new ResponseContent())
            .add(new ResponseConnControl()).build();

        // リクエストハンドラーを設定し、NSHandler クラスで /notifications リクエストをリッスンする
        UriHttpRequestHandlerMapper reqistry = new UriHttpRequestHandlerMapper();
        reqistry.register("/notifications", new NSHandler());
        reqistry.register("/simplified", new SimplifiedNSHandler());

        // HTTP サービスを設定する
        HttpService httpService = new HttpService(httpproc, reqistry);

        // http サーバーのスレッドを開始する
        t = new RequestListenerThread(port, httpService, null);
        t.setDaemon(false);
        t.start();
    }

    /**
     * http エンドポイントを停止する
     */
    public void stop() {
        if (t != null) {
            t.interrupt();
            try {
                t.join(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("エンドポイント停止");
    }

    /**
     * このリクエストが MNS サーバーからのものかどうかを確認する
     *
     * @param method,  http メソッド
     * @param uri,     http uri
     * @param headers, http ヘッダー
     * @param cert,    証明書 URL
     * @return 検証に合格した場合は true
     */
    private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
        String str2sign = getSignStr(method, uri, headers);
        //System.out.println(str2sign);
        String signature = headers.get("Authorization");
        byte[] decodedSign = Base64.decodeBase64(signature);
        // 証明書を取得し、この証明書でこのリクエストを検証する
        try {
            //String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
            URL url = new URL(cert);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            DataInputStream in = new DataInputStream(conn.getInputStream());
            CertificateFactory cf = CertificateFactory.getInstance("X.509");

            Certificate c = cf.generateCertificate(in);
            PublicKey pk = c.getPublicKey();

            java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
            signetcheck.initVerify(pk);
            signetcheck.update(str2sign.getBytes());
            Boolean res = signetcheck.verify(decodedSign);
            return res;
        } catch (Exception e) {
            e.printStackTrace();
            logger.warn("認証に失敗しました、" + e.getMessage());
            return false;
        }
    }

    /**
     * 署名用の文字列を作成する
     *
     * @param method,  http メソッド
     * @param uri,     http uri
     * @param headers, http ヘッダー
     * @return 署名用文字列
     */
    private String getSignStr(String method, String uri, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        sb.append(method);
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Content-md5"));
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Content-Type"));
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Date"));
        sb.append("\n");

        List<String> tmp = new ArrayList<String>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().startsWith("x-mns-")) {
                tmp.add(entry.getKey() + ":" + entry.getValue());
            }
        }
        Collections.sort(tmp);

        for (String kv : tmp) {
            sb.append(kv);
            sb.append("\n");
        }

        sb.append(uri);
        return sb.toString();
    }

    private String safeGetHeader(Map<String, String> headers, String name) {
        if (headers.containsKey(name)) {
            return headers.get(name);
        } else {
            return "";
        }
    }

    public class SimplifiedNSHandler implements HttpRequestHandler {
        /**
         * NSHandler の処理メソッド
         *
         * @param request,  http リクエスト
         * @param response, http レスポンス
         * @param context,  http コンテキスト
         * @throws HttpException 例外
         * @throws IOException   例外
         */
        @Override
        public void handle(
            final HttpRequest request,
            final HttpResponse response,
            final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);

            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " メソッドはサポートされていません");
            }

            Header[] headers = request.getAllHeaders();
            Map<String, String> hm = new HashMap<String, String>();
            for (Header h : headers) {
                System.out.println(h.getName() + ":" + h.getValue());
                hm.put(h.getName(), h.getValue());
            }

            String target = request.getRequestLine().getUri();
            System.out.println(target);

            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();

                // リクエストを検証する
                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
                if (certHeader == null) {
                    System.out.println("SigningCerURL ヘッダーが見つかりません");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                String cert = certHeader.getValue();
                if (cert.isEmpty()) {
                    System.out.println("SigningCertURL が空です");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                cert = new String(Base64.decodeBase64(cert));
                System.out.println("SigningCertURL:\t" + cert);
                logger.debug("SigningCertURL:\t" + cert);

                if (!authenticate(method, target, hm, cert)) {
                    System.out.println("認証に失敗しました");
                    logger.warn("認証に失敗しました");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                // 簡略化された通知のコンテンツを解析する
                InputStream is = entity.getContent();
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                StringBuffer buffer = new StringBuffer();
                String line = "";
                while ((line = reader.readLine()) != null) {
                    buffer.append(line);
                }
                String content = buffer.toString();

                System.out.println("簡略化された通知: \n" + content);
            }

            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
        }
    }

    /**
     * /notifications リクエストを処理するための中核クラス
     */
    public class NSHandler implements HttpRequestHandler {
        public Logger logger = LoggerFactory.getLogger(HttpRequestHandler.class);

        public NSHandler() {
            super();
        }

        private String safeGetElementContent(Element element, String tag) {
            NodeList nl = element.getElementsByTagName(tag);
            if (nl != null && nl.getLength() > 0) {
                return nl.item(0).getTextContent();
            } else {
                logger.warn(tag + " の xml からの取得に失敗しました");
                return "";
            }
        }

        /**
         * /notifications メッセージコンテンツを解析する
         *
         * @param notify, xml 要素
         */
        private void parserContent(Element notify) {
            try {
                String topicOwner = safeGetElementContent(notify, "TopicOwner");
                System.out.println("TopicOwner:\t" + topicOwner);
                logger.debug("TopicOwner:\t" + topicOwner);

                String topicName = safeGetElementContent(notify, "TopicName");
                System.out.println("TopicName:\t" + topicName);
                logger.debug("TopicName:\t" + topicName);

                String subscriber = safeGetElementContent(notify, "Subscriber");
                System.out.println("Subscriber:\t" + subscriber);
                logger.debug("Subscriber:\t" + subscriber);

                String subscriptionName = safeGetElementContent(notify, "SubscriptionName");
                System.out.println("SubscriptionName:\t" + subscriptionName);
                logger.debug("SubscriptionName:\t" + subscriptionName);

                String msgid = safeGetElementContent(notify, "MessageId");
                System.out.println("MessageId:\t" + msgid);
                logger.debug("MessageId:\t" + msgid);

                // base64 メッセージを含む PublishMessage の場合
                String msg = safeGetElementContent(notify, "Message");
                System.out.println("Message:\t" + new String(Base64.decodeBase64(msg)));
                logger.debug("Message:\t" + new String(Base64.decodeBase64(msg)));

                // 文字列メッセージを含む PublishMessage の場合
                //String msg = safeGetElementContent(notify, "Message");
                //System.out.println("Message:\t" + msg);
                //logger.debug("Message:\t" + msg);

                String msgMD5 = safeGetElementContent(notify, "MessageMD5");
                System.out.println("MessageMD5:\t" + msgMD5);
                logger.debug("MessageMD5:\t" + msgMD5);

                String msgPublishTime = safeGetElementContent(notify, "PublishTime");
                Date d = new Date(Long.parseLong(msgPublishTime));
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String strdate = sdf.format(d);
                System.out.println("PublishTime:\t" + strdate);
                logger.debug("MessagePublishTime:\t" + strdate);

                String msgTag = safeGetElementContent(notify, "MessageTag");
                if (!msgTag.equals("")) {
                    System.out.println("MessageTag:\t" + msgTag);
                    logger.debug("MessageTag:\t" + msgTag);
                }

            } catch (Exception e) {
                System.out.println(e.getMessage());
                e.printStackTrace();
                logger.warn(e.getMessage());
            }

        }

        /**
         * NSHandler の処理メソッド
         *
         * @param request,  http リクエスト
         * @param response, http レスポンス
         * @param context,  http コンテキスト
         * @throws HttpException 例外
         * @throws IOException   例外
         */
        @Override
        public void handle(
            final HttpRequest request,
            final HttpResponse response,
            final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);

            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " メソッドはサポートされていません");
            }

            Header[] headers = request.getAllHeaders();
            Map<String, String> hm = new HashMap<String, String>();
            for (Header h : headers) {
                System.out.println(h.getName() + ":" + h.getValue());
                hm.put(h.getName(), h.getValue());
            }

            String target = request.getRequestLine().getUri();
            System.out.println(target);

            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();

                // xml コンテンツを解析する
                InputStream content = entity.getContent();
                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
                Element notify = null;
                try {
                    DocumentBuilder db = dbf.newDocumentBuilder();
                    Document document = db.parse(content);
                    NodeList nl = document.getElementsByTagName("Notification");
                    if (nl == null || nl.getLength() == 0) {
                        System.out.println("xml タグエラー");
                        logger.warn("xml タグエラー");
                        response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                        return;
                    }
                    notify = (Element) nl.item(0);
                } catch (ParserConfigurationException e) {
                    e.printStackTrace();
                    logger.warn("xml 解析に失敗しました! " + e.getMessage());
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                } catch (SAXException e) {
                    e.printStackTrace();
                    logger.warn("xml 解析に失敗しました! " + e.getMessage());
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                // リクエストを検証する
                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
                if (certHeader == null) {
                    System.out.println("SigningCerURL ヘッダーが見つかりません");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                String cert = certHeader.getValue();
                if (cert.isEmpty()) {
                    System.out.println("SigningCertURL が空です");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                cert = new String(Base64.decodeBase64(cert));
                System.out.println("SigningCertURL:\t" + cert);
                logger.debug("SigningCertURL:\t" + cert);

                if (!authenticate(method, target, hm, cert)) {
                    System.out.println("認証に失敗しました");
                    logger.warn("認証に失敗しました");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                parserContent(notify);

            }

            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
        }

    }

    /**
     * http リッスンワーカースレッド
     */
    public class RequestListenerThread extends Thread {

        private final HttpConnectionFactory<DefaultBHttpServerConnection> connFactory;
        private final ServerSocket serversocket;
        private final HttpService httpService;

        public RequestListenerThread(
            final int port,
            final HttpService httpService,
            final SSLServerSocketFactory sf) throws IOException {
            this.connFactory = DefaultBHttpServerConnectionFactory.INSTANCE;
            this.serversocket = sf != null ?  sf.createServerSocket(port) : new ServerSocket(port);
            this.httpService = httpService;
        }

        @Override
        public void run() {
            System.out.println("ポート " + this.serversocket.getLocalPort() + " でリッスンしています");
            Thread t = null;
            while (!Thread.interrupted()) {
                try {
                    // HTTP 接続を設定する
                    Socket socket = this.serversocket.accept();
                    System.out.println(socket.getInetAddress() + " からの着信接続");
                    HttpServerConnection conn = this.connFactory.createConnection(socket);

                    // ワーカースレッドを開始する
                    t = new WorkerThread(this.httpService, conn);
                    t.setDaemon(true);
                    t.start();
                } catch (IOException e) {
                    System.err.println("エンドポイント http サーバーの停止または IO エラー: "
                        + e.getMessage());
                    try {
                        if (t != null) {
                            t.join(5 * 1000);
                        }
                    } catch (InterruptedException e1) {
                        //e1.printStackTrace();
                    }
                    break;
                }
            }
        }

        @Override
        public void interrupt() {
            super.interrupt();
            try {
                this.serversocket.close();
            } catch (IOException e) {
                //e.printStackTrace();
            }
        }
    }

    /**
     * http ワーカースレッド。/notifications を NSHandler にディスパッチします。
     */
    public class WorkerThread extends Thread {

        private final HttpService httpservice;
        private final HttpServerConnection conn;

        public WorkerThread(
            final HttpService httpservice,
            final HttpServerConnection conn) {
            super();
            this.httpservice = httpservice;
            this.conn = conn;
        }

        @Override
        public void run() {
            System.out.println("新しい接続スレッド");
            HttpContext context = new BasicHttpContext(null);
            try {
                while (!Thread.interrupted() && this.conn.isOpen()) {
                    this.httpservice.handleRequest(this.conn, context);
                }
            } catch (ConnectionClosedException ex) {
                System.err.println("クライアントが接続を閉じました");
            } catch (IOException ex) {
                System.err.println("I/O エラー: " + ex.getMessage());
            } catch (HttpException ex) {
                System.err.println("回復不能な HTTP プロトコル違反: " + ex.getMessage());
            } finally {
                try {
                    this.conn.shutdown();
                } catch (IOException ignore) {
                }
            }
        }

    }

    /**
     * main 関数を使用します。
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        int port = 8080;
        HttpEndpointSubscription httpEndpointSubscription = null;
        try {
            httpEndpointSubscription = new HttpEndpointSubscription(port);
            httpEndpointSubscription.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (httpEndpointSubscription != null) {
                httpEndpointSubscription.stop();
            }
        }
    }

}