阿里雲為您提供邊緣指令碼CLI工具,可以將您本地編寫的邊緣指令碼規則發布到類比環境進行測試,並發布至生產環境,也可以對類比環境和生產環境的邊緣指令碼規則進行查詢、修改和刪除。本文為您介紹邊緣指令碼CLI工具的使用方法。
在本地建立CLI工具指令檔
在本地建立一個檔案夾,例如命名為
cdn-api。在檔案夾內建立一個命名為
aliyun.ini的文字檔,用於記錄阿里雲RAM使用者的鑒權資訊,檔案內容如下:[Credentials] accesskeyid = $accesskeyid accesskeysecret = $accesskeysecret在檔案夾內建立一個命名為
es.py的文字檔,用於記錄CLI工具的python指令碼代碼,檔案內容如下:#!/usr/bin/python # -*- coding:utf-8 -*- import sys, os import urllib, urllib2 import base64 import hmac import hashlib from hashlib import sha1 import time import uuid import json from optparse import OptionParser import ConfigParser import traceback import urllib2 access_key_id = ''; access_key_secret = ''; cdn_server_address = 'https://cdn.aliyuncs.com' CONFIGFILE = os.getcwd() + '/aliyun.ini' CONFIGSECTION = 'Credentials' cmdlist = ''' 1. Publish the ES rule to the simulated environment or production environment ./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>"}' ./es.py action=push_product_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>"}' 2. Query the ES rule in the simulated environment or production environment ./es.py action=query_test_env domain=<domain> ./es.py action=query_product_env domain=<domain> 3. Delete the ES rule in the simulated environment or production environment ./es.py action=del_test_env domain=<domain> configid=<configid> ./es.py action=del_product_env domain=<domain> configid=<configid> 4. Publish the ES rule from the simulated to production environment, or Rollback the ES rule in the simulated environment ./es.py action=publish_test_env domain=<domain> ./es.py action=rollback_test_env domain=<domain> ''' def percent_encode(str): res = urllib.quote(str.decode(sys.stdin.encoding).encode('utf8'), '') res = res.replace('+', '%20') res = res.replace('*', '%2A') res = res.replace('%7E', '~') return res def compute_signature(parameters, access_key_secret): sortedParameters = sorted(parameters.items(), key=lambda parameters: parameters[0]) canonicalizedQueryString = '' for (k,v) in sortedParameters: canonicalizedQueryString += '&' + percent_encode(k) + '=' + percent_encode(v) stringToSign = 'GET&%2F&' + percent_encode(canonicalizedQueryString[1:]) h = hmac.new(access_key_secret + "&", stringToSign, sha1) signature = base64.encodestring(h.digest()).strip() return signature def compose_url(user_params): timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) parameters = { \ 'Format' : 'JSON', \ 'Version' : '2018-05-10', \ 'AccessKeyId' : access_key_id, \ 'SignatureVersion' : '1.0', \ 'SignatureMethod' : 'HMAC-SHA1', \ 'SignatureNonce' : str(uuid.uuid1()), \ 'Timestamp' : timestamp, \ } for key in user_params.keys(): parameters[key] = user_params[key] signature = compute_signature(parameters, access_key_secret) parameters['Signature'] = signature url = cdn_server_address + "/?" + urllib.urlencode(parameters) return url def make_request(user_params, quiet=False): url = compose_url(user_params) try: req = urllib2.Request(url) r = urllib2.urlopen(req) except urllib2.HTTPError, err: print "Response Code:\n=============" print err body=err.read() body_json = json.loads(body) body_str =json.dumps(body_json,indent=4) print "\nResponse Info:\n==============" print body_str return if r.getcode() == 200: print "Response Code:\n=============\n200 OK" print "\nResponse Info:\n==============" body=r.read() body_json = json.loads(body) body_str =json.dumps(body_json,indent=4) print body_str def configure_accesskeypair(args, options): if options.accesskeyid is None or options.accesskeysecret is None: print("config miss parameters, use --id=[accesskeyid] --secret=[accesskeysecret]") sys.exit(1) config = ConfigParser.RawConfigParser() config.add_section(CONFIGSECTION) config.set(CONFIGSECTION, 'accesskeyid', options.accesskeyid) config.set(CONFIGSECTION, 'accesskeysecret', options.accesskeysecret) cfgfile = open(CONFIGFILE, 'w+') config.write(cfgfile) cfgfile.close() def setup_credentials(): config = ConfigParser.ConfigParser() try: config.read(CONFIGFILE) global access_key_id global access_key_secret access_key_id = config.get(CONFIGSECTION, 'accesskeyid') access_key_secret = config.get(CONFIGSECTION, 'accesskeysecret') except Exception, e: print traceback.format_exc() print("can't get access key pair, use config --id=[accesskeyid] --secret=[accesskeysecret] to setup") sys.exit(1) def parse_args(user_params): req_args = {} if user_params['action'] == 'push_test_env' or user_params['action'] == 'push_product_env': if not user_params.has_key('domain') or not user_params.has_key('rule'): parser.print_help() sys.exit(0) data = [] for rule in user_params['rule']: rule_cfg = { 'functionId' : 180, 'functionName' : 'edge_function', 'functionArgs' : [] } for k in rule: arg_cfg = {} if k == 'configid': rule_cfg['configId'] = int(rule[k]) elif k == 'rule_path': try: f = open(rule[k], "r") code = f.read() except IOError: print "io error" sys.exit(0) else: f.close() arg_cfg['argName'] = 'rule' arg_cfg['argValue'] = code rule_cfg['functionArgs'].append(arg_cfg) else: arg_cfg['argName'] = k arg_cfg['argValue'] = rule[k] rule_cfg['functionArgs'].append(arg_cfg) data.append(rule_cfg) rule_str = json.dumps(data) if user_params['action'] == 'push_test_env': req_args = {'Action':'SetCdnDomainStagingConfig', 'DomainName':user_params['domain'], 'Functions':rule_str} else: req_args = {'Action':'BatchSetCdnDomainConfig', 'DomainNames':user_params['domain'], 'Functions':rule_str} elif user_params['action'] == 'query_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'DescribeCdnDomainStagingConfig', 'DomainName':user_params['domain'], 'FunctionNames':'edge_function'} elif user_params['action'] == 'query_product_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'DescribeCdnDomainConfigs', 'DomainName':user_params['domain'], 'FunctionNames':'edge_function'} elif user_params['action'] == 'del_test_env': if not user_params.has_key('domain') or not user_params.has_key('configid'): parser.print_help() sys.exit(0) req_args = {'Action':'DeleteSpecificStagingConfig', 'DomainName':user_params['domain'], 'ConfigId':user_params['configid']} elif user_params['action'] == 'del_product_env': if not user_params.has_key('domain') or not user_params.has_key('configid'): parser.print_help() sys.exit(0) req_args = {'Action':'DeleteSpecificConfig', 'DomainName':user_params['domain'], 'ConfigId':user_params['configid']} elif user_params['action'] == 'publish_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'PublishStagingConfigToProduction', 'DomainName':user_params['domain'], 'FunctionName':'edge_function'} elif user_params['action'] == 'rollback_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'RollbackStagingConfig', 'DomainName':user_params['domain'], 'FunctionName':'edge_function'} else: parser.print_help() sys.exit(0) return req_args if __name__ == '__main__': parser = OptionParser("%s Action=action Param1=Value1 Param2=Value2 %s\n" % (sys.argv[0], cmdlist)) parser.add_option("-i", "--id", dest="accesskeyid", help="specify access key id") parser.add_option("-s", "--secret", dest="accesskeysecret", help="specify access key secret") (options, args) = parser.parse_args() if len(args) < 1: parser.print_help() sys.exit(0) if args[0] == 'help': parser.print_help() sys.exit(0) if args[0] != 'config': setup_credentials() else: #it's a configure id/secret command configure_accesskeypair(args, options) sys.exit(0) user_params = {} idx = 1 if sys.argv[1].lower().startswith('action='): _, value = sys.argv[1].split('=') user_params['action'] = value idx = 2 else: parser.print_help() sys.exit(0) for arg in sys.argv[idx:]: try: key, value = arg.split('=', 1) if key == 'rule': # push_test_env / push_product_env if not user_params.has_key('rule'): user_params['rule'] = [] user_params['rule'].append(json.loads(value)) else: user_params[key.strip()] = value except ValueError, e: print(e.read().strip()) raise SystemExit(e) req_args = parse_args(user_params) make_request(req_args)
CLI工具的使用說明
配置AccessKey ID和AccessKey Secret。
$python ./es.py config --id=AK_ID --secret=AK_SECRET $cat aliyun.ini [Credentials] accesskeyid = 更新後AK accesskeysecret = 更新後AK Secret發布邊緣指令碼規則至類比環境或生產環境。
./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>"}' ./es.py action=push_product_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>"}'規則欄位說明,請參見下表:
欄位名稱
是否必選
說明
enable
是
是否生效。取值:
on
off
pos
是
執行位置。取值:
head
foot
pri
是
執行優先順序。不同執行位置的優先順序各自獨立。取值範圍:0~999。
0表示優先順序最高
999表示優先順序最低。
rule
是
規則內容。
brk
否
本規則命中情況下,是否終止本階段剩餘規則的執行。取值:
on
off
testip
否
測試客戶IP。預設為空白。如果配置了該參數,則只有用戶端IP地址可觸發本條規則執行。
option
否
擴充項。當前支援擴充。
_es_dbg=signature用於增加調試回應標頭。說明為網域名稱添加功能配置時會產生ConfigId(功能配置ID,唯一性),通過指定ConfigId可更新或刪除網域名稱配置,通過ConfigId使用說明可以瞭解到ConfigId的產生、查詢和使用方法。
新增規則不需要指定configid。
修改規則需要指定configid,使用查詢介面可擷取到對應規則的configid。
您可以指定多條rule。
查詢類比環境或生產環境下的邊緣指令碼規則。
./es.py action=query_test_env domain=<domain> ./es.py action=query_product_env domain=<domain>刪除類比環境或生產環境下的邊緣指令碼規則。
./es.py action=del_test_env domain=<domain> configid=<configid> ./es.py action=del_product_env domain=<domain> configid=<configid>類比環境的邊緣指令碼規則執行正式發布或復原。
./es.py action=publish_test_env domain=<domain> ./es.py action=rollback_test_env domain=<domain>
基於m3u8.es規則攔截所有.m3u8請求做樣本,為您詳細介紹邊緣規則的編寫、儲存、測試和發布的操作方法。具體操作,請參見樣本:通過CLI工具使用邊緣指令碼。
Just-in-Time 偵錯方式
配置Just-in-Time 偵錯。
您可以通過控制台的WebIDE圖形化操作頁面進行
_es_dbg配置,也可以使用如下命令進行配置。./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>", "option":"_es_dbg=123"}'測試請求。
測試時請求參數中攜帶
_es_dbg參數,參數值為配置的option中_es_dbg的值,關注如下回應標頭資訊。Trace資訊:
X-DEBUG-ES-TRACE-RULE-{規則ID},查看對應規則的執行流,格式為_行號_函數名(入參):傳回值{_執行耗時}。