すべてのプロダクト
Search
ドキュメントセンター

CDN:[FC コンソール] Function Compute を使用して Object Storage Service (OSS) にオフラインログを保存する

最終更新日:Nov 09, 2025

Alibaba Cloud Function Compute を使用して、Content Delivery Network (CDN) からのオフラインログを Object Storage Service (OSS) に自動的かつ定期的に保存します。これにより、長期的なログのアーカイブと分析が可能になります。

背景情報

Content Delivery Network (CDN) は、高速化ドメイン名の詳細なアクセスログを提供します。これらのオフラインログは、ユーザーの動作分析、サービスのトラブルシューティング、および運用データの分析に重要です。CDN サービスポリシーによると、オフラインログファイルは CDN サーバーに 30 日間のみ保持され、この期間を過ぎると自動的に削除されます。

データコンプライアンス、長期監査、または既存データ分析の要件を満たすために、これらのログを永続的に保存する必要がある場合があります。Object Storage Service (OSS) は、高可用性、低コスト、かつ耐久性のあるストレージソリューションを提供し、長期的なログアーカイブに理想的な選択肢です。Function Compute (FC) は、CDN ログを生成するイベントをリッスンし、タスク関数を呼び出して CDN からのオフラインログを OSS に保存します。このソリューションを使用すると、CDN ログを OSS バケットにシームレスに保存するための自動化されたワークフローを構築できます。

実行ロジック

この自動化ストレージソリューションの中核は、FC をスケジューラおよびエグゼキュータとして使用して CDN と OSS を接続することです。ワークフローは次のとおりです:

  1. イベントトリガー: Function Compute でトリガーが構成されます。トリガーは、CDN がログを生成するたびにアクティブ化されます。

  2. 関数実行: イベントトリガーがアクティブ化されると、関連付けられた関数コードが自動的に実行されます。

  3. ログのプル: 関数コードは、現在の日付に基づいて前日のログファイル名を計算し、CDN オフラインログのダウンロード URL を生成します。その後、関数はこの URL にリクエストを送信して、ログファイルを FC の一時環境にダウンロードします。

  4. Object Storage Service (OSS) への保存: 関数がログファイルのダウンロードに成功すると、OSS API を呼び出して、指定された OSS バケット内の指定されたディレクトリにファイルをアップロードします。

プロセス全体は完全に自動化されており、CDN、FC、OSS の 3 つの Alibaba Cloud サービスを統合しています。これにより、クラウドサービス管理の効率が向上します。

課金の説明

このソリューションでは、以下のプロダクトの課金が発生します。

  • Content Delivery Network (CDN): オフラインログを生成してダウンロードを提供する機能は無料です。

  • Function Compute: FC は、関数実行回数、消費リソース (vCPU とメモリ)、および実行時間に基づいて課金されます。1 日に数回しか実行されない軽量なログストレージタスクの場合、コストは通常非常に低くなります。詳細については、「Function Compute の課金の概要」をご参照ください。

  • Object Storage Service (OSS): OSS は、使用するストレージ容量、API リクエスト数、およびインターネット経由のアウトバウンドトラフィックに基づいて課金されます。詳細については、「Object Storage Service (OSS) の課金の概要」をご参照ください。

前提条件

  • CDN、FC、および OSS が同じ Alibaba Cloud アカウントでアクティブ化されていることを確認し、サービスの権限付与とシームレスなアクセスを確保してください。

  • バケットの作成」の手順に従って、OSS にログファイルを保存するためのバケットを作成します。バケット名インターネットアクセスのエンドポイント値、およびログファイルが保存されるディレクトリの名前を記録してください。

構成ステップ

1. バケット構成を取得する

FC でタスク関数を作成する際には、ログストレージ用の OSS 情報を提供する必要があります。したがって、まずログファイルを保存するためのバケット名インターネットアクセスのエンドポイント値、およびディレクトリの名前を取得する必要があります。この情報を取得するには、次の手順に従ってください:

バケット構成の取得

  1. OSS コンソールの バケットリスト タブに移動します。ログストレージ用のバケットを選択します。

  2. バケット名をクリックして、バケットの詳細ページに移動します。

  3. バケットの詳細ページで、[概要] タブを選択します。基本情報セクションから [バケット名] を取得します。アクセスポートセクションから、[インターネットアクセス][エンドポイント (リージョン)] の値を取得します。

    image

  4. [ファイル管理] をクリックし、次に [ファイルリスト] をクリックします。ファイルリストで、[ディレクトリの作成] をクリックし、ディレクトリ名を入力します。cdn_log を使用することをお勧めします。

    image

2. Function Compute タスクを作成する

この自動化ストレージソリューションの中核は、FC をスケジューラおよびエグゼキュータとして使用することです。したがって、FC で対応するトリガーとタスク関数を構成する必要があります。

  1. Function Compute 3.0 コンソールに移動します。左側のナビゲーションウィンドウで、[関数] を選択します。

  2. [関数] タブで、[関数の作成] をクリックし、[イベント関数] を選択してから、[イベント関数の作成] をクリックします。

  3. イベント関数を作成する際には、関数の適切な実行に影響を与える主要なパラメーターのみを構成します。

    • 基本構成 - 関数名: この関数名は後続のステップで使用します。cdn-log-dump を使用することをお勧めします。

    • 関数コード - ランタイム環境: タスク関数は Python コードであるため、[ビルトインランタイム][Python]、および [Python 3.10] を選択します。

    • 詳細設定 - 環境変数: タスク関数はバケット情報を取得する必要があります。したがって、環境変数でバケット構成情報を渡す必要があります。3 つの環境変数を作成し、対応するパラメーターを入力します:

      • target_oss_bucket: バケット名

      • target_oss_endpoint: インターネットアクセスのエンドポイント値

      • target_oss_prefix: ログファイルを保存するためのディレクトリの名前

      image

  4. パラメータを構成した後、[作成] をクリックして関数を作成します。

  5. [関数の詳細] ページで、[トリガー] タブをクリックし、次に [トリガーの作成] をクリックします。

    image

  6. 次の手順に従って、トリガーの主要なパラメーターを構成します。[OK] をクリックします。

    • [トリガータイプ]: [CDN 同期呼び出し] を選択します。

    • 名前: トリガー名を入力します。cdn-logs-triggers を使用することをお勧めします。

    • [トリガーイベント]: [LogFileCreated] を選択します。

    • ドメイン名: 同じ Alibaba Cloud アカウント下にあり、正常に実行されている CDN 高速化ドメイン名を入力する必要があります。

    • 説明: トリガーの説明を入力します。CDN オフラインログファイル作成トリガーを使用することをお勧めします。

    • [ロール名]: [AliyunCDNEventNotificationRole] を選択します。

  7. トリガーパラメーターを構成した後、[OK] をクリックします。メッセージ [CDN トリガーのデフォルトロールが作成されていません] が表示された場合は、[今すぐ承認] をクリックし、指示に従ってデフォルトロールを作成します。このメッセージが表示されない場合は、トリガーは直接作成されます。

    image

  8. [関数の詳細] ページで、[コード] タブをクリックします。オンラインコンパイラで、次のコードを入力して CDN からオフラインログをプルし、OSS に保存します。

    ストレージタスクコード

    # coding=utf-8
    
    import os, time, json, requests, traceback, oss2, fc2
    from requests.exceptions import *
    from fc2.fc_exceptions import *
    from oss2.models import PartInfo
    from oss2.exceptions import *
    from multiprocessing import Pool
    from contextlib import closing
    
    MAX_PROCCESSES = 20 # 各サブタスクのワーカープロセスの数
    BLOCK_SIZE = 6 * 1024 * 1024 # 各パートのサイズ
    BLOCK_NUM_INTERNAL = 18 # 内部 URL の場合の各サブタスクのデフォルトのブロック数
    BLOCK_NUM = 10 # 各サブタスクのデフォルトのブロック数
    MAX_SUBTASKS = 49 # サブタスクを実行するワーカープロセスの数
    CHUNK_SIZE = 8 * 1024 # 各チャンクのサイズ
    SLEEP_TIME = 0.1 # リトライを待つ初期秒数
    MAX_RETRY_TIME = 10 # 最大リトライ回数
    
    def retry(func):
        """
        リトライ付きでラムダ関数 func の結果を返します。
        :param func: (必須、ラムダ) 関数。
        :return: func の結果。
        """
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            if retry_cnt > MAX_RETRY_TIME:
                return func()
            try:
                return func()
            except (ConnectionError, SSLError, ConnectTimeout, Timeout) as e:
                print(traceback.format_exc())
            except (OssError) as e:
                if 500 <= e.status < 600:
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            except (FcError) as e:
                if (500 <= e.status_code < 600) or (e.status_code == 429):
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            print('%d 回リトライしています...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
    
    def get_info(url):
        """
        ファイルの CRC64 と合計長を取得します。
        :param url: (必須、文字列) ファイルの URL アドレス。
        :return: CRC64、長さ
        """
        with retry(lambda : requests.get(url, {}, stream = True)) as r:
            return r.headers['x-oss-hash-crc64ecma'], int(r.headers['content-length'])
    
    class Response(object):
        """
        チャンク単位の読み取りをサポートする応答クラス。
        """
        def __init__(self, response):
            self.response = response
            self.status = response.status_code
            self.headers = response.headers
    
        def read(self, amt = None):
            if amt is None:
                content = b''
                for chunk in self.response.iter_content(CHUNK_SIZE):
                    content += chunk
                return content
            else:
                try:
                    return next(self.response.iter_content(amt))
                except StopIteration:
                    return b''
    
        def __iter__(self):
            return self.response.iter_content(CHUNK_SIZE)
    
    def migrate_part(args):
        """
        URL からパートをダウンロードし、OSS にアップロードします。
        :param args: (bucket, object_name, upload_id, part_number, url, st, en)
        :bucket: (必須、Bucket) ターゲット OSS バケット。
        :object_name: (必須、文字列) ターゲットオブジェクト名。
        :upload_id: (必須、整数) このアップロードタスクの upload_id。
        :part_number: (整数) このパートの part_number。
        :url: (必須、文字列) ファイルの URL アドレス。
        :st, en: (必須、整数) このパートのバイト範囲。[st, en] を示します。
        :return: (part_number, etag)
        :part_number: (整数) このパートの part_number。
        :etag: (文字列) upload_part 結果の etag。
        """
        bucket = args[0]
        object_name = args[1]
        upload_id = args[2]
        part_number = args[3]
        url = args[4]
        st = args[5]
        en = args[6]
        try:
            headers = {'Range' : 'bytes=%d-%d' % (st, en)}
            resp = Response(retry(lambda : requests.get(url, headers = headers, stream = True)))
            result = retry(lambda : bucket.upload_part(object_name, upload_id, part_number, resp))
            return (part_number, result.etag)
        except Exception as e:
            print(traceback.format_exc())
            raise Exception(e)
    
    def do_subtask(event, context):
        """
        URL からファイルの範囲をダウンロードし、OSS にアップロードします。
        :param event: (必須、json) イベントの JSON フォーマット。
        :param context: (必須、FCContext) ハンドラのコンテキスト。
        :return: parts
        :parts: ([(整数, 文字列)]) 各プロセスの part_number と etag。
        """
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        object_name = event['object_name']
        upload_id = event['upload_id']
        part_number = event['part_number']
        url = event['url']
        st = event['st']
        en = event['en']
        if part_number == 1:
            return [migrate_part((bucket, object_name, upload_id, part_number, url, st, en))]
        pool = Pool(MAX_PROCCESSES)
        tasks = []
        while st <= en:
            nxt = min(en, st + BLOCK_SIZE - 1)
            tasks.append((bucket, object_name, upload_id, part_number, url, st, nxt))
            part_number += 1
            st = nxt + 1
        parts = pool.map(migrate_part, tasks)
        pool.close()
        pool.join()
        return parts
    
    def invoke_subtask(args):
        """
        同じ関数を同期的に呼び出して、サブタスクを開始します。
        :param args: (object_name, upload_id, part_number, url, st, en, context)
        :object_name: (必須、文字列) ターゲットオブジェクト名。
        :upload_id: (必須、整数) このアップロードタスクの upload_id。
        :part_number: (整数) このサブタスクの最初のパートの part_number。
        :url: (必須、文字列) ファイルの URL アドレス。
        :st, en: (必須、整数) このサブタスクのバイト範囲。[st, en] を示します。
        :context: (必須、FCContext) ハンドラのコンテキスト。
        :return: 呼び出された関数の戻り値。
        """
        object_name = args[0]
        upload_id = args[1]
        part_number = args[2]
        url = args[3]
        st = args[4]
        en = args[5]
        context = args[6]
        account_id = context.account_id
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        region = context.region
        service_name = context.service.name
        function_name = context.function.name
        endpoint = 'http://%s.%s-internal.fc.aliyuncs.com' % (account_id, region)
        client = fc2.Client(
            endpoint = endpoint,
            accessKeyID = access_key_id,
            accessKeySecret = access_key_secret,
            securityToken = security_token
        )
        payload = {
            'object_name' : object_name,
            'upload_id' : upload_id,
            'part_number' : part_number,
            'url' : url,
            'st' : st,
            'en' : en,
            'is_children' : True
        }
        if part_number == 1:
            return json.dumps(do_subtask(payload, context))
        ret = retry(lambda : client.invoke_function(service_name, function_name, payload = json.dumps(payload)))
        return ret.data
    
    def divide(n, m):
        """
        浮動小数点演算なしで ceil(n / m) を計算します。
        :param n, m: (整数)
        :return: (整数) ceil(n / m)。
        """
        ret = n // m
        if n % m > 0:
            ret += 1
        return ret
    
    def migrate_file(url, oss_object_name, context):
        """
        URL からファイルをダウンロードし、OSS にアップロードします。
        :param url: (必須、文字列) ファイルの URL アドレス。
        :param oss_object_name: (必須、文字列) ターゲットオブジェクト名。
        :param context: (必須、FCContext) ハンドラのコンテキスト。
        :return: actual_crc64, expect_crc64
        :actual_crc64: (文字列) アップロードの CRC64。
        :expect_crc64: (文字列) ソースファイルの CRC64。
        """
        crc64, total_size = get_info(url)
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        upload_id = retry(lambda : bucket.init_multipart_upload(oss_object_name)).upload_id
        pool = Pool(MAX_SUBTASKS)
        st = 0
        part_number = 1
        tasks = []
        block_num = BLOCK_NUM_INTERNAL if '-internal.aliyuncs.com' in oss_endpoint else BLOCK_NUM
        block_num = min(block_num, divide(divide(total_size, BLOCK_SIZE), MAX_SUBTASKS + 1))
        while st < total_size:
            en = min(total_size - 1, st + block_num * BLOCK_SIZE - 1)
            tasks.append((oss_object_name, upload_id, part_number, url, st, en, context))
            size = en - st + 1
            cnt = divide(size, BLOCK_SIZE)
            part_number += cnt
            st = en + 1
        subtasks = pool.map(invoke_subtask, tasks)
        pool.close()
        pool.join()
        parts = []
        for it in subtasks:
            for part in json.loads(it):
                parts.append(PartInfo(part[0], part[1]))
        res = retry(lambda : bucket.complete_multipart_upload(oss_object_name, upload_id, parts))
        return str(res.crc), str(crc64)
    
    def get_oss_object_name(url):
        """
        OSS オブジェクト名を取得します。
        :param url: (必須、文字列) ファイルの URL アドレス。
        :return: (文字列) OSS オブジェクト名。
        """
        prefix = os.environ.get('target_oss_prefix')
        tmps = url.split('?')
        if len(tmps) != 2:
            raise Exception('無効な URL です : %s' % url)
        urlObject = tmps[0]
        if urlObject.count('/') < 3:
            raise Exception('無効な URL です : %s' % url)
        objectParts = urlObject.split('/')
        objectParts = [prefix] + objectParts[len(objectParts) - 3 : len(objectParts)]
        return '/'.join(objectParts)
    
    def handler(event, context):
        evt = json.loads(event)
        if list(evt.keys()).count('is_children'):
            return json.dumps(do_subtask(evt, context))
        url = evt['events'][0]['eventParameter']['filePath']
        if not (url.startswith('http://') or url.startswith('https://')):
            url = 'https://' + url
        oss_object_name = get_oss_object_name(url)
        st_time = int(time.time())
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            actual_crc64, expect_crc64 = migrate_file(url, oss_object_name, context)
            if actual_crc64 == expect_crc64:
                break
            print('移行オブジェクトの CRC64 が一致しません。期待値: %s、実際値: %s' % (expect_crc64, actual_crc64))
            if retry_cnt > MAX_RETRY_TIME:
                raise Exception('最大リトライ回数を超えました。')
            print('%d 回リトライしています...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
        print('成功しました。合計時間: %d 秒。' % (int(time.time()) - st_time))
  9. [コードのデプロイ] をクリックして、関数の構成を完了します。

3. 専用のロールとアクセスポリシーの作成

FC は OSS にアクセスするための権限が必要です。権限付与プロセスを簡素化するために、FC はロールの関連付けをサポートしています。次の手順に従って、オフラインログストレージ関数が OSS にアクセスできるようにするロールを構成します。

  1. Resource Access Management (RAM) コンソールを開きます。左側のナビゲーションウィンドウで、[権限管理] > [ポリシー] を選択します。

  2. [ポリシーの作成] をクリックします。[ポリシーの作成] ページで、[スクリプト] タブをクリックします。

  3. 次のポリシーで、BucketName をご自身のバケット名に置き換え、FC-NAME の 3 つのインスタンスすべてをステップ 2 の関数名に置き換えます。cdn-log-dump を使用することをお勧めします。

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "oss:PutObject",
          "Resource": "acs:oss:*:*:BucketName/*"
        },
        {
          "Effect": "Allow",
          "Action": "fc:InvokeFunction",
          "Resource": [
            "acs:fc:*:*:services/FC-NAME/functions/FC-NAME",
            "acs:fc:*:*:services/FC-NAME.*/functions/*"
          ]
        }
      ]
    }
  4. [OK] をクリックします。[ポリシー名][説明] を入力し、再度 [OK] をクリックして [アクセスポリシーの作成] プロセスを完了します。(ポリシー名として AliyunCDNLogDumpAccess を、説明としてCDN ログストレージの権限を管理を使用することをお勧めします)。

  5. 左側のナビゲーションウィンドウで、[ID 管理] > [ロール] を選択します。[ロール] ページで、[ロールの作成] をクリックします。

  6. [信頼できるエンティティタイプ][Alibaba Cloud アカウント] を選択します。[信頼できる Alibaba Cloud アカウント][現在の Alibaba Cloud アカウント Xxxxxxx] を選択します。次に、[OK] をクリックします。

  7. [ロールの作成] パネルで、[ロール名] を入力します。AliyunCDNLogDumpRole を使用することをお勧めします。次に、[OK] をクリックしてロールを作成します。

  8. ロールの詳細ページで、[権限] タブをクリックし、次に [権限の付与] をクリックします。[権限付与の範囲][カスタムポリシー] に設定します。[ポリシー名] に、ステップ 4 で作成したポリシーの名前を入力します。AliyunCDNLogDumpAccess を使用することをお勧めします。次に、[OK] をクリックします。

  9. [信頼ポリシー] タブをクリックし、次に [信頼ポリシーの編集] をクリックします。[スクリプト] エディターで、次の信頼ポリシーを入力し、[OK] をクリックします。

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "fc.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }

この時点で、ロールと権限の構成は完了です。次に、このロールを FC タスクにバインドする必要があります。

4. ロールを Function Compute タスクにバインドする

  1. FC コンソールで、[関数] タブで、ステップ 2 で作成した関数を選択し、[設定] をクリックします。

  2. [設定] タブで、[詳細設定] を選択し、対応する [設定] ボタンをクリックします。

    image

  3. 詳細設定で、[権限] - [関数ロール] オプションを見つけます。ステップ 3 で作成したロールを選択します。AliyunCDNLogDumpRole を使用することをお勧めします。次に、[デプロイ] をクリックしてロールを FC タスクにバインドします。

    image

5. Function Compute タスクのテスト (オプション)

最初の 4 つのステップを完了すると、CDN オフラインログを OSS に保存するためのプロセス全体が構成されます。ただし、オフラインログの生成には約 24 時間の遅延があるため、構成した FC タスクが正しく実行されているかをすぐに確認することはできません。次の手順に従って、構成した FC タスクをテストできます。

  1. FC コンソールで、[関数] タで、ステップ 2 で作成した関数を選択し、[設定] をクリックします。

  2. [テスト] タブで、[テストリクエスト操作] には [新しいテストイベントの作成] を選択します。 [イベントテンプレート] には [CDN(LogFileCreated)] を選択します。 [イベント名] には [Test_cdn_log_dump] を入力します。

    image

  3. 次の手順で取得したパラメーターを使用して、イベントテンプレートの filePath パラメーターを置き換えます。

    テスト用の filePath パラメーターの取得方法

    1. CDN オフラインログダウンロード コンソールに移動します。

    2. トリガーで構成された高速化ドメイン名を選択します。現在の日付の前日を選択し、[クエリ] をクリックします。

    3. ファイルを選択します。ダウンロードリンクをコピーするには、[ダウンロード] ボタンにカーソルを合わせ、右クリックして [リンクをコピー] を選択します。

  4. [関数のテスト] をクリックします。実行が完了すると、戻り値が null で、ステータスが成功であることがわかります。

    image

  5. OSS バケットコンソールで、CDN ログの保存に使用するバケットを選択します。

  6. [ファイルリスト] を選択し、CDN ログを保存するように構成されたディレクトリに移動します。高速化ドメイン名にちなんで名付けられたフォルダが表示されます。このフォルダ内には、日付にちなんで名付けられたサブフォルダがあり、テストで指定されたファイルが含まれています。これは、FC タスクが CDN オフラインログストレージを正常に処理したことを示します。

    image