本文仅展示HTTP Endpoint部分核心代码,完整的代码请参见Python SDK中simple_http_notify_endpoint.py

说明 示例中HTTP Endpoint的针对Authorization的校验不能完全规避掉恶意伪造的MNS请求,需要从业务层面上设计从发送端到HTTP Endpoint的安全校验机制。
class SimpleHttpNotifyEndpoint(BaseHTTPServer.BaseHTTPRequestHandler):
    server_version = "SimpleHttpNotifyEndpoint/" + __version__
    access_log_file = "access_log"
    msg_type = "XML"

    def do_POST(self):
        content_length = int(self.headers.getheader('content-length', 0))
        self.req_body = self.rfile.read(content_length)
        self.msg = NotifyMessage()
        logger.info("Headers:%s\nBody:%s" % (self.headers, self.req_body))
        if not self.authenticate():
            res_code = 403
            res_content = "Access Forbidden"
            logger.warning("Access Forbidden!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
        elif not self.validateBody(self.req_body, self.msg, self.msg_type):
            res_code = 400
            res_content = "Invalid Notify Message"
            logger.warning("Invalid Notify Message!\nHeaders:%s\nReqBody:%s\n" % (self.headers, self.req_body))
        else:
            res_code = 201
            res_content = ""
            logger.info("Notify Message Succeed!\n%s" % self.msg)
        self.access_log(res_code)
        self.response(res_code, res_content)

    def authenticate(self):
        #get string to signature
        service_str = "\n".join(sorted(["%s:%s" % (k,v) for k,v in self.headers.items() if k.startswith("x-mns-")]))
        sign_header_list = []
        for key in ["content-md5", "content-type", "date"]:
            if key in self.headers.keys():
                sign_header_list.append(self.headers.getheader(key))
            else:
                sign_header_list.append("")
        str2sign = "%s\n%s\n%s\n%s" % (self.command, "\n".join(sign_header_list), service_str, self.path)

        #verify
        authorization = self.headers.getheader('Authorization')
        signature = base64.b64decode(authorization)
        cert_str = urllib2.urlopen(base64.b64decode(self.headers.getheader('x-mns-signing-cert-url'))).read()
        pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
        pubkey.reset_context(md='sha1')
        pubkey.verify_init()
        pubkey.verify_update(str2sign)
        return pubkey.verify_final(signature)

    def validateBody(self, data, msg, type):
        if type == "XML":
            return self.xml_decode(data, msg)
        else:
            msg.message = data
            return True

    def xml_decode(self, data, msg):
        if data == "":
            logger.error("Data is \"\".")
            return False
        try:
            dom = xml.dom.minidom.parseString(data)
        except Exception, e:
            logger.error("Parse string fail, exception:%s" % e)
            return False

        node_list = dom.getElementsByTagName("Notification")
        if not node_list:
            logger.error("Get node of \"Notification\" fail:%s" % e)
            return False

        data_dic = {}
        for node in node_list[0].childNodes:
            if node.nodeName != "#text" and node.childNodes != []:
                data_dic[node.nodeName] = str(node.childNodes[0].nodeValue.strip())

        key_list = ["TopicOwner", "TopicName", "Subscriber", "SubscriptionName", "MessageId", "MessageMD5", "Message", "PublishTime"]
        for key in key_list:
            if key not in data_dic.keys():
                logger.error("Check item fail. Need \"%s\"." % key)
                return False

        msg.topic_owner = data_dic["TopicOwner"]
        msg.topic_name = data_dic["TopicName"]
        msg.subscriber = data_dic["Subscriber"]
        msg.subscription_name = data_dic["SubscriptionName"]
        msg.message_id = data_dic["MessageId"]
        msg.message_md5 = data_dic["MessageMD5"]
        msg.message_tag = data_dic["MessageTag"] if data_dic.has_key("MessageTag") else ""
        msg.message = data_dic["Message"]
        msg.publish_time = data_dic["PublishTime"]
        return True