
Object Storage Service: Download objects as files

Last Updated: Aug 01, 2025

This topic describes how to quickly download an object from an Object Storage Service (OSS) bucket to a local device.

Usage notes

The sample code in this topic uses the region ID cn-hangzhou of the China (Hangzhou) region. By default, a public endpoint is used to access resources in a bucket. If you want to access resources in the bucket from other Alibaba Cloud services in the same region in which the bucket is located, use an internal endpoint. For more information about the regions and endpoints supported by OSS, see Regions and endpoints.
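As a quick sketch of the public/internal distinction, OSS endpoints for a region ID follow the documented oss-&lt;region-id&gt;.aliyuncs.com pattern, with an -internal variant for access from Alibaba Cloud services in the same region. Verify the exact endpoint for your region in Regions and endpoints:

```python
def public_endpoint(region_id: str) -> str:
    """Public endpoint for a region ID, for example cn-hangzhou."""
    return f"oss-{region_id}.aliyuncs.com"

def internal_endpoint(region_id: str) -> str:
    """Internal endpoint, for access from services in the same region."""
    return f"oss-{region_id}-internal.aliyuncs.com"

print(public_endpoint("cn-hangzhou"))    # oss-cn-hangzhou.aliyuncs.com
print(internal_endpoint("cn-hangzhou"))  # oss-cn-hangzhou-internal.aliyuncs.com
```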

Permissions

By default, an Alibaba Cloud account has full permissions. RAM users or RAM roles under an Alibaba Cloud account do not have any permissions by default. The Alibaba Cloud account or account administrator must grant operation permissions through RAM Policy or Bucket Policy.
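For example, a minimal RAM policy that grants a RAM user the download permissions listed in the following table might look like this sketch. The bucket name examplebucket is a placeholder; adjust the Resource to your own bucket, and grant kms:Decrypt separately if your objects are KMS-encrypted:

```json
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "oss:GetObject",
        "oss:GetObjectVersion"
      ],
      "Resource": "acs:oss:*:*:examplebucket/*"
    }
  ]
}
```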

| API | Action | Description |
| --- | --- | --- |
| GetObject | oss:GetObject | Downloads an object. |
| GetObject | oss:GetObjectVersion | Required if you specify an object version by using the versionId parameter when you download the object. |
| GetObject | kms:Decrypt | Required if the metadata of the object that you download contains X-Oss-Server-Side-Encryption: KMS. |

Method definition

get_object(request: GetObjectRequest, **kwargs) → GetObjectResult

Request parameters

| Parameter | Type | Description |
| --- | --- | --- |
| request | GetObjectRequest | The request parameters. For more information, see GetObjectRequest. |

Response parameters

| Type | Description |
| --- | --- |
| GetObjectResult | The return value. For more information, see GetObjectResult. |

For the complete definition of the simple download method, see get_object.

Sample code

The following sample code downloads an object to a local device:

import argparse
import alibabacloud_oss_v2 as oss
import os

# Create a command-line parameter parser.
parser = argparse.ArgumentParser(description="get object sample")

# Specify the --region parameter to indicate the region in which the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Specify the --bucket parameter to indicate the name of the bucket. This command line parameter is required. 
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Specify the --endpoint parameter to indicate the endpoint of the region in which the bucket is located. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Specify the --key parameter to indicate the name of the object. This parameter is required.
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # Parse the command-line parameters.
    args = parser.parse_args()

    # Obtain access credentials from environment variables for authentication.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Load the default configuration of the SDK and specify the credential provider.
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # Specify the region in which the bucket is located.
    cfg.region = args.region

    # If an endpoint is provided, specify the endpoint in the configuration object.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Use the configuration to create an OSSClient instance.
    client = oss.Client(cfg)

    # Execute the request to download the object, and specify the bucket name and object name.
    result = client.get_object(oss.GetObjectRequest(
        bucket=args.bucket,  # Specify the name of the bucket.
        key=args.key,  # Specify the object key.
    ))

    # Display the response to check whether the request is successful.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content length: {result.content_length},'
          f' content range: {result.content_range},'
          f' content type: {result.content_type},'
          f' etag: {result.etag},'
          f' last modified: {result.last_modified},'
          f' content md5: {result.content_md5},'
          f' cache control: {result.cache_control},'
          f' content disposition: {result.content_disposition},'
          f' content encoding: {result.content_encoding},'
          f' expires: {result.expires},'
          f' hash crc64: {result.hash_crc64},'
          f' storage class: {result.storage_class},'
          f' object type: {result.object_type},'
          f' version id: {result.version_id},'
          f' tagging count: {result.tagging_count},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
          f' next append position: {result.next_append_position},'
          f' expiration: {result.expiration},'
          f' restore: {result.restore},'
          f' process status: {result.process_status},'
          f' delete marker: {result.delete_marker},'
    )

    # ========== Method 1: Read the entire object ==========
    with result.body as body_stream:
        data = body_stream.read()
        print(f"The object is read. Data length: {len(data)} bytes")

        path = "./get-object-sample.txt"
        with open(path, 'wb') as f:
            f.write(data)
        print(f"The object is downloaded and saved to a local path: {path}")

    # # ========== Method 2: Read the object in chunks ==========
    # with result.body as body_stream:
    #     chunk_path = "./get-object-sample-chunks.txt"
    #     total_size = 0

    #     with open(chunk_path, 'wb') as f:
    #         # Use 256KB block size (you can adjust the block_size parameter as needed)
    #         for chunk in body_stream.iter_bytes(block_size=256 * 1024):
    #             f.write(chunk)
    #             total_size += len(chunk)
    #             print(f"Received data block: {len(chunk)} bytes | Total: {total_size} bytes")

    #     print(f"The object is downloaded and saved to the local path: {chunk_path}")

# Call the main function when the script is directly run.
if __name__ == "__main__":
    main()  # The entry point of the script. When the script is directly run, the main function is called.

Common scenarios

Conditional download

When you download an object from a bucket, you can specify conditions based on the last modified time or the ETag of the object. The object is downloaded only if the conditions are met. Otherwise, an error is returned and the download is not triggered. This reduces unnecessary network transfers and resource consumption and improves download efficiency.

The following table describes the available conditions.

Note
  • if_modified_since and if_unmodified_since can coexist. if_match and if_none_match can also coexist.

  • You can use the client.get_object_meta method to obtain the ETag.

| Parameter | Description |
| --- | --- |
| if_modified_since | If the specified time is earlier than the time when the object was last modified, the object is downloaded. Otherwise, 304 Not Modified is returned. |
| if_unmodified_since | If the specified time is later than or equal to the time when the object was last modified, the object is downloaded. Otherwise, 412 Precondition Failed is returned. |
| if_match | If the specified ETag matches the ETag of the object, the object is downloaded. Otherwise, 412 Precondition Failed is returned. |
| if_none_match | If the specified ETag does not match the ETag of the object, the object is downloaded. Otherwise, 304 Not Modified is returned. |
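These parameters follow standard HTTP conditional-request semantics. The following self-contained sketch makes no OSS calls; the evaluation order is an assumption based on RFC 7232, not the OSS server implementation. It shows how the four conditions map to 200, 304, and 412:

```python
from datetime import datetime
from typing import Optional

def check_preconditions(last_modified: datetime, etag: str,
                        if_modified_since: Optional[datetime] = None,
                        if_unmodified_since: Optional[datetime] = None,
                        if_match: Optional[str] = None,
                        if_none_match: Optional[str] = None) -> int:
    """Return the HTTP status that a conditional GET would produce.
    Illustration of the table above, not the server implementation."""
    if if_match is not None and if_match != etag:
        return 412  # Precondition Failed: ETag does not match
    if if_unmodified_since is not None and last_modified > if_unmodified_since:
        return 412  # Precondition Failed: object was modified after the given time
    if if_none_match is not None and if_none_match == etag:
        return 304  # Not Modified: ETag matches
    if if_modified_since is not None and last_modified <= if_modified_since:
        return 304  # Not Modified: object has not changed since the given time
    return 200  # All conditions are met: the object is downloaded

lm = datetime(2024, 10, 5)
print(check_preconditions(lm, '"abc"', if_modified_since=datetime(2024, 10, 1)))  # 200
print(check_preconditions(lm, '"abc"', if_match='"xyz"'))                         # 412
```

As the note above states, if_modified_since can coexist with if_unmodified_since, and if_match with if_none_match; all specified conditions must pass for the download to proceed.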

The following sample code provides an example on how to implement a conditional download:

import argparse
import alibabacloud_oss_v2 as oss
from datetime import datetime, timezone

# Create a parser for parsing command-line arguments and describe the purpose of the script.
parser = argparse.ArgumentParser(description="get object to file sample")

# Specify the --region parameter to indicate the region in which the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Specify the --bucket parameter to indicate the name of the bucket in which the object is stored. This parameter is required.
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Specify the --endpoint parameter to indicate the endpoint of the region in which the bucket is located. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Specify the --key parameter to indicate the name of the object. This parameter is required.
parser.add_argument('--key', help='The name of the object.', required=True)
# Specify the --file_path parameter to indicate the path to which the object is downloaded. This parameter is required.
parser.add_argument('--file_path', help='The path of the file to save the downloaded content.', required=True)

def main():
    # Parse the command-line parameters to obtain the specified values.
    args = parser.parse_args()

    # Obtain access credentials from environment variables for authentication.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration to create a configuration object (cfg) and specify the credential provider.
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # Set the region attribute of the cfg object to the region in the parser.
    cfg.region = args.region

    # If a custom endpoint is provided, update the endpoint attribute of the cfg object with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Use the preceding configuration to initialize the OSSClient instance.
    client = oss.Client(cfg)

    # Define if_modified_since time.
    # Only objects modified after this time will be returned.
    if_modified_since = datetime(2024, 10, 1, 12, 0, 0, tzinfo=timezone.utc)

    # In this example, the expected ETag is DA5223EFCD7E0353BE08866700000000. The object is downloaded only if its ETag matches this value, which satisfies the if_match condition.
    etag = "\"DA5223EFCD7E0353BE08866700000000\""

    # Execute the request to download the object and save it to a local file.
    result = client.get_object_to_file(
        oss.GetObjectRequest(
            bucket=args.bucket,  # Specify the name of the bucket.
            key=args.key,        # Specify the object key.
            if_modified_since=if_modified_since,  # Only objects modified after the specified time will be returned.
            if_match=etag,       # Only objects with matching ETag will be returned.
        ),
        args.file_path  # Specify the local path to save the downloaded file.
    )

    # Display the response information, including status code, request ID, etc.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content length: {result.content_length},'
          f' content range: {result.content_range},'
          f' content type: {result.content_type},'
          f' etag: {result.etag},'
          f' last modified: {result.last_modified},'
          f' content md5: {result.content_md5},'
          f' cache control: {result.cache_control},'
          f' content disposition: {result.content_disposition},'
          f' content encoding: {result.content_encoding},'
          f' expires: {result.expires},'
          f' hash crc64: {result.hash_crc64},'
          f' storage class: {result.storage_class},'
          f' object type: {result.object_type},'
          f' version id: {result.version_id},'
          f' tagging count: {result.tagging_count},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
          f' next append position: {result.next_append_position},'
          f' expiration: {result.expiration},'
          f' restore: {result.restore},'
          f' process status: {result.process_status},'
          f' delete marker: {result.delete_marker},'
          f' server time: {result.headers.get("x-oss-server-time")},'
    )

# Call the main function to start the processing logic when the script is directly run.
if __name__ == "__main__":
    main()  # Specify the entry point of the script. The control flow starts here.
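The ETag passed to if_match can be obtained with client.get_object_meta, as mentioned in the note above. The following sketch assumes the v2 SDK exposes a GetObjectMetaRequest class alongside that method, and that if_match expects the quoted ETag form used in the preceding sample:

```python
def quote_etag(etag: str) -> str:
    """Wrap a bare ETag value in double quotation marks, the form that if_match expects."""
    return etag if etag.startswith('"') else f'"{etag}"'

def fetch_etag(bucket: str, key: str, region: str) -> str:
    """Fetch an object's ETag via get_object_meta (network call; requires
    access credentials in environment variables)."""
    import alibabacloud_oss_v2 as oss  # deferred so quote_etag stays usable on its own
    cfg = oss.config.load_default()
    cfg.credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg.region = region
    client = oss.Client(cfg)
    result = client.get_object_meta(oss.GetObjectMetaRequest(bucket=bucket, key=key))
    return quote_etag(result.etag)
```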

Display download progress

When you download an object, you can use a progress bar to monitor the download progress in real time. Progress monitoring helps you check whether a download task is stuck if it takes a long time to complete.

The following sample code demonstrates how to use a progress bar to view the download progress when downloading an object to a local file using get_object_to_file.

import argparse
import alibabacloud_oss_v2 as oss

# Create a parser for parsing command-line arguments and describe the purpose of the script.
parser = argparse.ArgumentParser(description="get object sample")

# Specify the --region parameter to indicate the region in which the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Specify the --bucket parameter to indicate the name of the bucket in which the object is stored. This parameter is required.
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Specify the --endpoint parameter to indicate the endpoint of the region in which the bucket is located. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Specify the --key parameter to indicate the name of the object. This parameter is required. 
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # Parse the command-line parameters to obtain the specified values.
    args = parser.parse_args()

    # Obtain access credentials from environment variables for authentication.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration to create a configuration object (cfg) and specify the credential provider.
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # Set the region attribute of the cfg object to the region in the parser.
    cfg.region = args.region

    # If a custom endpoint is provided, update the endpoint attribute of the cfg object with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Use the preceding configuration to initialize the OSSClient instance.
    client = oss.Client(cfg)

    # Define a dictionary variable progress_state to store the download progress status, with an initial value of 0.
    progress_state = {'saved': 0}
    
    # Define the progress callback function _progress_fn.
    def _progress_fn(n, written, total):
        # Use a dictionary to store the accumulated written bytes.
        progress_state['saved'] += n

        # Calculate the current download percentage by dividing the number of bytes written by the total bytes and truncating the result to an integer.
        rate = int(100 * (float(written) / float(total)))

        # Print the current download progress. Use \r to return to the start of the line, achieving real-time refresh in the terminal.
        # end='' suppresses line breaks, enabling subsequent prints to overwrite the current line.
        print(f'\r{rate}% ', end='')

    # Execute the request to download the object, and specify the bucket name, object name and the progress callback function.
    result = client.get_object_to_file(
        oss.GetObjectRequest(
            bucket=args.bucket,  # Specify the name of the bucket.
            key=args.key,        # Specify the object key.
            progress_fn=_progress_fn, # Specify the progress callback function.
        ),
        "/local/dir/example", # Specify the local path to save the file.
    )

    # Display the response information.
    print(vars(result))

# Call the main function to start the processing logic when the script is directly run.
if __name__ == "__main__":
    main()  # Specify the entry point of the script. The control flow starts here.

The following sample code demonstrates how to use a progress bar to view the download progress when streaming an object using get_object.

import argparse
import alibabacloud_oss_v2 as oss
import os

# Create a command-line parameter parser.
parser = argparse.ArgumentParser(description="get object sample")

# Specify the --region parameter to indicate the region in which the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Specify the --bucket parameter to indicate the name of the bucket. This command line parameter is required.
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Specify the --endpoint parameter to indicate the endpoint of the region in which the bucket is located. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Specify the --key parameter to indicate the name of the object. This parameter is required.
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # Parse the command line parameters.
    args = parser.parse_args()

    # Obtain access credentials from environment variables for authentication.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Load the default configuration of the SDK and specify the credential provider.
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # Specify the region in which the bucket is located.
    cfg.region = args.region

    # If an endpoint is provided, specify the endpoint in the configuration object.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Use the configuration to create an OSSClient instance.
    client = oss.Client(cfg)

    # Execute the request to download the object, and specify the bucket name and object name.
    result = client.get_object(oss.GetObjectRequest(
        bucket=args.bucket,  # Specify the name of the bucket.
        key=args.key,  # Specify the object key.
    ))

    # Get the total size of the file in bytes from the object response result.
    total_size = result.content_length

    # Initialize the progress counter to 0, used to record the amount of data downloaded.
    progress_save_n = 0

    # Iterate through data blocks in the response body to implement block-by-block data reading.
    for d in result.body.iter_bytes():
        # Accumulate the length of the current data block into the total downloaded amount.
        progress_save_n += len(d)

        # Calculate the current download percentage by computing the ratio of the downloaded amount to the total size and rounding to the nearest integer.
        rate = int(100 * (float(progress_save_n) / float(total_size)))

        # Print the current download progress. Utilize \r (carriage return) to reposition the cursor at the beginning of the line, enabling real-time progress updates within the command-line interface.
        # end='' suppresses line breaks, enabling subsequent prints to overwrite the current line.
        print(f'\r{rate}% ', end='')

    # Print all attributes of the result object for debugging purposes or to examine the complete response content.
    print(vars(result))


# Call the main function to start the processing logic when the script is directly run.
if __name__ == "__main__":
    main()  # Specify the entry point of the script. The control flow starts here.

Batch download objects to local files

The following sample code uses a thread pool to concurrently download all objects with a specified prefix to a local directory. Objects that already exist locally with a matching size are skipped, and incomplete files are removed if a download fails.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import alibabacloud_oss_v2 as oss
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Tuple, Optional
import signal

class DownloadTask:
    """Download task class"""
    def __init__(self, object_key: str, local_path: str, size: int):
        self.object_key = object_key
        self.local_path = local_path
        self.size = size

class DownloadResult:
    """Download result class"""
    def __init__(self, object_key: str, success: bool = False, error: Optional[str] = None, size: int = 0):
        self.object_key = object_key
        self.success = success
        self.error = error
        self.size = size

class BatchDownloader:
    """Batch downloader"""

    def __init__(self, client: oss.Client, bucket: str, max_workers: int = 5):
        self.client = client
        self.bucket = bucket
        self.max_workers = max_workers
        self.stop_event = threading.Event()

    def list_objects(self, prefix: str = "", max_keys: int = 1000) -> List[DownloadTask]:
        """List all objects with the specified prefix in the bucket"""
        tasks = []
        continuation_token = None

        print(f"Scanning files in the bucket...")

        while not self.stop_event.is_set():
            try:
                # Create a request to list objects.
                request = oss.ListObjectsV2Request(
                    bucket=self.bucket,
                    prefix=prefix,
                    max_keys=max_keys,
                    continuation_token=continuation_token
                )

                # Execute the list operation.
                result = self.client.list_objects_v2(request)

                # Process the list results.
                for obj in result.contents or []:
                    # Skip folder objects (identified by a trailing '/' suffix and 0-byte size).
                    if obj.key.endswith('/') and obj.size == 0:
                        continue

                    # Calculate the local file path.
                    relative_path = obj.key[len(prefix):] if prefix else obj.key

                    tasks.append(DownloadTask(
                        object_key=obj.key,
                        local_path=relative_path,
                        size=obj.size
                    ))

                # Check if there are more objects.
                if not result.next_continuation_token:
                    break
                continuation_token = result.next_continuation_token

            except Exception as e:
                raise Exception(f"Failed to list objects: {str(e)}")

        return tasks

    def download_file(self, task: DownloadTask, local_dir: str) -> DownloadResult:
        """Download a single file"""
        result = DownloadResult(task.object_key, size=task.size)

        try:
            # Calculate the full local file path.
            full_local_path = os.path.join(local_dir, task.local_path)

            # Create the local file directory.
            os.makedirs(os.path.dirname(full_local_path), exist_ok=True)

            # Verify file existence and validate size consistency to enable resumable transfers.
            if os.path.exists(full_local_path):
                local_size = os.path.getsize(full_local_path)
                if local_size == task.size:
                    result.success = True
                    return result

            # Create download request.
            get_request = oss.GetObjectRequest(
                bucket=self.bucket,
                key=task.object_key
            )

            # Execute download operation.
            response = self.client.get_object(get_request)

            # Save the file.
            with open(full_local_path, 'wb') as f:
                with response.body as body_stream:
                    # Read and write in blocks
                    for chunk in body_stream.iter_bytes(block_size=1024 * 1024):  # 1MB blocks
                        if self.stop_event.is_set():
                            raise Exception("Download interrupted")
                        f.write(chunk)

            result.success = True

        except Exception as e:
            result.error = str(e)
            # If download fails, delete the incomplete file.
            try:
                if os.path.exists(full_local_path):
                    os.remove(full_local_path)
            except OSError:
                pass

        return result

    def batch_download(self, tasks: List[DownloadTask], local_dir: str) -> List[DownloadResult]:
        """Execute batch download"""
        results = []
        completed = 0
        total = len(tasks)

        print(f"Starting to download {total} files using {self.max_workers} concurrent threads...")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks.
            future_to_task = {
                executor.submit(self.download_file, task, local_dir): task
                for task in tasks
            }

            # Process completed tasks.
            for future in as_completed(future_to_task):
                if self.stop_event.is_set():
                    break

                task = future_to_task[future]
                try:
                    result = future.result()
                    results.append(result)
                    completed += 1

                    # Display the progress.
                    if result.success:
                        print(f"✓ [{completed}/{total}] {result.object_key} ({self.format_bytes(result.size)})")
                    else:
                        print(f"✗ [{completed}/{total}] {result.object_key} - Error: {result.error}")

                except Exception as e:
                    result = DownloadResult(task.object_key, error=str(e), size=task.size)
                    results.append(result)
                    completed += 1
                    print(f"✗ [{completed}/{total}] {task.object_key} - Exception: {str(e)}")

        return results

    def stop(self):
        """Stop download"""
        self.stop_event.set()
        print("\nStopping download...")

    @staticmethod
    def format_bytes(bytes_size: int) -> str:
        """Format bytes to readable format"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.1f} PB"

def signal_handler(signum, frame):
    """Signal handler"""
    print(f"\nReceived signal {signum}, stopping...")
    if hasattr(signal_handler, 'downloader'):
        signal_handler.downloader.stop()
    sys.exit(0)

def main():
    # Create a command line parameter parser.
    parser = argparse.ArgumentParser(description="OSS Batch Download Tool")

    # Add command line parameters.
    parser.add_argument('--region', help='The region in which the bucket is located', required=True)
    parser.add_argument('--bucket', help='The name of the bucket', required=True)
    parser.add_argument('--endpoint', help='Custom endpoint (optional)')
    parser.add_argument('--prefix', help='Folder prefix to download, empty string means download the entire bucket', default="")
    parser.add_argument('--local-dir', help='Local download directory', default="./downloads")
    parser.add_argument('--workers', help='Number of concurrent downloads', type=int, default=5)
    parser.add_argument('--max-keys', help='Maximum number of objects to list at once', type=int, default=1000)

    # Parse the command line parameters.
    args = parser.parse_args()

    try:
        # Obtain access credentials from environment variables for authentication.
        credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

        # Load the default configurations of the SDK.
        cfg = oss.config.load_default()
        cfg.credentials_provider = credentials_provider
        cfg.region = args.region

        # If the endpoint parameter is provided, specify the custom endpoint.
        if args.endpoint:
            cfg.endpoint = args.endpoint

        # Create an OSS client.
        client = oss.Client(cfg)

        # Create local download directory.
        local_dir = args.local_dir
        os.makedirs(local_dir, exist_ok=True)

        # Create batch downloader.
        downloader = BatchDownloader(client, args.bucket, args.workers)

        # Configure a signal handler to enable graceful shutdown procedures.
        signal_handler.downloader = downloader
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        print(f"Starting batch download")
        print(f"Bucket: {args.bucket}")
        print(f"Prefix: '{args.prefix}' {'(entire bucket)' if not args.prefix else ''}")
        print(f"Local directory: {local_dir}")
        print(f"Concurrency: {args.workers}")
        print("-" * 50)

        # List all objects to download.
        tasks = downloader.list_objects(args.prefix, args.max_keys)

        if not tasks:
            print("No files found to download")
            return

        print(f"Found {len(tasks)} files to download")
        print("-" * 50)

        # Execute batch download.
        start_time = time.time()
        results = downloader.batch_download(tasks, local_dir)
        end_time = time.time()

        # Summarize download results.
        success_count = sum(1 for r in results if r.success)
        fail_count = len(results) - success_count
        total_size = sum(r.size for r in results if r.success)
        duration = end_time - start_time

        print("-" * 50)
        print(f"Download complete!")
        print(f"Success: {success_count}")
        print(f"Failed: {fail_count}")
        print(f"Total size: {BatchDownloader.format_bytes(total_size)}")
        print(f"Duration: {duration:.2f} seconds")

        if fail_count > 0:
            print(f"\nFailed files:")
            for result in results:
                if not result.success:
                    print(f"  - {result.object_key}: {result.error}")

    except KeyboardInterrupt:
        print("\nDownload interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Usage examples

# Download all objects with the prefix images/2024/ from the bucket my-bucket.
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix images/2024/

# Download to a specified local directory.
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix documents/ --local-dir ./my-downloads

# Increase download concurrency for enhanced throughput.
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix videos/ --workers 10

# Download all objects in the bucket by either omitting the prefix parameter or specifying an empty string as the prefix.
python batch_download.py --region cn-hangzhou --bucket my-bucket

# Alternatively, explicitly specify an empty prefix.
python batch_download.py --region cn-hangzhou --bucket my-bucket --prefix ""

Output example

The program displays detailed download progress during execution:

Starting batch download
Bucket: my-bucket
Prefix: 'images/2024/'
Local directory: ./downloads
Concurrency: 5
--------------------------------------------------
Scanning files in the bucket...
Found 150 files to download
--------------------------------------------------
Starting to download 150 files using 5 concurrent threads...
✓ [1/150] images/2024/photo1.jpg (2.3 MB)
✓ [2/150] images/2024/photo2.png (1.8 MB)
✗ [3/150] images/2024/photo3.gif - Error: Request timeout
✓ [4/150] images/2024/subfolder/photo4.jpg (3.1 MB)
...
✓ [150/150] images/2024/thumbnails/thumb150.jpg (256.0 KB)
--------------------------------------------------
Download complete!
Success: 148
Failed: 2
Total size: 1.2 GB
Duration: 45.67 seconds

Failed files:
  - images/2024/photo3.gif: Request timeout
  - images/2024/corrupted.jpg: Invalid response

Reference