All Products
Search
Document Center

Object Storage Service:Multipart copy (Python SDK V2)

Last Updated:Jul 31, 2025

This topic describes how to use the UploadPartCopy method of Python SDK V2 to copy multiple parts from a source object to a destination bucket in the same region. The parts are then merged to create a complete object.

Precautions

  • The sample code in this topic uses the China (Hangzhou) region (cn-hangzhou) as an example. The public endpoint is used by default. If you want to access OSS from other Alibaba Cloud services in the same region, use an internal endpoint. For more information about OSS regions and their corresponding endpoints, see OSS regions and endpoints.

  • To copy an object, you must have read permissions on the source object and read and write permissions on the destination bucket.

  • Cross-region copy is not supported. For example, you cannot copy an object from a bucket in the China (Hangzhou) region to a bucket in the China (Qingdao) region.

  • When you copy an object, ensure that no retention policy is configured for the source or destination bucket. Otherwise, the The object you specified is immutable. error is reported.

Method definition

upload_part_copy(request: UploadPartCopyRequest, **kwargs) → UploadPartCopyResult

Request parameters

Parameter

Type

Description

request

UploadPartCopyRequest

The request parameters. For more information, see UploadPartCopyRequest

Return values

Type

Description

UploadPartCopyResult

The return value. For more information, see UploadPartCopyResult

For the complete definition of the multipart copy method, see upload_part_copy.

Multipart copy procedure

A multipart copy involves the following three steps:

  1. Initialize a multipart upload event.

    Call the client.initiate_multipart_upload method to obtain a globally unique upload ID from OSS.

  2. Copy the parts.

    Call the client.upload_part_copy method to copy each part.

    Note
    • For the same upload ID, the part number identifies the relative position of the part in the complete object. If you upload a new part with the same part number, the existing data for that part is overwritten.

    • OSS returns the MD5 hash of the received part data in the ETag header.

    • OSS calculates the MD5 hash of the copied part. If the MD5 hash does not match the expected value, the InvalidDigest error code is returned.

  3. Complete the multipart upload.

    After all parts are copied, call the client.complete_multipart_upload method to merge the parts into a complete object.

Sample code

The following sample code shows how to copy multiple parts from a source object to a destination bucket and then merge them into a complete object.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command line argument parser and describe the purpose of the script: synchronous multipart copy and upload sample
parser = argparse.ArgumentParser(description="upload part copy synchronously sample")

# Add the --region command line parameter, which indicates the region where the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Add the --bucket command line parameter, which indicates the name of the destination bucket to which the object is to be uploaded. This parameter is required.
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Add the --endpoint command line parameter, which indicates the domain name that other services can use to access OSS. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Add the --key command line parameter, which indicates the key of the destination object in OSS. This parameter is required.
parser.add_argument('--key', help='The name of the object.', required=True)
# Add the --source_bucket command line parameter, which indicates the name of the bucket where the source object is located. This parameter is required.
parser.add_argument('--source_bucket', help='The name of the source bucket.', required=True)
# Add the --source_key command line parameter, which indicates the key of the source object in OSS. This parameter is required.
parser.add_argument('--source_key', help='The name of the source object.', required=True)

def main():
    # Parse the parameters provided on the command line to obtain the values entered by the user.
    args = parser.parse_args()

    # Load the authentication information required to access OSS from environment variables for identity verification.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configurations of the SDK to create a configuration object and set the authentication provider.
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region

    # If a custom endpoint is provided, update the endpoint property in the configuration object.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Use the preceding configurations to initialize the OSS client and prepare for interaction with OSS.
    client = oss.Client(cfg)

    # Obtain the metadata of the source object.
    result_meta = client.get_object_meta(oss.GetObjectMetaRequest(
        bucket=args.source_bucket,
        key=args.source_key,
    ))

    # Initialize a multipart upload request and return an upload ID to identify the process.
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # Define the size of each part. In this example, the size is set to 1 MB.
    part_size = 1024 * 1024
    total_size = result_meta.content_length  # The total size of the source file.
    part_number = 1  # The part number starts from 1.
    upload_parts = []  # Used to store the information of uploaded parts.
    offset = 0  # The current byte offset.

    # Loop until all data is uploaded.
    while offset < total_size:
        num_to_upload = min(part_size, total_size - offset)  # Calculate the amount of data to be uploaded this time.
        end = offset + num_to_upload - 1  # Determine the end position.

        # Perform the actual multipart copy and upload operation.
        up_result = client.upload_part_copy(oss.UploadPartCopyRequest(
            bucket=args.bucket,
            key=args.key,
            upload_id=result.upload_id,
            part_number=part_number,
            source_bucket=args.source_bucket,
            source_key=args.source_key,
            source_range=f'bytes={offset}-{end}',  # Specify the range in the source object.
        ))

        # Print the status information of the uploaded part.
        print(f'status code: {up_result.status_code},'
              f' request id: {up_result.request_id},'
              f' part number: {part_number},'
              f' last modified: {up_result.last_modified},'
              f' etag: {up_result.etag},'
              f' source version id: {up_result.source_version_id}'
        )

        # Record the information of the successfully uploaded part.
        upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))
        offset += num_to_upload  # Update the offset.
        part_number += 1  # Update the part number.

    # Sort all uploaded parts by part number.
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # Send a request to the OSS service to complete the multipart upload.
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # Print the detailed results after the upload is complete.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id}'
    )

# When this script is directly executed, call the main function to start processing the logic.
if __name__ == "__main__":
    main()  # The entry point of the script. The program flow starts from here.

References