全部產品
Search
文件中心

Object Storage Service:簡單下載(Python SDK V2)

更新時間:Jul 31, 2025

本文介紹如何通過簡單下載方法將儲存空間(Bucket)中的檔案(Object)下載到本地,此方法操作簡便,適合快速將雲端儲存的檔案下載到本地。

注意事項

本文範例程式碼以華東1(杭州)的地區IDcn-hangzhou為例,預設使用外網Endpoint,如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱

許可權說明

阿里雲帳號預設擁有全部許可權。阿里雲帳號下的RAM使用者或RAM角色預設沒有任何許可權,需要阿里雲帳號或帳號管理員通過RAM PolicyBucket Policy授予操作許可權。

API

Action

說明

GetObject

oss:GetObject

下載Object。

oss:GetObjectVersion

下載Object時,如果通過versionId指定了Object的版本,則需要授予此操作的許可權。

kms:Decrypt

下載Object時,如果Object的中繼資料套件含X-Oss-Server-Side-Encryption: KMS,則需要此操作的許可權。

方法定義

get_object(request: GetObjectRequest, **kwargs) → GetObjectResult

請求參數列表

參數名

類型

說明

request

GetObjectRequest

佈建要求參數,具體請參見GetObjectRequest

傳回值列表

類型

說明

GetObjectResult

傳回值,具體請參見GetObjectResult

關於簡單下載方法的完整定義,請參見get_object

範例程式碼

您可以使用以下代碼將儲存空間中的檔案下載到本地。

import argparse
import alibabacloud_oss_v2 as oss
import os

# 建立命令列參數解析器
parser = argparse.ArgumentParser(description="get object sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令列參數 --bucket,表示儲存空間的名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令列參數 --key,表示對象的名稱,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # 解析命令列參數
    args = parser.parse_args()

    # 從環境變數中載入憑證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 載入SDK的預設配置,並設定憑證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定配置中的地區資訊
    cfg.region = args.region

    # 如果提供了endpoint參數,則設定配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的資訊建立OSS用戶端
    client = oss.Client(cfg)

    # 執行擷取對象的請求,指定儲存空間名稱和對象名稱
    result = client.get_object(oss.GetObjectRequest(
        bucket=args.bucket,  # 指定儲存空間名稱
        key=args.key,  # 指定對象鍵名
    ))

    # 輸出擷取對象的結果資訊,用於檢查請求是否成功
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content length: {result.content_length},'
          f' content range: {result.content_range},'
          f' content type: {result.content_type},'
          f' etag: {result.etag},'
          f' last modified: {result.last_modified},'
          f' content md5: {result.content_md5},'
          f' cache control: {result.cache_control},'
          f' content disposition: {result.content_disposition},'
          f' content encoding: {result.content_encoding},'
          f' expires: {result.expires},'
          f' hash crc64: {result.hash_crc64},'
          f' storage class: {result.storage_class},'
          f' object type: {result.object_type},'
          f' version id: {result.version_id},'
          f' tagging count: {result.tagging_count},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
          f' next append position: {result.next_append_position},'
          f' expiration: {result.expiration},'
          f' restore: {result.restore},'
          f' process status: {result.process_status},'
          f' delete marker: {result.delete_marker},'
    )

    # ========== 方式1:完整讀取 ==========
    with result.body as body_stream:
        data = body_stream.read()
        print(f"檔案讀取完成,資料長度:{len(data)} bytes")

        path = "./get-object-sample.txt"
        with open(path, 'wb') as f:
            f.write(data)
        print(f"檔案下載完成,儲存至路徑:{path}")

    # # ========== 方式2:分塊讀取 ==========
    # with result.body as body_stream:
    #     chunk_path = "./get-object-sample-chunks.txt"
    #     total_size = 0

    #     with open(chunk_path, 'wb') as f:
    #         # 使用256KB塊大小(可根據需要調整block_size參數)
    #         for chunk in body_stream.iter_bytes(block_size=256 * 1024):
    #             f.write(chunk)
    #             total_size += len(chunk)
    #             print(f"已接收資料區塊:{len(chunk)} bytes | 累計:{total_size} bytes")

    #     print(f"檔案下載完成,儲存至路徑:{chunk_path}")

# 當此指令碼被直接運行時,調用main函數
if __name__ == "__main__":
    main()  # 指令碼入口,當檔案被直接運行時調用main函數

常見使用情境

根據限定條件下載

當從Bucket中下載單個檔案(Object)時,您可以指定基於檔案最後修改時間或ETag(檔案內容標識符)的條件限制。只有當這些條件得到滿足時才會執行下載操作;如果不滿足,則會返回錯誤並且不會觸發下載。利用限定條件下載不僅可以減少不必要的網路傳輸和資源消耗,還能提高下載效率。

OSS支援的限定條件如下:

說明
  • if_modified_since和if_unmodified_since可以同時存在。if_match和if_none_match也可以同時存在。

  • 您可以通過client.get_object_meta方法擷取ETag。

參數

描述

if_modified_since

如果指定的時間早於實際修改時間,則正常傳輸檔案,否則返回錯誤(304 Not modified)。

if_unmodified_since

如果指定的時間等於或者晚於檔案實際修改時間,則正常傳輸檔案,否則返回錯誤(412 Precondition failed)。

if_match

如果指定的ETag和OSS檔案的ETag匹配,則正常傳輸檔案,否則返回錯誤(412 Precondition failed)。

if_none_match

如果指定的ETag和OSS檔案的ETag不匹配,則正常傳輸檔案,否則返回錯誤(304 Not modified)。

以下範例程式碼展示了如何使用限定條件下載。

import argparse
import alibabacloud_oss_v2 as oss
from datetime import datetime, timezone

# 建立一個命令列參數解析器,並描述指令碼用途:擷取對象並儲存到檔案樣本
parser = argparse.ArgumentParser(description="get object to file sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令列參數 --bucket,表示要擷取對象的儲存空間名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令列參數 --key,表示對象(檔案)在OSS中的鍵名,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令列參數 --file_path,表示下載檔案的本地路徑,必需參數
parser.add_argument('--file_path', help='The path of the file to save the downloaded content.', required=True)

def main():
    # 解析命令列提供的參數,擷取使用者輸入的值
    args = parser.parse_args()

    # 從環境變數中載入訪問OSS所需的認證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置建立設定物件,並設定認證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # 設定設定物件的地區屬性,根據使用者提供的命令列參數
    cfg.region = args.region

    # 如果提供了自訂endpoint,則更新設定物件中的endpoint屬性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS用戶端,準備與OSS互動
    client = oss.Client(cfg)

    # 定義 if_modified_since 時間
    # 只有在此時間之後被修改的對象才會被返回
    if_modified_since = datetime(2024, 10, 1, 12, 0, 0, tzinfo=timezone.utc)

    # 假設ETag為DA5223EFCD7E0353BE08866700000000,則填寫的ETag與Object的ETag值相等時,將滿足IfMatch的限定條件,並觸發下載行為。
    etag = "\"DA5223EFCD7E0353BE08866700000000\""

    # 執行擷取對象並儲存到本地檔案的請求
    result = client.get_object_to_file(
        oss.GetObjectRequest(
            bucket=args.bucket,  # 指定儲存空間名稱
            key=args.key,        # 指定對象鍵名
            if_modified_since=if_modified_since,  # 只有在指定時間之後被修改的對象才會被返回
            if_match=etag,       # 只有 ETag 匹配的對象才會被返回
        ),
        args.file_path  # 指定下載檔案的本地路徑
    )

    # 輸出擷取對象的結果資訊,包括狀態代碼、請求ID等
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content length: {result.content_length},'
          f' content range: {result.content_range},'
          f' content type: {result.content_type},'
          f' etag: {result.etag},'
          f' last modified: {result.last_modified},'
          f' content md5: {result.content_md5},'
          f' cache control: {result.cache_control},'
          f' content disposition: {result.content_disposition},'
          f' content encoding: {result.content_encoding},'
          f' expires: {result.expires},'
          f' hash crc64: {result.hash_crc64},'
          f' storage class: {result.storage_class},'
          f' object type: {result.object_type},'
          f' version id: {result.version_id},'
          f' tagging count: {result.tagging_count},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
          f' next append position: {result.next_append_position},'
          f' expiration: {result.expiration},'
          f' restore: {result.restore},'
          f' process status: {result.process_status},'
          f' delete marker: {result.delete_marker},'
          f' server time: {result.headers.get("x-oss-server-time")},'
    )

# 當此指令碼被直接執行時,調用main函數開始處理邏輯
if __name__ == "__main__":
    main()  # 指令碼進入點,控製程序流程從這裡開始

列印下載檔案的進度條

當您在下載檔案時,可以使用進度條即時瞭解下載進度,避免因為等待時間過長而感到不安或懷疑任務是否卡住。

以下範例程式碼展示了在下載檔案到本地時,使用進度條查看下載檔案的進度,以get_object_to_file為例。

import argparse
import alibabacloud_oss_v2 as oss

# 建立一個命令列參數解析器,並描述指令碼用途:擷取對象樣本
parser = argparse.ArgumentParser(description="get object sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令列參數 --bucket,表示要擷取對象的儲存空間名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令列參數 --key,表示對象(檔案)在OSS中的鍵名,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # 解析命令列提供的參數,擷取使用者輸入的值
    args = parser.parse_args()

    # 從環境變數中載入訪問OSS所需的認證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置建立設定物件,並設定認證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定設定物件的地區屬性,根據使用者提供的命令列參數
    cfg.region = args.region

    # 如果提供了自訂endpoint,則更新設定物件中的endpoint屬性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS用戶端,準備與OSS互動
    client = oss.Client(cfg)

    # 定義一個字典變數 progress_state 用於儲存下載進度狀態,初始值為 0
    progress_state = {'saved': 0}
    
    # 定義進度回呼函數 _progress_fn
    def _progress_fn(n, written, total):
        # 使用字典儲存累計寫入的位元組數
        progress_state['saved'] += n

        # 計算當前下載百分比,將已寫入位元組數與總位元組數進行除法運算後取整
        rate = int(100 * (float(written) / float(total)))

        # 列印當前下載進度,\r 表示回到行首,實現命令列中即時重新整理效果
        # end='' 表示不換行,使下一次列印覆蓋當前行
        print(f'\r{rate}% ', end='')

    # 執行擷取對象的請求,指定儲存空間名稱、對象名稱及進度回呼函數
    result = client.get_object_to_file(
        oss.GetObjectRequest(
            bucket=args.bucket,  # 指定儲存空間名稱
            key=args.key,        # 指定對象鍵名
            progress_fn=_progress_fn, # 指定進度回呼函數
        ),
        "/local/dir/example", # 指定儲存到本地的檔案路徑
    )

    # 輸出擷取對象的結果資訊
    print(vars(result))

# 當此指令碼被直接執行時,調用main函數開始處理邏輯
if __name__ == "__main__":
    main()  # 指令碼進入點,控製程序流程從這裡開始

以下範例程式碼展示了在流式下載時,使用進度條查看下載檔案的進度,以get_object為例。

import argparse
import alibabacloud_oss_v2 as oss
import os

# 建立命令列參數解析器
parser = argparse.ArgumentParser(description="get object sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令列參數 --bucket,表示儲存空間的名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令列參數 --key,表示對象的名稱,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # 解析命令列參數
    args = parser.parse_args()

    # 從環境變數中載入訪問OSS所需的認證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 載入SDK的預設配置,並設定憑證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定配置中的地區資訊
    cfg.region = args.region

    # 如果提供了endpoint參數,則設定配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的資訊建立OSS用戶端
    client = oss.Client(cfg)

    # 執行擷取對象的請求,指定儲存空間名稱和對象名稱
    result = client.get_object(oss.GetObjectRequest(
        bucket=args.bucket,  # 指定儲存空間名稱
        key=args.key,  # 指定對象鍵名
    ))

    # 擷取對象返回的結果中包含檔案的總大小(位元組數)
    total_size = result.content_length

    # 初始化進度計數器為 0,用於記錄已下載的資料量
    progress_save_n = 0

    # 遍曆響應體中的資料區塊,實現逐塊讀取資料
    for d in result.body.iter_bytes():
        # 累加當前資料區塊的長度到已下載總量中
        progress_save_n += len(d)

        # 計算當前下載百分比,將已下載量與總大小進行比例換算並取整
        rate = int(100 * (float(progress_save_n) / float(total_size)))

        # 列印當前下載進度,\r 表示回到行首,實現命令列中即時重新整理效果
        # end='' 表示不換行,使下一次列印覆蓋當前行
        print(f'\r{rate}% ', end='')

    # 列印結果對象的所有屬性資訊,用於調試或查看完整響應內容
    print(vars(result))


# 當此指令碼被直接運行時,調用main函數
if __name__ == "__main__":
    main()  # 指令碼入口,當檔案被直接運行時調用main函數

批量下載檔案到本地

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import alibabacloud_oss_v2 as oss
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Tuple, Optional
import signal

class DownloadTask:
    """下載任務類"""
    def __init__(self, object_key: str, local_path: str, size: int):
        self.object_key = object_key
        self.local_path = local_path
        self.size = size

class DownloadResult:
    """下載結果類"""
    def __init__(self, object_key: str, success: bool = False, error: Optional[str] = None, size: int = 0):
        self.object_key = object_key
        self.success = success
        self.error = error
        self.size = size

class BatchDownloader:
    """批量下載器"""

    def __init__(self, client: oss.Client, bucket: str, max_workers: int = 5):
        self.client = client
        self.bucket = bucket
        self.max_workers = max_workers
        self.stop_event = threading.Event()

    def list_objects(self, prefix: str = "", max_keys: int = 1000) -> List[DownloadTask]:
        """列舉儲存空間中指定首碼的所有對象"""
        tasks = []
        continuation_token = None

        print(f"正在掃描儲存空間中的檔案...")

        while not self.stop_event.is_set():
            try:
                # 建立列舉對象請求
                request = oss.ListObjectsV2Request(
                    bucket=self.bucket,
                    prefix=prefix,
                    max_keys=max_keys,
                    continuation_token=continuation_token
                )

                # 執行列舉操作
                result = self.client.list_objects_v2(request)

                # 處理列舉結果
                for obj in result.contents:
                    # 跳過檔案夾對象(以/結尾且大小為0)
                    if obj.key.endswith('/') and obj.size == 0:
                        continue

                    # 計算本地檔案路徑
                    relative_path = obj.key[len(prefix):] if prefix else obj.key

                    tasks.append(DownloadTask(
                        object_key=obj.key,
                        local_path=relative_path,
                        size=obj.size
                    ))

                # 檢查是否還有更多個物件
                if not result.next_continuation_token:
                    break
                continuation_token = result.next_continuation_token

            except Exception as e:
                raise Exception(f"列舉對象失敗: {str(e)}")

        return tasks

    def download_file(self, task: DownloadTask, local_dir: str) -> DownloadResult:
        """下載單個檔案"""
        result = DownloadResult(task.object_key, size=task.size)

        try:
            # 計算完整的本地檔案路徑
            full_local_path = os.path.join(local_dir, task.local_path)

            # 建立本地檔案目錄
            os.makedirs(os.path.dirname(full_local_path), exist_ok=True)

            # 檢查檔案是否已存在且大小一致(斷點續傳)
            if os.path.exists(full_local_path):
                local_size = os.path.getsize(full_local_path)
                if local_size == task.size:
                    result.success = True
                    return result

            # 建立下載請求
            get_request = oss.GetObjectRequest(
                bucket=self.bucket,
                key=task.object_key
            )

            # 執行下載
            response = self.client.get_object(get_request)

            # 儲存檔案
            with open(full_local_path, 'wb') as f:
                with response.body as body_stream:
                    # 分塊讀取並寫入
                    for chunk in body_stream.iter_bytes(block_size=1024 * 1024):  # 1MB塊
                        if self.stop_event.is_set():
                            raise Exception("下載被中斷")
                        f.write(chunk)

            result.success = True

        except Exception as e:
            result.error = str(e)
            # 如果下載失敗,刪除不完整的檔案
            try:
                if os.path.exists(full_local_path):
                    os.remove(full_local_path)
            except:
                pass

        return result

    def batch_download(self, tasks: List[DownloadTask], local_dir: str) -> List[DownloadResult]:
        """執行批量下載"""
        results = []
        completed = 0
        total = len(tasks)

        print(f"開始下載 {total} 個檔案,使用 {self.max_workers} 個並發...")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 提交所有下載任務
            future_to_task = {
                executor.submit(self.download_file, task, local_dir): task
                for task in tasks
            }

            # 處理完成的任務
            for future in as_completed(future_to_task):
                if self.stop_event.is_set():
                    break

                task = future_to_task[future]
                try:
                    result = future.result()
                    results.append(result)
                    completed += 1

                    # 顯示進度
                    if result.success:
                        print(f"✓ [{completed}/{total}] {result.object_key} ({self.format_bytes(result.size)})")
                    else:
                        print(f"✗ [{completed}/{total}] {result.object_key} - 錯誤: {result.error}")

                except Exception as e:
                    result = DownloadResult(task.object_key, error=str(e), size=task.size)
                    results.append(result)
                    completed += 1
                    print(f"✗ [{completed}/{total}] {task.object_key} - 異常: {str(e)}")

        return results

    def stop(self):
        """停止下載"""
        self.stop_event.set()
        print("\n正在停止下載...")

    @staticmethod
    def format_bytes(bytes_size: int) -> str:
        """格式化位元組數為可讀格式"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.1f} PB"

def signal_handler(signum, frame):
    """訊號處理器"""
    print(f"\n接收到訊號 {signum},正在停止...")
    if hasattr(signal_handler, 'downloader'):
        signal_handler.downloader.stop()
    sys.exit(0)

def main():
    # 建立命令列參數解析器
    parser = argparse.ArgumentParser(description="OSS 批量下載工具")

    # 添加命令列參數
    parser.add_argument('--region', help='儲存空間所在的地區', required=True)
    parser.add_argument('--bucket', help='儲存空間的名稱', required=True)
    parser.add_argument('--endpoint', help='自訂訪問網域名稱(可選)')
    parser.add_argument('--prefix', help='要下載的檔案夾首碼,Null 字元串表示下載整個儲存空間', default="")
    parser.add_argument('--local-dir', help='本地下載目錄', default="./downloads")
    parser.add_argument('--workers', help='並發下載數量', type=int, default=5)
    parser.add_argument('--max-keys', help='每次列舉的最大對象數', type=int, default=1000)

    # 解析命令列參數
    args = parser.parse_args()

    try:
        # 從環境變數中載入憑證資訊,用於身分識別驗證
        credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

        # 載入SDK的預設配置
        cfg = oss.config.load_default()
        cfg.credentials_provider = credentials_provider
        cfg.region = args.region

        # 如果提供了endpoint參數,則設定自訂endpoint
        if args.endpoint:
            cfg.endpoint = args.endpoint

        # 建立OSS用戶端
        client = oss.Client(cfg)

        # 建立本地下載目錄
        local_dir = getattr(args, 'local_dir')
        os.makedirs(local_dir, exist_ok=True)

        # 建立批量下載器
        downloader = BatchDownloader(client, args.bucket, args.workers)

        # 設定訊號處理器以支援優雅停止
        signal_handler.downloader = downloader
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        print(f"開始批量下載")
        print(f"儲存空間: {args.bucket}")
        print(f"首碼: '{args.prefix}' {'(整個儲存空間)' if not args.prefix else ''}")
        print(f"本地目錄: {local_dir}")
        print(f"並發數: {args.workers}")
        print("-" * 50)

        # 列舉所有需要下載的對象
        tasks = downloader.list_objects(args.prefix, getattr(args, 'max_keys'))

        if not tasks:
            print("沒有找到需要下載的檔案")
            return

        print(f"找到 {len(tasks)} 個檔案需要下載")
        print("-" * 50)

        # 執行批量下載
        start_time = time.time()
        results = downloader.batch_download(tasks, local_dir)
        end_time = time.time()

        # 統計下載結果
        success_count = sum(1 for r in results if r.success)
        fail_count = len(results) - success_count
        total_size = sum(r.size for r in results if r.success)
        duration = end_time - start_time

        print("-" * 50)
        print(f"下載完成!")
        print(f"成功: {success_count}")
        print(f"失敗: {fail_count}")
        print(f"總大小: {BatchDownloader.format_bytes(total_size)}")
        print(f"耗時: {duration:.2f} 秒")

        if fail_count > 0:
            print(f"\n失敗的檔案:")
            for result in results:
                if not result.success:
                    print(f"  - {result.object_key}: {result.error}")

    except KeyboardInterrupt:
        print("\n下載被使用者中斷")
        sys.exit(1)
    except Exception as e:
        print(f"錯誤: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用樣本

# 下載 my-bucket 中 images/2024/ 檔案夾下的所有檔案
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix images/2024/

# 下載到指定本地目錄
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix documents/ --local-dir ./my-downloads

# 使用更多並發下載
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix videos/ --workers 10

# 下載整個儲存空間(不指定prefix或使用Null 字元串)
python batch_download.py --region cn-hangzhou --bucket my-bucket

# 或者顯式指定空首碼
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix ""

輸出樣本

程式運行時會顯示詳細的下載進度:

開始批量下載
儲存空間: my-bucket
首碼: 'images/2024/'
本地目錄: ./downloads
並發數: 5
--------------------------------------------------
正在掃描儲存空間中的檔案...
找到 150 個檔案需要下載
--------------------------------------------------
開始下載 150 個檔案,使用 5 個並發...
✓ [1/150] images/2024/photo1.jpg (2.3 MB)
✓ [2/150] images/2024/photo2.png (1.8 MB)
✗ [3/150] images/2024/photo3.gif - 錯誤: Request timeout
✓ [4/150] images/2024/subfolder/photo4.jpg (3.1 MB)
...
✓ [150/150] images/2024/thumbnails/thumb150.jpg (256.0 KB)
--------------------------------------------------
下載完成!
成功: 148
失敗: 2
總大小: 1.2 GB
耗時: 45.67 秒

失敗的檔案:
  - images/2024/photo3.gif: Request timeout
  - images/2024/corrupted.jpg: Invalid response

相關文檔