All Products
Search
Document Center

Simple Message Queue (formerly MNS):Use SMQ SDK for Java to create an HTTP subscription

Last Updated:Oct 22, 2024

Simple Message Queue (SMQ) can push messages of a specific topic to your HTTP API service. This topic describes how to create an HTTP subscription to receive and process messages of a specific topic in SMQ.

Prerequisites

The network between SMQ and the HTTP server is connected, so that HTTP requests can be pushed to the HTTP server.

Select an encoding method for the message body

  • SIMPLIFIED: If the message body does not contain special characters, we recommend that you do not use Base64 encoding.

    • To send a message to a topic, use the RawTopicMessage method to initialize the message object.

    • To consume a message from a queue, use the message.getMessageBodyAsRawString() method to obtain the message body.

  • JSON or XML: If the strings are transmitted in a text format such as JSON or XML, we recommend that you use Base64 encoding.

    • To send a message to a topic, use the TopicMessage method to initialize the message object. In this case, the message body is Base64-encoded and stored in the Message field for transmission.

    • To consume a message from a queue, use the message.getMessageBodyAsRawString(); method to obtain the value of the Message field, and then perform Base64 decoding.

      JSONObject object = new JSONObject(message.getMessageBodyAsRawString());
      String jsonMessageData = String.valueOf(object.get("Message"));
      String messageBody = new String(Base64.decodeBase64(jsonMessageData));

Sample code

For more information about the complete sample code, see HttpEndpointSubscription.java.

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.net.ssl.SSLServerSocketFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpConnectionFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.impl.DefaultBHttpServerConnection;
import org.apache.http.impl.DefaultBHttpServerConnectionFactory;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.protocol.UriHttpRequestHandlerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
 * The HTTP/1.1 file server that processes requests sent to the /notifications directory.
 * The sample HTTP service, which is used to receive subscription messages.
 */
public class HttpEndpointSubscription {
    public static Logger logger = LoggerFactory.getLogger(HttpEndpointSubscription.class);
    public static Thread t;
    private int port;

    /**
     * Use the IP address of your host to generate an endpoint by using the static method.
     *
     * @return http endpoint
     */
    public static String genEndpointLocal() {
        return HttpEndpointSubscription.genEndpointLocal(80);
    }

    /**
     * Use the IP address of your host to generate an endpoint by using the static method.
     *
     * @param port, http server port
     * @return http endpoint
     */
    public static String genEndpointLocal(int port) {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress().toString();
            return "http://" + ip + ":" + port;
        } catch (UnknownHostException e) {
            e.printStackTrace();
            logger.warn("get local host fail," + e.getMessage());
            return "http://127.0.0.1:" + port;
        }

    }

    /**
     * Specify the port to construct the HttpEndpoint object by using the constructor.
     *
     * @param port The port of the HTTP server.
     */
    public HttpEndpointSubscription(int port) {
        init(port);
    }

    /**
     * Specify the port to construct the HttpEndpoint object by using the constructor. By default, port 80 is used.
     */
    public HttpEndpointSubscription() {
        init(80);
    }

    private void init(int port) {
        this.port = port;
        t = null;
    }

    /**
     * start http server
     *
     * @throws Exception exception
     */
    public void start() throws Exception {
        //check port if used
        try {
            new Socket(InetAddress.getLocalHost(), this.port);
            System.out.println("port is used!");
            logger.error("port already in use, http server start failed");
            throw new BindException("port already in use");
        } catch (IOException e) {
            //e.printStackTrace();

        }

        // Set up the HTTP protocol processor
        HttpProcessor httpproc = HttpProcessorBuilder.create()
            .add(new ResponseDate())
            .add(new ResponseServer("MNS-Endpoint/1.1"))
            .add(new ResponseContent())
            .add(new ResponseConnControl()).build();

        // Set up request handlers, listen /notifications request whit NSHandler class
        UriHttpRequestHandlerMapper reqistry = new UriHttpRequestHandlerMapper();
        reqistry.register("/notifications", new NSHandler());
        reqistry.register("/simplified", new SimplifiedNSHandler());

        // Set up the HTTP service
        HttpService httpService = new HttpService(httpproc, reqistry);

        //start thread for http server
        t = new RequestListenerThread(port, httpService, null);
        t.setDaemon(false);
        t.start();
    }

    /**
     * stop http endpoint
     */
    public void stop() {
        if (t != null) {
            t.interrupt();
            try {
                t.join(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("endpoint stop");
    }

    /**
     * check if this request comes from MNS Server
     *
     * @param method,  http method
     * @param uri,     http uri
     * @param headers, http headers
     * @param cert,    cert url
     * @return true if verify pass
     */
    private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
        String str2sign = getSignStr(method, uri, headers);
        //System.out.println(str2sign);
        String signature = headers.get("Authorization");
        byte[] decodedSign = Base64.decodeBase64(signature);
        //get cert, and verify this request with this cert
        try {
            //String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
            URL url = new URL(cert);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            DataInputStream in = new DataInputStream(conn.getInputStream());
            CertificateFactory cf = CertificateFactory.getInstance("X.509");

            Certificate c = cf.generateCertificate(in);
            PublicKey pk = c.getPublicKey();

            java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
            signetcheck.initVerify(pk);
            signetcheck.update(str2sign.getBytes());
            Boolean res = signetcheck.verify(decodedSign);
            return res;
        } catch (Exception e) {
            e.printStackTrace();
            logger.warn("authenticate fail, " + e.getMessage());
            return false;
        }
    }

    /**
     * build string for sign
     *
     * @param method,  http method
     * @param uri,     http uri
     * @param headers, http headers
     * @return String fro sign
     */
    private String getSignStr(String method, String uri, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        sb.append(method);
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Content-md5"));
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Content-Type"));
        sb.append("\n");
        sb.append(safeGetHeader(headers, "Date"));
        sb.append("\n");

        List<String> tmp = new ArrayList<String>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().startsWith("x-mns-")) {
                tmp.add(entry.getKey() + ":" + entry.getValue());
            }
        }
        Collections.sort(tmp);

        for (String kv : tmp) {
            sb.append(kv);
            sb.append("\n");
        }

        sb.append(uri);
        return sb.toString();
    }

    private String safeGetHeader(Map<String, String> headers, String name) {
        if (headers.containsKey(name)) {
            return headers.get(name);
        } else {
            return "";
        }
    }

    public class SimplifiedNSHandler implements HttpRequestHandler {
        /**
         * process method for NSHandler
         *
         * @param request,  http request
         * @param response, http responst
         * @param context,  http context
         * @throws HttpException exception
         * @throws IOException   exception
         */
        @Override
        public void handle(
            final HttpRequest request,
            final HttpResponse response,
            final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);

            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }

            Header[] headers = request.getAllHeaders();
            Map<String, String> hm = new HashMap<String, String>();
            for (Header h : headers) {
                System.out.println(h.getName() + ":" + h.getValue());
                hm.put(h.getName(), h.getValue());
            }

            String target = request.getRequestLine().getUri();
            System.out.println(target);

            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();

                //verify request
                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
                if (certHeader == null) {
                    System.out.println("SigningCerURL Header not found");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                String cert = certHeader.getValue();
                if (cert.isEmpty()) {
                    System.out.println("SigningCertURL empty");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                cert = new String(Base64.decodeBase64(cert));
                System.out.println("SigningCertURL:\t" + cert);
                logger.debug("SigningCertURL:\t" + cert);

                if (!authenticate(method, target, hm, cert)) {
                    System.out.println("authenticate fail");
                    logger.warn("authenticate fail");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                //parser content of simplified notification
                InputStream is = entity.getContent();
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                StringBuffer buffer = new StringBuffer();
                String line = "";
                while ((line = reader.readLine()) != null) {
                    buffer.append(line);
                }
                String content = buffer.toString();

                System.out.println("Simplified Notification: \n" + content);
            }

            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
        }
    }

    /**
     * core class for processing /notifications request
     */
    public class NSHandler implements HttpRequestHandler {
        public Logger logger = LoggerFactory.getLogger(HttpRequestHandler.class);

        public NSHandler() {
            super();
        }

        private String safeGetElementContent(Element element, String tag) {
            NodeList nl = element.getElementsByTagName(tag);
            if (nl != null && nl.getLength() > 0) {
                return nl.item(0).getTextContent();
            } else {
                logger.warn("get " + tag + " from xml fail");
                return "";
            }
        }

        /**
         * parser /notifications message content
         *
         * @param notify, xml element
         */
        private void parserContent(Element notify) {
            try {
                String topicOwner = safeGetElementContent(notify, "TopicOwner");
                System.out.println("TopicOwner:\t" + topicOwner);
                logger.debug("TopicOwner:\t" + topicOwner);

                String topicName = safeGetElementContent(notify, "TopicName");
                System.out.println("TopicName:\t" + topicName);
                logger.debug("TopicName:\t" + topicName);

                String subscriber = safeGetElementContent(notify, "Subscriber");
                System.out.println("Subscriber:\t" + subscriber);
                logger.debug("Subscriber:\t" + subscriber);

                String subscriptionName = safeGetElementContent(notify, "SubscriptionName");
                System.out.println("SubscriptionName:\t" + subscriptionName);
                logger.debug("SubscriptionName:\t" + subscriptionName);

                String msgid = safeGetElementContent(notify, "MessageId");
                System.out.println("MessageId:\t" + msgid);
                logger.debug("MessageId:\t" + msgid);

                // if PublishMessage with base64 message
                String msg = safeGetElementContent(notify, "Message");
                System.out.println("Message:\t" + new String(Base64.decodeBase64(msg)));
                logger.debug("Message:\t" + new String(Base64.decodeBase64(msg)));

                //if PublishMessage with string message
                //String msg = safeGetElementContent(notify, "Message");
                //System.out.println("Message:\t" + msg);
                //logger.debug("Message:\t" + msg);

                String msgMD5 = safeGetElementContent(notify, "MessageMD5");
                System.out.println("MessageMD5:\t" + msgMD5);
                logger.debug("MessageMD5:\t" + msgMD5);

                String msgPublishTime = safeGetElementContent(notify, "PublishTime");
                Date d = new Date(Long.parseLong(msgPublishTime));
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String strdate = sdf.format(d);
                System.out.println("PublishTime:\t" + strdate);
                logger.debug("MessagePublishTime:\t" + strdate);

                String msgTag = safeGetElementContent(notify, "MessageTag");
                if (!msgTag.equals("")) {
                    System.out.println("MessageTag:\t" + msgTag);
                    logger.debug("MessageTag:\t" + msgTag);
                }

            } catch (Exception e) {
                System.out.println(e.getMessage());
                e.printStackTrace();
                logger.warn(e.getMessage());
            }

        }

        /**
         * process method for NSHandler
         *
         * @param request,  http request
         * @param response, http responst
         * @param context,  http context
         * @throws HttpException exception
         * @throws IOException   exception
         */
        @Override
        public void handle(
            final HttpRequest request,
            final HttpResponse response,
            final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);

            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }

            Header[] headers = request.getAllHeaders();
            Map<String, String> hm = new HashMap<String, String>();
            for (Header h : headers) {
                System.out.println(h.getName() + ":" + h.getValue());
                hm.put(h.getName(), h.getValue());
            }

            String target = request.getRequestLine().getUri();
            System.out.println(target);

            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();

                //parser xml content
                InputStream content = entity.getContent();
                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
                Element notify = null;
                try {
                    DocumentBuilder db = dbf.newDocumentBuilder();
                    Document document = db.parse(content);
                    NodeList nl = document.getElementsByTagName("Notification");
                    if (nl == null || nl.getLength() == 0) {
                        System.out.println("xml tag error");
                        logger.warn("xml tag error");
                        response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                        return;
                    }
                    notify = (Element) nl.item(0);
                } catch (ParserConfigurationException e) {
                    e.printStackTrace();
                    logger.warn("xml parser fail!  " + e.getMessage());
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                } catch (SAXException e) {
                    e.printStackTrace();
                    logger.warn("xml parser fail!  " + e.getMessage());
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                //verify request
                Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
                if (certHeader == null) {
                    System.out.println("SigningCerURL Header not found");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }

                String cert = certHeader.getValue();
                if (cert.isEmpty()) {
                    System.out.println("SigningCertURL empty");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                cert = new String(Base64.decodeBase64(cert));
                System.out.println("SigningCertURL:\t" + cert);
                logger.debug("SigningCertURL:\t" + cert);

                if (!authenticate(method, target, hm, cert)) {
                    System.out.println("authenticate fail");
                    logger.warn("authenticate fail");
                    response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
                    return;
                }
                parserContent(notify);

            }

            response.setStatusCode(HttpStatus.SC_NO_CONTENT);
        }

    }

    /**
     * http listen work thread
     */
    public class RequestListenerThread extends Thread {

        private final HttpConnectionFactory<DefaultBHttpServerConnection> connFactory;
        private final ServerSocket serversocket;
        private final HttpService httpService;

        public RequestListenerThread(
            final int port,
            final HttpService httpService,
            final SSLServerSocketFactory sf) throws IOException {
            this.connFactory = DefaultBHttpServerConnectionFactory.INSTANCE;
            this.serversocket = sf != null ?  sf.createServerSocket(port) : new ServerSocket(port);
            this.httpService = httpService;
        }

        @Override
        public void run() {
            System.out.println("Listening on port " + this.serversocket.getLocalPort());
            Thread t = null;
            while (!Thread.interrupted()) {
                try {
                    // Set up HTTP connection
                    Socket socket = this.serversocket.accept();
                    System.out.println("Incoming connection from " + socket.getInetAddress());
                    HttpServerConnection conn = this.connFactory.createConnection(socket);

                    // Start worker thread
                    t = new WorkerThread(this.httpService, conn);
                    t.setDaemon(true);
                    t.start();
                } catch (IOException e) {
                    System.err.println("Endpoint http server stop or IO error: "
                        + e.getMessage());
                    try {
                        if (t != null) {
                            t.join(5 * 1000);
                        }
                    } catch (InterruptedException e1) {
                        //e1.printStackTrace();
                    }
                    break;
                }
            }
        }

        @Override
        public void interrupt() {
            super.interrupt();
            try {
                this.serversocket.close();
            } catch (IOException e) {
                //e.printStackTrace();
            }
        }
    }

    /**
     * http work thread, it will dispatch /notifications to NSHandler
     */
    public class WorkerThread extends Thread {

        private final HttpService httpservice;
        private final HttpServerConnection conn;

        public WorkerThread(
            final HttpService httpservice,
            final HttpServerConnection conn) {
            super();
            this.httpservice = httpservice;
            this.conn = conn;
        }

        @Override
        public void run() {
            System.out.println("New connection thread");
            HttpContext context = new BasicHttpContext(null);
            try {
                while (!Thread.interrupted() && this.conn.isOpen()) {
                    this.httpservice.handleRequest(this.conn, context);
                }
            } catch (ConnectionClosedException ex) {
                System.err.println("Client closed connection");
            } catch (IOException ex) {
                System.err.println("I/O error: " + ex.getMessage());
            } catch (HttpException ex) {
                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
            } finally {
                try {
                    this.conn.shutdown();
                } catch (IOException ignore) {
                }
            }
        }

    }

    /**
     * Use the main function.
     *
     * @param args args
     */
    public static void main(String[] args) {
        int port = 8080;
        HttpEndpointSubscription httpEndpointSubscription = null;
        try {
            httpEndpointSubscription = new HttpEndpointSubscription(port);
            httpEndpointSubscription.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (httpEndpointSubscription != null) {
                httpEndpointSubscription.stop();
            }
        }
    }

}