
CDN: [FC Console] Dump offline logs to Object Storage Service by using Function Compute

Updated: Aug 14, 2025

Use Alibaba Cloud Function Compute to automatically and regularly dump the offline logs generated by Alibaba Cloud CDN to Object Storage Service, so that the logs can be archived and analyzed over the long term.

Background information

Alibaba Cloud CDN provides detailed access logs for your accelerated domain names. These offline logs are an important basis for user behavior analysis, troubleshooting of service issues, and operational statistics. According to the Alibaba Cloud CDN service policy, offline log files are kept on Alibaba Cloud CDN servers for only 30 days and are automatically deleted after that period.

To meet requirements such as data compliance, long-term auditing, or historical data tracing, you may need to store these logs permanently. Object Storage Service provides a highly available, low-cost, and durable storage solution, making it an ideal choice for long-term log archiving. Function Compute listens for the events generated when Alibaba Cloud CDN produces logs and invokes a task function that dumps the Alibaba Cloud CDN offline logs to Object Storage Service. With this solution, you can build an automated workflow that seamlessly dumps Alibaba Cloud CDN logs to your Object Storage Service bucket.

How it works

The core of this automated dump solution is to use Function Compute as the "scheduler" and "mover" that connects Alibaba Cloud CDN and Object Storage Service. The workflow is as follows:

  1. Event trigger: A trigger is configured in Function Compute and fires every time Alibaba Cloud CDN generates a log file.

  2. Function execution: After the trigger fires, the associated function code runs automatically.

  3. Pull the log: The function code obtains the download URL of the newly generated Alibaba Cloud CDN offline log file, sends a request to that URL, and downloads the log file into the temporary environment of Function Compute.

  4. Dump to Object Storage Service: After the log file is downloaded, the function calls the Object Storage Service API to upload the file to the specified directory in the Object Storage Service bucket that you designate.

The whole process runs fully automatically and deeply integrates three Alibaba Cloud services: Alibaba Cloud CDN, Function Compute, and Object Storage Service, which improves the efficiency of managing services in the cloud.
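
The following minimal Python sketch illustrates this flow. It is a simplified illustration only: it assumes the three environment variables introduced later in the configuration steps (target_oss_bucket, target_oss_endpoint, target_oss_prefix) and uploads the log file in a single PUT, without the retry and multipart-upload logic of the complete task code provided in this topic.

    # coding=utf-8
    # Simplified sketch of the dump flow; the full task code appears in the configuration steps.
    import os
    import json
    import requests
    import oss2

    def handler(event, context):
        evt = json.loads(event)
        # The LogFileCreated event carries the download URL of the newly generated log file.
        url = evt['events'][0]['eventParameter']['filePath']
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Temporary STS credentials are injected through the function's execution role.
        cred = context.credentials
        auth = oss2.StsAuth(cred.access_key_id, cred.access_key_secret, cred.security_token)
        bucket = oss2.Bucket(auth, os.environ['target_oss_endpoint'], os.environ['target_oss_bucket'])

        # Store the file under the configured directory, keeping the original log file name.
        file_name = url.split('?')[0].rsplit('/', 1)[-1]
        object_name = '%s/%s' % (os.environ['target_oss_prefix'], file_name)

        # Download the log file as a stream and upload it to OSS in one shot.
        resp = requests.get(url, stream=True)
        resp.raise_for_status()
        bucket.put_object(object_name, resp.raw)
        return 'dumped %s to %s' % (file_name, object_name)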

Billing

This solution involves billing for the following services. Please note:

  • Alibaba Cloud CDN: generating offline logs and providing them for download is free of charge.

  • Function Compute: Function Compute charges based on the number of function invocations, the resources consumed (vCPU and memory), and the execution duration. For a lightweight log dump task that runs only a few times per day, the cost is usually minimal. For details, see the Function Compute billing overview.

  • Object Storage Service: Object Storage Service charges based on the storage space you occupy, the number of API requests, and any outbound Internet traffic that may be generated. For details, see the Object Storage Service billing overview.

Prerequisites

  • Make sure that Alibaba Cloud CDN, Function Compute, and Object Storage Service are all activated under the same Alibaba Cloud account, so that authorization and access between the services work properly.

  • Refer to Create a bucket to create a bucket in Object Storage Service for storing the log files in advance, and record the bucket name, the public Internet access Endpoint value, and the name of the directory used to store the log files (a programmatic alternative is sketched after this list).
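
If you prefer to prepare the bucket and the log directory programmatically instead of in the console, the following sketch with the OSS Python SDK (oss2) shows one possible way. The AccessKey pair, the endpoint, and the bucket name cdn-log-dump-bucket are placeholders; replace them with your own values.

    # coding=utf-8
    # Hedged sketch: create the bucket and the log directory with the oss2 SDK.
    import oss2

    auth = oss2.Auth('<yourAccessKeyId>', '<yourAccessKeySecret>')
    endpoint = 'https://oss-cn-hangzhou.aliyuncs.com'            # public endpoint of the target region
    bucket = oss2.Bucket(auth, endpoint, 'cdn-log-dump-bucket')  # bucket names must be globally unique

    # Create a private bucket for the offline logs.
    bucket.create_bucket(oss2.BUCKET_ACL_PRIVATE)

    # OSS has no real directories; an empty object whose name ends with '/' acts as one.
    bucket.put_object('cdn_log/', b'')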

Configuration steps

1. Obtain the bucket configuration

When you create the task function in Function Compute, you must fill in the Object Storage Service information used to store the logs. Therefore, prepare the bucket name, the public Internet access Endpoint value, and the name of the directory for storing the log files in advance. Obtain this information as follows:


  1. Go to the Buckets page of the Object Storage Service console and select the bucket used to store the logs.

  2. Click the bucket name to go to the bucket details page.

  3. On the Overview tab of the bucket, obtain the bucket name from the Basic Information section, and obtain the public Endpoint (region endpoint) value from the Port section.


  4. Choose Object Management > Objects. In the object list, click Create Directory and enter a directory name (cdn_log is recommended).


2. Create the Function Compute task

The core of this automated dump solution is to use Function Compute as the "scheduler" and "mover", so you need to configure the corresponding trigger and task function in Function Compute.

  1. Go to the Function Compute 3.0 console and choose Functions in the left-side navigation pane.

  2. On the Functions tab, click Create Function, select Event Function, and click Create Event Function.


  3. When creating the event function, you only need to configure the key parameters that affect whether the function runs correctly.

    • Basic Settings - Function Name: this function name is needed in later steps (cdn-log-dump is recommended).

    • Code - Runtime: the task function is written in Python, so select the built-in Python runtime and choose Python 3.10.

    • Advanced Settings - Environment Variables: the task function needs the information of the bucket used for storage, so pass the bucket configuration in through environment variables. Create three environment variables and fill in the corresponding values:

      • target_oss_bucket: the bucket name

      • target_oss_endpoint: the public Internet access Endpoint value

      • target_oss_prefix: the name of the directory for storing the log files


  4. After the parameters are configured, click Create to complete the creation of the function.

  5. On the function details page, go to the Triggers tab and click Create Trigger.


  6. Complete the configuration of the key trigger parameters according to the following guidance.

    • Trigger Type: select CDN (synchronous invocation).

    • Name: enter a trigger name (cdn-logs-triggers is recommended).

    • Triggering Event: select LogFileCreated.

    • Domain Name: you must enter an Alibaba Cloud CDN accelerated domain name that belongs to the same Alibaba Cloud account and is running normally.

    • Description: enter a description for the trigger (recommended: CDN offline log file creation trigger).

    • Role Name: select AliyunCDNEventNotificationRole.

  7. After the trigger parameters are configured, click OK. If a message appears indicating that the default role used by the CDN trigger has not been created, click Authorize Now and follow the instructions to create the default role; if the message does not appear, the trigger is created directly.


  8. On the function details page, click the Code tab and enter the following code in the online code editor (it pulls offline logs from Alibaba Cloud CDN and stores them in Object Storage Service).

    Dump task code

    # coding=utf-8
    
    import os, time, json, requests, traceback, oss2, fc2
    from requests.exceptions import *
    from fc2.fc_exceptions import *
    from oss2.models import PartInfo
    from oss2.exceptions import *
    from multiprocessing import Pool
    from contextlib import closing
    
    MAX_PROCCESSES = 20 # The number of worker processes in each subtask
    BLOCK_SIZE = 6 * 1024 * 1024 # The size of each part
    BLOCK_NUM_INTERNAL = 18 # The default number of blocks in each subtask in case of internal url
    BLOCK_NUM = 10 # The default number of blocks in each subtask
    MAX_SUBTASKS = 49 # The number of worker processes to do subtasks
    CHUNK_SIZE = 8 * 1024 # The size of each chunk
    SLEEP_TIME = 0.1 # The initial seconds to wait for retrying
    MAX_RETRY_TIME = 10 # The maximum retry times
    
    def retry(func):
        """
        Return the result of the lambda function func with retry.
        :param func: (required, lambda) the function.
        :return: The result of func.
        """
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            if retry_cnt > MAX_RETRY_TIME:
                return func()
            try:
                return func()
            except (ConnectionError, SSLError, ConnectTimeout, Timeout) as e:
                print(traceback.format_exc())
            except (OssError) as e:
                if 500 <= e.status < 600:
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            except (FcError) as e:
                if (500 <= e.status_code < 600) or (e.status_code == 429):
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            print('Retry %d times...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
    
    def get_info(url):
        """
        Get the CRC64 and total length of the file.
        :param url: (required, string) the url address of the file.
        :return: CRC64, length
        """
        with retry(lambda : requests.get(url, {}, stream = True)) as r:
            return r.headers['x-oss-hash-crc64ecma'], int(r.headers['content-length'])
    
    class Response(object):
        """
        The response class to support reading by chunks.
        """
        def __init__(self, response):
            self.response = response
            self.status = response.status_code
            self.headers = response.headers
    
        def read(self, amt = None):
            if amt is None:
                content = b''
                for chunk in self.response.iter_content(CHUNK_SIZE):
                    content += chunk
                return content
            else:
                try:
                    return next(self.response.iter_content(amt))
                except StopIteration:
                    return b''
    
        def __iter__(self):
            return self.response.iter_content(CHUNK_SIZE)
    
    def migrate_part(args):
        """
        Download a part from url and then upload it to OSS.
        :param args: (bucket, object_name, upload_id, part_number, url, st, en)
        :bucket: (required, Bucket) the goal OSS bucket.
        :object_name: (required, string) the goal object_name.
        :upload_id: (required, integer) the upload_id of this upload task.
        :part_number: (integer) the part_number of this part.
        :url: (required, string) the url address of the file.
        :st, en: (required, integer) the byte range of this part, denoting [st, en].
        :return: (part_number, etag)
        :part_number: (integer) the part_number of this part.
        :etag: (string) the etag of the upload_part result.
        """
        bucket = args[0]
        object_name = args[1]
        upload_id = args[2]
        part_number = args[3]
        url = args[4]
        st = args[5]
        en = args[6]
        try:
            headers = {'Range' : 'bytes=%d-%d' % (st, en)}
            resp = Response(retry(lambda : requests.get(url, headers = headers, stream = True)))
            result = retry(lambda : bucket.upload_part(object_name, upload_id, part_number, resp))
            return (part_number, result.etag)
        except Exception as e:
            print(traceback.format_exc())
            raise Exception(e)
    
    def do_subtask(event, context):
        """
        Download a range of the file from url and then upload it to OSS.
        :param event: (required, json) the json format of event.
        :param context: (required, FCContext) the context of handler.
        :return: parts
        :parts: ([(integer, string)]) the part_number and etag of each process.
        """
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        object_name = event['object_name']
        upload_id = event['upload_id']
        part_number = event['part_number']
        url = event['url']
        st = event['st']
        en = event['en']
        if part_number == 1:
            return [migrate_part((bucket, object_name, upload_id, part_number, url, st, en))]
        pool = Pool(MAX_PROCCESSES)
        tasks = []
        while st <= en:
            nxt = min(en, st + BLOCK_SIZE - 1)
            tasks.append((bucket, object_name, upload_id, part_number, url, st, nxt))
            part_number += 1
            st = nxt + 1
        parts = pool.map(migrate_part, tasks)
        pool.close()
        pool.join()
        return parts
    
    def invoke_subtask(args):
        """
        Invoke the same function synchronously to start a subtask.
        :param args: (object_name, upload_id, part_number, url, st, en, context)
        :object_name: (required, string) the goal object_name.
        :upload_id: (required, integer) the upload_id of this upload task.
        :part_number: (integer) the part_number of the first part in this subtask.
        :url: (required, string) the url address of the file.
        :st, en: (required, integer) the byte range of this subtask, denoting [st, en].
        :context: (required, FCContext) the context of handler.
        :return: the return of the invoked function.
        """
        object_name = args[0]
        upload_id = args[1]
        part_number = args[2]
        url = args[3]
        st = args[4]
        en = args[5]
        context = args[6]
        account_id = context.account_id
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        region = context.region
        service_name = context.service.name
        function_name = context.function.name
        endpoint = 'http://%s.%s-internal.fc.aliyuncs.com' % (account_id, region)
        client = fc2.Client(
            endpoint = endpoint,
            accessKeyID = access_key_id,
            accessKeySecret = access_key_secret,
            securityToken = security_token
        )
        payload = {
            'object_name' : object_name,
            'upload_id' : upload_id,
            'part_number' : part_number,
            'url' : url,
            'st' : st,
            'en' : en,
            'is_children' : True
        }
        if part_number == 1:
            return json.dumps(do_subtask(payload, context))
        ret = retry(lambda : client.invoke_function(service_name, function_name, payload = json.dumps(payload)))
        return ret.data
    
    def divide(n, m):
        """
        Calculate ceil(n / m) without floating point arithmetic.
        :param n, m: (integer)
        :return: (integer) ceil(n / m).
        """
        ret = n // m
        if n % m > 0:
            ret += 1
        return ret
    
    def migrate_file(url, oss_object_name, context):
        """
        Download the file from url and then upload it to OSS.
        :param url: (required, string) the url address of the file.
        :param oss_object_name: (required, string) the goal object_name.
        :param context: (required, FCContext) the context of handler.
        :return: actual_crc64, expect_crc64
        :actual_crc64: (string) the CRC64 of upload.
        :expect_crc64: (string) the CRC64 of source file.
        """
        crc64, total_size = get_info(url)
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        upload_id = retry(lambda : bucket.init_multipart_upload(oss_object_name)).upload_id
        pool = Pool(MAX_SUBTASKS)
        st = 0
        part_number = 1
        tasks = []
        block_num = BLOCK_NUM_INTERNAL if '-internal.aliyuncs.com' in oss_endpoint else BLOCK_NUM
        block_num = min(block_num, divide(divide(total_size, BLOCK_SIZE), MAX_SUBTASKS + 1))
        while st < total_size:
            en = min(total_size - 1, st + block_num * BLOCK_SIZE - 1)
            tasks.append((oss_object_name, upload_id, part_number, url, st, en, context))
            size = en - st + 1
            cnt = divide(size, BLOCK_SIZE)
            part_number += cnt
            st = en + 1
        subtasks = pool.map(invoke_subtask, tasks)
        pool.close()
        pool.join()
        parts = []
        for it in subtasks:
            for part in json.loads(it):
                parts.append(PartInfo(part[0], part[1]))
        res = retry(lambda : bucket.complete_multipart_upload(oss_object_name, upload_id, parts))
        return str(res.crc), str(crc64)
    
    def get_oss_object_name(url):
        """
        Get the OSS object name.
        :param url: (required, string) the url address of the file.
        :return: (string) the OSS object name.
        """
        prefix = os.environ.get('target_oss_prefix')
        tmps = url.split('?')
        if len(tmps) != 2:
            raise Exception('Invalid url : %s' % url)
        urlObject = tmps[0]
        if urlObject.count('/') < 3:
            raise Exception('Invalid url : %s' % url)
        objectParts = urlObject.split('/')
        objectParts = [prefix] + objectParts[len(objectParts) - 3 : len(objectParts)]
        return '/'.join(objectParts)
    
    def handler(event, context):
        evt = json.loads(event)
        if list(evt.keys()).count('is_children'):
            return json.dumps(do_subtask(evt, context))
        url = evt['events'][0]['eventParameter']['filePath']
        if not (url.startswith('http://') or url.startswith('https://')):
            url = 'https://' + url
        oss_object_name = get_oss_object_name(url)
        st_time = int(time.time())
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            actual_crc64, expect_crc64 = migrate_file(url, oss_object_name, context)
            if actual_crc64 == expect_crc64:
                break
            print('Migration object CRC64 not matched, expected: %s, actual: %s' % (expect_crc64, actual_crc64))
            if retry_cnt > MAX_RETRY_TIME:
                raise Exception('Maximum retry time exceeded.')
            print('Retry %d times...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
        print('Success! Total time: %d s.' % (int(time.time()) - st_time))
  9. Click Deploy Code to complete the configuration of the function.

3. Create a dedicated role and policy

Function Compute needs permissions to access Object Storage Service when it uses Object Storage Service. To simplify authorization, Function Compute supports attaching a role. Follow these steps to configure a role that allows this offline log dump function to use Object Storage Service.

  1. Open the RAM (Resource Access Management) console and choose Permissions > Policies.

  2. Click Create Policy and select the JSON (script) editor.

  3. In the following policy, replace BucketName with your own bucket name and replace all three occurrences of FC-NAME with the function name from configuration step 2 (cdn-log-dump is recommended).

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "oss:PutObject",
          "Resource": "acs:oss:*:*:BucketName/*"
        },
        {
          "Effect": "Allow",
          "Action": "fc:InvokeFunction",
          "Resource": [
            "acs:fc:*:*:services/FC-NAME/functions/FC-NAME",
            "acs:fc:*:*:services/FC-NAME.*/functions/*"
          ]
        }
      ]
    }
  4. Click OK, enter the policy name and description, and then click OK again to complete the policy creation (recommended policy name: AliyunCDNLogDumpAccess; recommended description: permissions for managing CDN offline log dumping).

  5. Click the Roles tab under Identities, and then click Create Role.

  6. For Trusted entity type, select Alibaba Cloud Account; for Trusted entity name, select Current Alibaba Cloud Account xxxxxxx; then click OK.

  7. In the Create Role dialog box, enter the role name (AliyunCDNLogDumpRole is recommended) and click OK to complete the role creation.

  8. In the basic information of the newly created role, click the Permissions tab and click Precise Permission. For Policy Type, select Custom Policy; for Policy Name, enter the name of the policy created in step 4 (AliyunCDNLogDumpAccess is recommended). Then click OK.

  9. On the Trust Policy tab, click Edit Trust Policy. In the script editor, enter the following trust policy and click OK.

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "fc.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }

At this point, the entire role and permission configuration is complete. Next, bind this role to the Function Compute task.

4. Bind the role to the Function Compute task

  1. On the Functions tab of the Function Compute console, select the function created in step 2 and click Configurations.

  2. On the Configurations tab, select Advanced Settings and click the corresponding modify button.


  3. In Advanced Settings, find the Permissions - Function Role option and select the role created in step 3 (AliyunCDNLogDumpRole is recommended). Then click Deploy to complete binding the role to the Function Compute task.


5. Test the Function Compute task (optional)

After the first four steps are complete, the configuration for dumping Alibaba Cloud CDN offline logs to Object Storage Service is fully in place. However, because offline logs are generated with a delay of about 24 hours, you cannot immediately see whether the configured Function Compute task runs correctly. You can test the configured Function Compute task as follows.

  1. On the Functions tab of the Function Compute console, select the function created in step 2 and open its details page.

  2. On the Test tab, for Test Request Operation select Create New Test Event, for Event Template select CDN (LogFileCreated), and for Event Name enter a test name such as cdn_log_dump.


  3. Replace the filePath parameter in the event template with the value obtained as described below (a sketch of the full event payload is provided at the end of this section).

    How to obtain the filePath parameter for testing

    1. Go to the offline log download page of the Alibaba Cloud CDN console.

    2. Select the accelerated domain name configured in the trigger, set the date to the day before the current date, and click Query.

    3. Select one of the files and copy its download link (hover over the Download button, right-click, and choose Copy Link Address).

  4. Click Test Function. After the execution completes, you can see that the returned result is null and the execution status is successful.


  5. On the Buckets page of the Object Storage Service console, select the bucket used to store Alibaba Cloud CDN logs.

  6. Choose the object (file) list and go to the directory configured for storing Alibaba Cloud CDN logs. You will see a folder named after the accelerated domain name, which contains subfolders named by date; inside them is the log file configured in step 3 of this section. This indicates that the Function Compute task has successfully dumped the Alibaba Cloud CDN offline log.

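
For reference, the following sketch shows the approximate shape of the LogFileCreated event payload that the function receives (and that the test event above simulates). Only events[0].eventParameter.filePath is read by the dump code; the other fields are illustrative.

    # Hedged example of a LogFileCreated test event. Only 'filePath' is consumed by
    # the task code; replace it with the download link copied from the CDN console.
    import json

    test_event = {
        "events": [
            {
                "eventName": "LogFileCreated",
                "eventSource": "cdn",
                "eventParameter": {
                    "domain": "example.com",   # accelerated domain name (illustrative)
                    "startTime": 1723596000,   # log time range (illustrative)
                    "endTime": 1723599600,
                    "fileSize": 1788115,
                    "filePath": "<paste the offline-log download link here>"
                }
            }
        ]
    }

    # The function entry point receives the event as a JSON string:
    # handler(json.dumps(test_event), context)
    print(json.dumps(test_event, indent=2))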