全部產品
Search
文件中心

Object Storage Service:分區拷貝(Python SDK V2)

更新時間:Jul 31, 2025

本文介紹如何使用Python SDK V2的UploadPartCopy方法,將源Bucket中的多個分區檔案拷貝到同一地區下相同或不同目標Bucket中,然後合并成一個完整的檔案對象。

注意事項

  • 本文範例程式碼以華東1(杭州)的地區IDcn-hangzhou為例,預設使用外網Endpoint,如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱

  • 要進行拷貝檔案,您必須擁有源檔案的讀許可權及目標Bucket的讀寫權限。

  • 不支援跨地區拷貝。例如不能將華東1(杭州)地區儲存空間中的檔案拷貝到華北1(青島)地區。

  • 拷貝檔案時,您需要確保源Bucket和目標Bucket均未設定合規保留原則,否則報錯The object you specified is immutable.

方法定義

upload_part_copy(request: UploadPartCopyRequest, **kwargs) → UploadPartCopyResult

請求參數列表

參數名

類型

說明

request

UploadPartCopyRequest

佈建要求參數,具體請參見UploadPartCopyRequest

傳回值列表

類型

說明

UploadPartCopyResult

傳回值,具體請參見UploadPartCopyResult

關於分區拷貝方法的完整定義,請參見upload_part_copy

分區拷貝流程

分區拷貝分為以下三個步驟:

  1. 初始化一個分區上傳事件。

    調用client.initiate_multipart_upload方法返回OSS建立的全域唯一的uploadID。

  2. 上傳分區。

    調用client.upload_part_copy方法上傳分區資料。

    說明
    • 對於同一個uploadID,分區號(partNumber)標識了該分區在整個檔案內的相對位置。如果使用同一個分區號上傳了新的資料,那麼OSS上該分區已有的資料將會被覆蓋。

    • OSS將收到的分區資料的MD5值放在ETag頭內返回給使用者。

    • OSS計算上傳資料的MD5值,並與SDK計算的MD5值比較,如果不一致則返回InvalidDigest錯誤碼。

  3. 完成分區上傳。

    所有分區上傳完成後,調用client.complete_multipart_upload方法將所有分區合并成完整的檔案。

範例程式碼

您可以使用以下代碼將多個分區檔案從源儲存空間拷貝到目標儲存空間,然後合并成完整的檔案對象。

import argparse
import alibabacloud_oss_v2 as oss

# 建立命令列參數解析器,並描述指令碼用途:同步分區拷貝上傳樣本
parser = argparse.ArgumentParser(description="upload part copy synchronously sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令列參數 --bucket,表示要上傳對象的目標儲存空間名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令列參數 --key,表示目標對象在OSS中的鍵名,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令列參數 --source_bucket,表示來源物件所在儲存空間的名稱,必需參數
parser.add_argument('--source_bucket', help='The name of the source bucket.', required=True)
# 添加命令列參數 --source_key,表示來源物件在OSS中的鍵名,必需參數
parser.add_argument('--source_key', help='The name of the source object.', required=True)

def main():
    # 解析命令列提供的參數,擷取使用者輸入的值
    args = parser.parse_args()

    # 從環境變數中載入訪問OSS所需的認證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置建立設定物件,並設定認證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region

    # 如果提供了自訂endpoint,則更新設定物件中的endpoint屬性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS用戶端,準備與OSS互動
    client = oss.Client(cfg)

    # 擷取來源物件的中繼資料
    result_meta = client.get_object_meta(oss.GetObjectMetaRequest(
        bucket=args.source_bucket,
        key=args.source_key,
    ))

    # 初始化一個多部分上傳請求,返回一個UploadId用於標識這個過程
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定義每個分區的大小(此處設定為1MB)
    part_size = 1024 * 1024
    total_size = result_meta.content_length  # 源檔案總大小
    part_number = 1  # 分區編號從1開始
    upload_parts = []  # 用來儲存已上傳分區的資訊
    offset = 0  # 當前處理到的位元組位移量

    # 迴圈處理直到所有資料都被上傳
    while offset < total_size:
        num_to_upload = min(part_size, total_size - offset)  # 計算本次要上傳的資料量
        end = offset + num_to_upload - 1  # 確定結束位置

        # 執行實際的分區拷貝上傳操作
        up_result = client.upload_part_copy(oss.UploadPartCopyRequest(
            bucket=args.bucket,
            key=args.key,
            upload_id=result.upload_id,
            part_number=part_number,
            source_bucket=args.source_bucket,
            source_key=args.source_key,
            source_range=f'bytes={offset}-{end}',  # 指定來源物件中的範圍
        ))

        # 輸出該分區上傳的狀態資訊
        print(f'status code: {up_result.status_code},'
              f' request id: {up_result.request_id},'
              f' part number: {part_number},'
              f' last modified: {up_result.last_modified},'
              f' etag: {up_result.etag},'
              f' source version id: {up_result.source_version_id}'
        )

        # 將成功上傳的分區資訊記錄下來
        upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))
        offset += num_to_upload  # 更新位移量
        part_number += 1  # 更新分區編號

    # 對所有已上傳的分區按其編號排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 向OSS服務發送請求,通知其完成多部分上傳
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 輸出最終完成上傳後的詳細結果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id}'
    )

# 當此指令碼被直接執行時,調用main函數開始處理邏輯
if __name__ == "__main__":
    main()  # 指令碼進入點,控製程序流程從這裡開始

相關文檔