全部产品
Search
文档中心

CDN:使用自动化脚本刷新和预热

更新时间:Jun 04, 2024

阿里云CDN为您提供刷新预热自动化脚本,可以帮助您分批进行刷新或预热任务,对文件或目录快速进行刷新和预热,替代手动分批提交的繁琐操作。本文介绍Python自动化脚本的使用说明,并以Windows系统示例为您说明。

功能简介

当您指定刷新或预热URL列表文件后,脚本按照指定的并发刷新或预热数量对URL列表文件进行切割,分批进行刷新或预热。任务运行后会自动进行判断,等待上一个任务完成,脚本自动进行下一个刷新或预热任务的操作。具体的操作逻辑如下:

  1. 分批处理:假设你的URL列表中有100个URL,同时你设定了每批次最多处理10个URL,那么脚本会将URL列表切割成10个小批次,每个批次包含10个URL。而如果设定的并发数量更大或更小,批次的大小会相应调整。例如设定的并发数量是20,那么脚本会将100个URL分成5个批次,每个批次包含20个URL。

  2. 按批次运行任务:脚本在启动时会按照批次依次提交刷新或预热请求。每个批次的任务是并发执行的。

  3. 等待完成后进行下一批任务:当一个批次的刷新或预热任务完成后,脚本会继续执行下一个批次的任务。这个判断和操作是自动进行的,不需要人工干预。

适用场景

如果您有以下情况,建议您使用刷新预热自动化脚本:

  • 无开发人员,需手动提交刷新预热任务,运维成本高。

  • 刷新或预热URL过多,分批提交导致刷新或预热效率低。

  • 需要人工或程序判断刷新预热任务是否正常进行,费时费力。

使用限制

请确保操作系统的Python版本为3.x版本。您可以通过在命令行输入python --versionpython3 --version来检查Python版本是否符合要求。

步骤一:安装依赖

  1. 执行以下命令安装Python CDN SDK模块包,目前使用版本为v20180510。

    pip install aliyun-python-sdk-cdn
  2. 执行以下命令安装Python阿里云核心包,目前使用版本为2.6.0。

    pip install aliyun-python-sdk-core

步骤二:准备URL文件

创建一个包含需要刷新或预热的URL列表的文件。例如:urllist.txt,每行一个URL。请确保每个URL以http://https://开头,并且是合法的URL格式。内容示例如下:

http://example.com/file1.jpg
http://example.com/file2.jpg
http://example.com/file3.jpg
...
http://example.com/fileN.jpg

步骤三:创建脚本

将如下代码保存为自动化脚本,并命名为Refresh.py。您可以自定义脚本名称,此处为举例说明。

脚本示例代码

#!/usr/bin/env python3
# coding=utf-8
# __author__ = 'aliyun.cdn'
# __date__ = '2021-04-23'

'''Check Package'''
try:
    # 导入所需库
    import os, re, sys, getopt, time, json, logging
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
    from aliyunsdkcdn.request.v20180510.RefreshObjectCachesRequest import RefreshObjectCachesRequest
    from aliyunsdkcdn.request.v20180510.PushObjectCacheRequest import PushObjectCacheRequest
    from aliyunsdkcdn.request.v20180510.DescribeRefreshTasksRequest import DescribeRefreshTasksRequest
    from aliyunsdkcdn.request.v20180510.DescribeRefreshQuotaRequest import DescribeRefreshQuotaRequest

# 捕获导入异常
except ImportError as e:
    sys.exit("[error] Please pip install aliyun-python-sdk-cdn and aliyun-python-sdk-core. Details: {e}")

# 初始化日志记录
logging.basicConfig(level=logging.DEBUG, filename='./RefreshAndPredload.log')

# 定义全局变量类,存储AK、SK、FD等信息
class Envariable(object):
    LISTS = []
    REGION = 'cn-zhangzhou'
    AK = None
    SK = None
    FD = None
    CLI = None
    TASK_TYPE = None
    TASK_AREA = None
    TASK_OTYPE = None

    # 设置AK
    @staticmethod
    def set_ak(ak):
        Envariable.AK = ak

    # 获取AK
    @staticmethod
    def get_ak():
        return Envariable.AK

    # 设置SK
    @staticmethod
    def set_sk(sk):
        Envariable.SK = sk

    # 获取SK
    @staticmethod
    def get_sk():
        return Envariable.SK

    # 设置FD
    @staticmethod
    def set_fd(fd):
        Envariable.FD = fd

    # 获取FD
    @staticmethod
    def get_fd():
        return Envariable.FD

    # 设置任务类型
    @staticmethod
    def set_task_type(task_type):
        Envariable.TASK_TYPE = task_type

    # 获取任务类型
    @staticmethod
    def get_task_type():
        return Envariable.TASK_TYPE

    # 设置任务区域
    @staticmethod
    def set_task_area(task_area):
        Envariable.TASK_AREA = task_area

    # 获取任务区域
    @staticmethod
    def get_task_area():
        return Envariable.TASK_AREA

    # 设置任务对象类型
    @staticmethod
    def set_task_otype(task_otype):
        Envariable.TASK_OTYPE = task_otype

    # 获取任务对象类型
    @staticmethod
    def get_task_otype():
        return Envariable.TASK_OTYPE

    # 创建AcsClient
    @staticmethod
    def set_acs_client():
        Envariable.CLI = AcsClient(Envariable.get_ak(), Envariable.get_sk(), Envariable.REGION)

    # 获取AcsClient
    @staticmethod
    def get_acs_client():
        return Envariable.CLI


class InitHandler(object):
    def __init__(self, ak, sk, region):
        try:
            self.client = AcsClient(ak, sk, region)
        except Exception:
            logging.info("[error]: initial AcsClient failed")
            exit(1)


class BaseCheck(object):
    def __init__(self):
        self.invalidurl = ''
        self.lines = 0
        self.urllist = Envariable.get_fd()

    # 检查配额
    def printQuota(self):
        try:
            if Envariable.get_acs_client():
                client = Envariable.get_acs_client()
            else:
                Envariable.set_acs_client()
                client = Envariable.get_acs_client()
            quotas = DescribeRefreshQuotaRequest()
            quotaResp = json.loads(Envariable.get_acs_client().do_action_with_exception(quotas))
        except Exception as e:
            logging.info("\n[error]: initial AcsClient failed\n")
            sys.exit(1)

        if Envariable.TASK_TYPE:
            if Envariable.TASK_TYPE == 'push':
                if self.lines > int(quotaResp['PreloadRemain']):
                    sys.exit("\n[error]:PreloadRemain is not enough {0}".format(quotaResp['PreloadRemain']))
                return True
            if Envariable.TASK_TYPE == 'clear':
                if Envariable.get_task_otype() == 'File' and self.lines > int(quotaResp['UrlRemain']):
                    sys.exit("\n[error]:UrlRemain is not enough {0}".format(quotaResp['UrlRemain']))
                elif Envariable.get_task_otype() == 'Directory' and self.lines > int(quotaResp['DirRemain']):
                    sys.exit("\n[error]:DirRemain is not enough {0}".format(quotaResp['DirRemain']))
                else:
                    return True

    # 验证URL格式
    def urlFormat(self):
        with open(self.urllist, "r") as f:
            for line in f.readlines():
                self.lines += 1
                if not re.match(r'^((https)|(http))', line):
                    self.invalidurl = line + '\n' + self.invalidurl
            if self.invalidurl != '':
                sys.exit("\n[error]: URL format is illegal \n{0}".format(self.invalidurl))
            return True

# 批量处理类,将URL列表按指定数量分成多个批次
class doTask(object):
    @staticmethod
    def urlencode_pl(inputs_str):
        len_str = len(inputs_str)
        if inputs_str == "" or len_str <= 0:
            return ""
        result_end = ""
        for chs in inputs_str:
            if chs.isalnum() or chs in {":", "/", ".", "-", "_", "*"}:
                result_end += chs
            elif chs == ' ':
                result_end += '+'
            else:
                result_end += f'%{ord(chs):02X}'
        return result_end

    # 分批处理URL
    @staticmethod
    def doProd():
        gop = 20  # 这里定义了每个批次的最大URL数量
        mins = 1
        maxs = gop
        with open(Envariable.get_fd(), "r") as f:
            for line in f.readlines():
                line = doTask.urlencode_pl(line.strip()) + "\n"
                Envariable.LISTS.append(line)
                if mins >= maxs:
                    yield Envariable.LISTS
                    Envariable.LISTS = []
                    mins = 1
                else:
                    mins += 1
        if Envariable.LISTS:
            yield Envariable.LISTS

    # 执行刷新或预热任务
    @staticmethod
    def doRefresh(lists):
        try:
            if Envariable.get_acs_client():
                client = Envariable.get_acs_client()
            else:
                Envariable.set_acs_client()
                client = Envariable.get_acs_client()

            if Envariable.get_task_type() == 'clear':
                taskID = 'RefreshTaskId'
                request = RefreshObjectCachesRequest()
                if Envariable.get_task_otype():
                    request.set_ObjectType(Envariable.get_task_otype())
            elif Envariable.get_task_type() == 'push':
                taskID = 'PushTaskId'
                request = PushObjectCacheRequest()
                if Envariable.get_task_area():
                    request.set_Area(Envariable.get_task_area())

            taskreq = DescribeRefreshTasksRequest()
            request.set_accept_format('json')
            request.set_ObjectPath(lists)
            response = json.loads(client.do_action_with_exception(request))
            print(response)

            timeout = 0
            while True:
                count = 0
                taskreq.set_accept_format('json')
                taskreq.set_TaskId(response[taskID])
                taskresp = json.loads(client.do_action_with_exception(taskreq))
                print(f"[{response[taskID]}] is doing... ...")
                for t in taskresp['Tasks']['CDNTask']:
                    if t['Status'] != 'Complete':
                        count += 1
                if count == 0:
                    logging.info(f"[{response[taskID]}] is finish")
                    break
                elif timeout > 5:
                    logging.info(f"[{response[taskID]}] timeout")
                    break
                else:
                    timeout += 1
                    time.sleep(5)
                    continue
        except Exception as e:
            logging.info(f"\n[error]:{e}")
            sys.exit(1)


class Refresh(object):
    def main(self, argv):
        if len(argv) < 1:
            sys.exit(f"\n[usage]: {sys.argv[0]} -h ")
        try:
            opts, args = getopt.getopt(argv, "hi:k:n:r:t:a:o:")
        except getopt.GetoptError as e:
            sys.exit(f"\n[usage]: {sys.argv[0]} -h ")

        for opt, arg in opts:
            if opt == '-h':
                self.help()
                sys.exit()
            elif opt == '-i':
                Envariable.set_ak(arg)
            elif opt == '-k':
                Envariable.set_sk(arg)
            elif opt == '-r':
                Envariable.set_fd(arg)
            elif opt == '-t':
                Envariable.set_task_type(arg)
            elif opt == '-a':
                Envariable.set_task_area(arg)
            elif opt == '-o':
                Envariable.set_task_otype(arg)
            else:
                sys.exit(f"\n[usage]: {sys.argv[0]} -h ")

        try:
            if not (Envariable.get_ak() and Envariable.get_sk() and Envariable.get_fd() and Envariable.get_task_type()):
                sys.exit("\n[error]: Must be by parameter '-i', '-k', '-r', '-t'\n")
            if Envariable.get_task_type() not in {"push", "clear"}:
                sys.exit("\n[error]: taskType Error, '-t' option in 'push' or 'clear'\n")
            if Envariable.get_task_area() and Envariable.get_task_otype():
                sys.exit("\n[error]: -a and -o cannot exist at same time\n")
            if Envariable.get_task_area():
                if Envariable.get_task_area() not in {"domestic", "overseas"}:
                    sys.exit("\n[error]: Area value Error, '-a' option in 'domestic' or 'overseas'\n")
            if Envariable.get_task_otype():
                if Envariable.get_task_otype() not in {"File", "Directory"}:
                    sys.exit("\n[error]: ObjectType value Error, '-a' options in 'File' or 'Directory'\n")
                if Envariable.get_task_type() == 'push':
                    sys.exit("\n[error]: -t must be clear and 'push' -a use together\n")
        except Exception as e:
            logging.info(f"\n[error]: Parameter {e} error\n")
            sys.exit(1)

        handler = BaseCheck()
        if handler.urlFormat() and handler.printQuota():
            for g in doTask.doProd():
                doTask.doRefresh(''.join(g))
                time.sleep(1)

    def help(self):
        print("\nscript options explain: \
                    \n\t -i <AccessKey>                  访问阿里云凭证,访问控制台上可以获得; \
                    \n\t -k <AccessKeySecret>            访问阿里云密钥,访问控制台上可以获得; \
                    \n\t -r <filename>                   filename指“文件所在的路径+文件名称”,自动化脚本运行后将会读取文件内记录的URL;文件内的URL记录方式为每行一条URL,有特殊字符先做URLencode,以http或https开头; \
                    \n\t -t <taskType>                   任务类型,clear:刷新,push:预热; \
                    \n\t -a [String,<domestic|overseas>] 可选项,预热范围,不传默认是全球;\
                    \n\t    domestic                     仅中国内地; \
                    \n\t    overseas                     全球(不包含中国内地); \
                    \n\t -o [String,<File|Directory>]    可选项,刷新的类型; \
                    \n\t    File                         文件刷新(默认值); \
                    \n\t    Directory                    目录刷新")


if __name__ == '__main__':
    fun = Refresh()
    fun.main(sys.argv[1:])

代码执行流程

  1. gop指定的数量(100个)将文件拆分成多个批次。

  2. 顺序处理每个批次的URL。

  3. 等待当前批次任务完成后,再执行下一个批次。

说明

您可以通过调整gop变量调整每个批次的大小。

查看帮助信息

脚本创建完成后,您可以在命令行(CMD,PowerShell或终端)中运行python $script -h,用于请求并显示Python脚本的命令行帮助信息。

说明

$script通常是指一个变量,这个变量是Python脚本的文件名。例如,如果您的脚本文件名是Refresh.py,您可以运行python Refresh.py -h

在命令行(CMD,PowerShell或终端)运行以下命令,脚本会显示帮助信息,告诉您如何正确使用该脚本及其所有参数。

python Refresh.py -h

运行命令后可能会输出以下内容:

script options explain:
              -i <AccessKey>                   //访问阿里云凭证,访问控制台获得;
              -k <AccessKeySecret>             //访问阿里云密匙,访问控制台获得;
              -r <filename>                    //filename指“文件所在的路径+文件名称”,自动化脚本运行后将会读取文件内记录的URL;文件内的URL记录方式为每行一条URL,有特殊字符先做URLencode,以http或https开头;
              -t <taskType>                    //任务类型,clear:刷新,push:预热;
              -a [String,<domestic|overseas>   //可选项,预热范围,不传默认是全球;            
                   domestic                    //仅中国内地;             
                   overseas                    //全球(不包含中国内地);             
              -o [String,<File|Directory>]     //可选项,刷新的类型;             
                   File                        //文件刷新(默认值);             
                   Directory                   //目录刷新;

步骤四:运行脚本

在命令行(CMD,PowerShell或终端)使用以下命令行运行脚本:

python Refresh.py -i <YourAccessKey> -k <YourAccessKeySecret> -r <PathToUrlFile> -t <TaskType>
    说明

    <YourAccessKey>:您的阿里云AccessKey ID。

    <YourAccessKeySecret>:您的阿里云AccessKey Secret。

    <PathToUrlFile>:包含URL列表的文件路径,如urllist.txt

    <TaskType>:任务类型,clear(刷新)或push(预热)。

示例命令

  • 假设AccessKey为yourAccessKey,AccessKeySecret为yourAccessKeySecret,URL文件为urllist.txt,且文件和Refresh.py脚本在相同目录下,任务类型为clear(刷新),在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -i yourAccessKey -k yourAccessKeySecret -r urllist.txt -t clear
  • 如果文件在不同目录,例如D:\example\filename\urllist.txt,在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -i yourAccessKey -k yourAccessKeySecret -r D:\example\filename\urllist.txt -t clear

运行示例如下:

python Refresh.py -i yourAccessKey -k yourAccessKeySecret -r urllist.txt -t clear
{'RequestId': 'C1686DCA-F3B5-5575-ADD1-05F96617D770', 'RefreshTaskId': '18392588710'}
[18392588710] is doing... ...
{'RequestId': '5BEAD371-9D82-5DA5-BE60-58EC2C915E82', 'RefreshTaskId': '18392588804'}
[18392588804] is doing... ...
{'RequestId': 'BD0B3D22-66CF-5B1D-A995-D912A5EA8E2F', 'RefreshTaskId': '18392588804'}
[18392588804] is doing... ...
[18392588804] is doing... ...
[18392588804] is doing... ...