This article describes an important snippet of sample code that is used to enable HTTP endpoints. For more information about sample code, see the simple_http_notify_endpoint.py file in the SDK for Python.
Note The sample code includes an authentication method. However, HTTP endpoints still have
the possibility to receive forged MNS requests. To defend against request forgery
attacks, you must use a high-security method to verify requests that are received
by HTTP endpoints.
class SimpleHttpNotifyEndpoint(BaseHTTPServer.BaseHTTPRequestHandler):
server_version = "SimpleHttpNotifyEndpoint/" + __version__
access_log_file = "access_log"
msg_type = "XML"
def do_POST(self):
content_length = int(self.headers.getheader('content-length', 0))
self.req_body = self.rfile.read(content_length)
self.msg = NotifyMessage()
logger.info("Headers:%s\nBody:%s" % (self.headers, self.req_body))
if not self.authenticate():
res_code = 403
res_content = "Access Forbidden"
logger.warning("Access Forbidden!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
elif not self.validateBody(self.req_body, self.msg, self.msg_type):
res_code = 400
res_content = "Invalid Notify Message"
logger.warning("Invalid Notify Message!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
else:
res_code = 201
res_content = ""
logger.info("Notify Message Succeed!\n%s" % self.msg)
self.access_log(res_code)
self.response(res_code, res_content)
def authenticate(self):
#get string to signature
service_str = "\n".join(sorted(["%s:%s" % (k,v) for k,v in self.headers.items() if k.startswith("x-mns-")]))
sign_header_list = []
for key in ["content-md5", "content-type", "date"]:
if key in self.headers.keys():
sign_header_list.append(self.headers.getheader(key))
else:
sign_header_list.append("")
str2sign = "%s\n%s\n%s\n%s" % (self.command, "\n".join(sign_header_list), service_str, self.path)
#verify
authorization = self.headers.getheader('Authorization')
signature = base64.b64decode(authorization)
cert_str = urllib2.urlopen(base64.b64decode(self.headers.getheader('x-mns-signing-cert-url'))).read()
pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
pubkey.reset_context(md='sha1')
pubkey.verify_init()
pubkey.verify_update(str2sign)
return pubkey.verify_final(signature)
def validateBody(self, data, msg, type):
if type == "XML":
return self.xml_decode(data, msg)
else:
msg.message = data
return True
def xml_decode(self, data, msg):
if data == "":
logger.error("Data is \"\".")
return False
try:
dom = xml.dom.minidom.parseString(data)
except Exception, e:
logger.error("Parse string fail, exception:%s" % e)
return False
node_list = dom.getElementsByTagName("Notification")
if not node_list:
logger.error("Get node of \"Notification\" fail:%s" % e)
return False
data_dic = {}
for node in node_list[0].childNodes:
if node.nodeName != "#text" and node.childNodes != []:
data_dic[node.nodeName] = str(node.childNodes[0].nodeValue.strip())
key_list = ["TopicOwner", "TopicName", "Subscriber", "SubscriptionName", "MessageId", "MessageMD5", "Message", "PublishTime"]
for key in key_list:
if key not in data_dic.keys():
logger.error("Check item fail. Need \"%s\"." % key)
return False
msg.topic_owner = data_dic["TopicOwner"]
msg.topic_name = data_dic["TopicName"]
msg.subscriber = data_dic["Subscriber"]
msg.subscription_name = data_dic["SubscriptionName"]
msg.message_id = data_dic["MessageId"]
msg.message_md5 = data_dic["MessageMD5"]
msg.message_tag = data_dic["MessageTag"] if data_dic.has_key("MessageTag") else ""
msg.message = data_dic["Message"]
msg.publish_time = data_dic["PublishTime"]
return True