Simple Message Queue (SMQ) can push messages of a specific topic to your HTTP API service. This topic describes how to create an HTTP subscription to receive and process messages of a specific topic in SMQ.
Prerequisites
The network between SMQ and the HTTP server is connected, so that HTTP requests can be pushed to the HTTP server.
Select an encoding method for the message body
SIMPLIFIED: If the message body does not contain special characters, we recommend that you do not use Base64 encoding.
To send a message to a topic, use the
RawTopicMessage
method to initialize the message object.To consume a message from a queue, use the
message.getMessageBodyAsRawString()
method to obtain the message body.
JSON or XML: If the strings are transmitted in a text format such as JSON or XML, we recommend that you use Base64 encoding.
To send a message to a topic, use the TopicMessage method to initialize the message object. In this case, the message body is Base64-encoded and stored in the Message field for transmission.
To consume a message from a queue, use the
message.getMessageBodyAsRawString();
method to obtain the value of the Message field, and then perform Base64 decoding.JSONObject object = new JSONObject(message.getMessageBodyAsRawString()); String jsonMessageData = String.valueOf(object.get("Message")); String messageBody = new String(Base64.decodeBase64(jsonMessageData));
Sample code
For more information about the complete sample code, see HttpEndpointSubscription.java.
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.net.ssl.SSLServerSocketFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpConnectionFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.impl.DefaultBHttpServerConnection;
import org.apache.http.impl.DefaultBHttpServerConnectionFactory;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.protocol.UriHttpRequestHandlerMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
/**
* The HTTP/1.1 file server that processes requests sent to the /notifications directory.
* The sample HTTP service, which is used to receive subscription messages.
*/
public class HttpEndpointSubscription {
public static Logger logger = LoggerFactory.getLogger(HttpEndpointSubscription.class);
public static Thread t;
private int port;
/**
* Use the IP address of your host to generate an endpoint by using the static method.
*
* @return http endpoint
*/
public static String genEndpointLocal() {
return HttpEndpointSubscription.genEndpointLocal(80);
}
/**
* Use the IP address of your host to generate an endpoint by using the static method.
*
* @param port, http server port
* @return http endpoint
*/
public static String genEndpointLocal(int port) {
try {
InetAddress addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress().toString();
return "http://" + ip + ":" + port;
} catch (UnknownHostException e) {
e.printStackTrace();
logger.warn("get local host fail," + e.getMessage());
return "http://127.0.0.1:" + port;
}
}
/**
* Specify the port to construct the HttpEndpoint object by using the constructor.
*
* @param port The port of the HTTP server.
*/
public HttpEndpointSubscription(int port) {
init(port);
}
/**
* Specify the port to construct the HttpEndpoint object by using the constructor. By default, port 80 is used.
*/
public HttpEndpointSubscription() {
init(80);
}
private void init(int port) {
this.port = port;
t = null;
}
/**
* start http server
*
* @throws Exception exception
*/
public void start() throws Exception {
//check port if used
try {
new Socket(InetAddress.getLocalHost(), this.port);
System.out.println("port is used!");
logger.error("port already in use, http server start failed");
throw new BindException("port already in use");
} catch (IOException e) {
//e.printStackTrace();
}
// Set up the HTTP protocol processor
HttpProcessor httpproc = HttpProcessorBuilder.create()
.add(new ResponseDate())
.add(new ResponseServer("MNS-Endpoint/1.1"))
.add(new ResponseContent())
.add(new ResponseConnControl()).build();
// Set up request handlers, listen /notifications request whit NSHandler class
UriHttpRequestHandlerMapper reqistry = new UriHttpRequestHandlerMapper();
reqistry.register("/notifications", new NSHandler());
reqistry.register("/simplified", new SimplifiedNSHandler());
// Set up the HTTP service
HttpService httpService = new HttpService(httpproc, reqistry);
//start thread for http server
t = new RequestListenerThread(port, httpService, null);
t.setDaemon(false);
t.start();
}
/**
* stop http endpoint
*/
public void stop() {
if (t != null) {
t.interrupt();
try {
t.join(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("endpoint stop");
}
/**
* check if this request comes from MNS Server
*
* @param method, http method
* @param uri, http uri
* @param headers, http headers
* @param cert, cert url
* @return true if verify pass
*/
private Boolean authenticate(String method, String uri, Map<String, String> headers, String cert) {
String str2sign = getSignStr(method, uri, headers);
//System.out.println(str2sign);
String signature = headers.get("Authorization");
byte[] decodedSign = Base64.decodeBase64(signature);
//get cert, and verify this request with this cert
try {
//String cert = "http://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem";
URL url = new URL(cert);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
DataInputStream in = new DataInputStream(conn.getInputStream());
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Certificate c = cf.generateCertificate(in);
PublicKey pk = c.getPublicKey();
java.security.Signature signetcheck = java.security.Signature.getInstance("SHA1withRSA");
signetcheck.initVerify(pk);
signetcheck.update(str2sign.getBytes());
Boolean res = signetcheck.verify(decodedSign);
return res;
} catch (Exception e) {
e.printStackTrace();
logger.warn("authenticate fail, " + e.getMessage());
return false;
}
}
/**
* build string for sign
*
* @param method, http method
* @param uri, http uri
* @param headers, http headers
* @return String fro sign
*/
private String getSignStr(String method, String uri, Map<String, String> headers) {
StringBuilder sb = new StringBuilder();
sb.append(method);
sb.append("\n");
sb.append(safeGetHeader(headers, "Content-md5"));
sb.append("\n");
sb.append(safeGetHeader(headers, "Content-Type"));
sb.append("\n");
sb.append(safeGetHeader(headers, "Date"));
sb.append("\n");
List<String> tmp = new ArrayList<String>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
if (entry.getKey().startsWith("x-mns-")) {
tmp.add(entry.getKey() + ":" + entry.getValue());
}
}
Collections.sort(tmp);
for (String kv : tmp) {
sb.append(kv);
sb.append("\n");
}
sb.append(uri);
return sb.toString();
}
private String safeGetHeader(Map<String, String> headers, String name) {
if (headers.containsKey(name)) {
return headers.get(name);
} else {
return "";
}
}
public class SimplifiedNSHandler implements HttpRequestHandler {
/**
* process method for NSHandler
*
* @param request, http request
* @param response, http responst
* @param context, http context
* @throws HttpException exception
* @throws IOException exception
*/
@Override
public void handle(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}
Header[] headers = request.getAllHeaders();
Map<String, String> hm = new HashMap<String, String>();
for (Header h : headers) {
System.out.println(h.getName() + ":" + h.getValue());
hm.put(h.getName(), h.getValue());
}
String target = request.getRequestLine().getUri();
System.out.println(target);
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
//verify request
Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
if (certHeader == null) {
System.out.println("SigningCerURL Header not found");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
String cert = certHeader.getValue();
if (cert.isEmpty()) {
System.out.println("SigningCertURL empty");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
cert = new String(Base64.decodeBase64(cert));
System.out.println("SigningCertURL:\t" + cert);
logger.debug("SigningCertURL:\t" + cert);
if (!authenticate(method, target, hm, cert)) {
System.out.println("authenticate fail");
logger.warn("authenticate fail");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
//parser content of simplified notification
InputStream is = entity.getContent();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
StringBuffer buffer = new StringBuffer();
String line = "";
while ((line = reader.readLine()) != null) {
buffer.append(line);
}
String content = buffer.toString();
System.out.println("Simplified Notification: \n" + content);
}
response.setStatusCode(HttpStatus.SC_NO_CONTENT);
}
}
/**
* core class for processing /notifications request
*/
public class NSHandler implements HttpRequestHandler {
public Logger logger = LoggerFactory.getLogger(HttpRequestHandler.class);
public NSHandler() {
super();
}
private String safeGetElementContent(Element element, String tag) {
NodeList nl = element.getElementsByTagName(tag);
if (nl != null && nl.getLength() > 0) {
return nl.item(0).getTextContent();
} else {
logger.warn("get " + tag + " from xml fail");
return "";
}
}
/**
* parser /notifications message content
*
* @param notify, xml element
*/
private void parserContent(Element notify) {
try {
String topicOwner = safeGetElementContent(notify, "TopicOwner");
System.out.println("TopicOwner:\t" + topicOwner);
logger.debug("TopicOwner:\t" + topicOwner);
String topicName = safeGetElementContent(notify, "TopicName");
System.out.println("TopicName:\t" + topicName);
logger.debug("TopicName:\t" + topicName);
String subscriber = safeGetElementContent(notify, "Subscriber");
System.out.println("Subscriber:\t" + subscriber);
logger.debug("Subscriber:\t" + subscriber);
String subscriptionName = safeGetElementContent(notify, "SubscriptionName");
System.out.println("SubscriptionName:\t" + subscriptionName);
logger.debug("SubscriptionName:\t" + subscriptionName);
String msgid = safeGetElementContent(notify, "MessageId");
System.out.println("MessageId:\t" + msgid);
logger.debug("MessageId:\t" + msgid);
// if PublishMessage with base64 message
String msg = safeGetElementContent(notify, "Message");
System.out.println("Message:\t" + new String(Base64.decodeBase64(msg)));
logger.debug("Message:\t" + new String(Base64.decodeBase64(msg)));
//if PublishMessage with string message
//String msg = safeGetElementContent(notify, "Message");
//System.out.println("Message:\t" + msg);
//logger.debug("Message:\t" + msg);
String msgMD5 = safeGetElementContent(notify, "MessageMD5");
System.out.println("MessageMD5:\t" + msgMD5);
logger.debug("MessageMD5:\t" + msgMD5);
String msgPublishTime = safeGetElementContent(notify, "PublishTime");
Date d = new Date(Long.parseLong(msgPublishTime));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String strdate = sdf.format(d);
System.out.println("PublishTime:\t" + strdate);
logger.debug("MessagePublishTime:\t" + strdate);
String msgTag = safeGetElementContent(notify, "MessageTag");
if (!msgTag.equals("")) {
System.out.println("MessageTag:\t" + msgTag);
logger.debug("MessageTag:\t" + msgTag);
}
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
logger.warn(e.getMessage());
}
}
/**
* process method for NSHandler
*
* @param request, http request
* @param response, http responst
* @param context, http context
* @throws HttpException exception
* @throws IOException exception
*/
@Override
public void handle(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}
Header[] headers = request.getAllHeaders();
Map<String, String> hm = new HashMap<String, String>();
for (Header h : headers) {
System.out.println(h.getName() + ":" + h.getValue());
hm.put(h.getName(), h.getValue());
}
String target = request.getRequestLine().getUri();
System.out.println(target);
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
//parser xml content
InputStream content = entity.getContent();
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
Element notify = null;
try {
DocumentBuilder db = dbf.newDocumentBuilder();
Document document = db.parse(content);
NodeList nl = document.getElementsByTagName("Notification");
if (nl == null || nl.getLength() == 0) {
System.out.println("xml tag error");
logger.warn("xml tag error");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
notify = (Element) nl.item(0);
} catch (ParserConfigurationException e) {
e.printStackTrace();
logger.warn("xml parser fail! " + e.getMessage());
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
} catch (SAXException e) {
e.printStackTrace();
logger.warn("xml parser fail! " + e.getMessage());
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
//verify request
Header certHeader = request.getFirstHeader("x-mns-signing-cert-url");
if (certHeader == null) {
System.out.println("SigningCerURL Header not found");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
String cert = certHeader.getValue();
if (cert.isEmpty()) {
System.out.println("SigningCertURL empty");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
cert = new String(Base64.decodeBase64(cert));
System.out.println("SigningCertURL:\t" + cert);
logger.debug("SigningCertURL:\t" + cert);
if (!authenticate(method, target, hm, cert)) {
System.out.println("authenticate fail");
logger.warn("authenticate fail");
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
return;
}
parserContent(notify);
}
response.setStatusCode(HttpStatus.SC_NO_CONTENT);
}
}
/**
* http listen work thread
*/
public class RequestListenerThread extends Thread {
private final HttpConnectionFactory<DefaultBHttpServerConnection> connFactory;
private final ServerSocket serversocket;
private final HttpService httpService;
public RequestListenerThread(
final int port,
final HttpService httpService,
final SSLServerSocketFactory sf) throws IOException {
this.connFactory = DefaultBHttpServerConnectionFactory.INSTANCE;
this.serversocket = sf != null ? sf.createServerSocket(port) : new ServerSocket(port);
this.httpService = httpService;
}
@Override
public void run() {
System.out.println("Listening on port " + this.serversocket.getLocalPort());
Thread t = null;
while (!Thread.interrupted()) {
try {
// Set up HTTP connection
Socket socket = this.serversocket.accept();
System.out.println("Incoming connection from " + socket.getInetAddress());
HttpServerConnection conn = this.connFactory.createConnection(socket);
// Start worker thread
t = new WorkerThread(this.httpService, conn);
t.setDaemon(true);
t.start();
} catch (IOException e) {
System.err.println("Endpoint http server stop or IO error: "
+ e.getMessage());
try {
if (t != null) {
t.join(5 * 1000);
}
} catch (InterruptedException e1) {
//e1.printStackTrace();
}
break;
}
}
}
@Override
public void interrupt() {
super.interrupt();
try {
this.serversocket.close();
} catch (IOException e) {
//e.printStackTrace();
}
}
}
/**
* http work thread, it will dispatch /notifications to NSHandler
*/
public class WorkerThread extends Thread {
private final HttpService httpservice;
private final HttpServerConnection conn;
public WorkerThread(
final HttpService httpservice,
final HttpServerConnection conn) {
super();
this.httpservice = httpservice;
this.conn = conn;
}
@Override
public void run() {
System.out.println("New connection thread");
HttpContext context = new BasicHttpContext(null);
try {
while (!Thread.interrupted() && this.conn.isOpen()) {
this.httpservice.handleRequest(this.conn, context);
}
} catch (ConnectionClosedException ex) {
System.err.println("Client closed connection");
} catch (IOException ex) {
System.err.println("I/O error: " + ex.getMessage());
} catch (HttpException ex) {
System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
} finally {
try {
this.conn.shutdown();
} catch (IOException ignore) {
}
}
}
}
/**
* Use the main function.
*
* @param args args
*/
public static void main(String[] args) {
int port = 8080;
HttpEndpointSubscription httpEndpointSubscription = null;
try {
httpEndpointSubscription = new HttpEndpointSubscription(port);
httpEndpointSubscription.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (httpEndpointSubscription != null) {
httpEndpointSubscription.stop();
}
}
}
}