全部產品
Search
文件中心

Object Storage Service:Python分區上傳

更新時間:Jul 04, 2025

OSS提供的分區上傳(Multipart Upload)功能,將要上傳的較大檔案(Object)分成多個分區(Part)來分別上傳,上傳完成後再調用CompleteMultipartUpload介面將這些Part組合成一個Object。

注意事項

  • 本文範例程式碼以華東1(杭州)的地區IDcn-hangzhou為例,預設使用外網Endpoint,如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱

  • 要分區上傳,您必須有oss:PutObject許可權。具體操作,請參見為RAM使用者授予自訂的權限原則

分區上傳流程

分區上傳(Multipart Upload)分為以下三個步驟:

  1. 初始化一個分區上傳事件。

    調用Client.InitiateMultipartUpload方法返回OSS建立的全域唯一的uploadID。

  2. 上傳分區。

    調用Client.UploadPart方法上傳分區資料。

    說明
    • 對於同一個uploadID,分區號(partNumber)標識了該分區在整個檔案內的相對位置。如果使用同一個分區號上傳了新的資料,那麼OSS上該分區已有的資料將會被覆蓋。

    • OSS將收到的分區資料的MD5值放在ETag頭內返回給使用者。

    • OSS計算上傳資料的MD5值,並與SDK計算的MD5值比較,如果不一致則返回InvalidDigest錯誤碼。

  3. 完成分區上傳。

    所有分區上傳完成後,調用Client.CompleteMultipartUpload方法將所有分區合并成完整的檔案。

範例程式碼

以下代碼展示如何將本地的大檔案分割成多個分區檔案並發上傳到儲存空間,然後合并成完整的檔案對象。

import os
import argparse
import alibabacloud_oss_v2 as oss

# 建立命令列參數解析器,用於分區上傳樣本。
parser = argparse.ArgumentParser(description="multipart upload sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)

# 添加命令列參數 --bucket,表示儲存空間的名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)

# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

# 添加命令列參數 --key,表示對象的名稱,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

# 添加命令列參數 --file_path,表示要上傳的檔案路徑,必需參數
parser.add_argument('--file_path', help='The path of Upload file.', required=True)


def main():
    # 解析命令列參數
    args = parser.parse_args()

    # 從環境變數中載入憑證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置,並設定憑證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定配置中的地區資訊
    cfg.region = args.region

    # 如果提供了endpoint參數,則設定配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的資訊建立OSS用戶端
    client = oss.Client(cfg)

    # 初始化分區上傳請求,擷取upload_id用於後續分區上傳
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定義每個分區的大小為5MB
    part_size = 5 * 1024 * 1024

    # 擷取要上傳檔案的總大小
    data_size = os.path.getsize(args.file_path)

    # 初始化分區編號,從1開始
    part_number = 1

    # 儲存每個分區上傳的結果
    upload_parts = []

    # 開啟檔案以二進位模式讀取
    with open(args.file_path, 'rb') as f:
        # 遍曆檔案,按照part_size分區上傳
        for start in range(0, data_size, part_size):
            n = part_size
            if start + n > data_size:  # 處理最後一個分區可能小於part_size的情況
                n = data_size - start

            # 建立SectionReader來讀取檔案的特定部分
            reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)

            # 上傳分區
            up_result = client.upload_part(oss.UploadPartRequest(
                bucket=args.bucket,
                key=args.key,
                upload_id=result.upload_id,
                part_number=part_number,
                body=reader
            ))

            # 列印每個分區上傳的結果資訊
            print(f'status code: {up_result.status_code},'
                  f' request id: {up_result.request_id},'
                  f' part number: {part_number},'
                  f' content md5: {up_result.content_md5},'
                  f' etag: {up_result.etag},'
                  f' hash crc64: {up_result.hash_crc64},'
                  )

            # 將分區上傳結果儲存到列表中
            upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))

            # 增加分區編號
            part_number += 1

    # 對上傳的分區按照分區編號排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 發送完成分區上傳請求,合并所有分區為一個完整的對象
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 下面的代碼是另一種方式,通過伺服器端列出併合並所有分區資料為一個完整的對象
    # 這種方法適用於當您不確定所有分區是否都已成功上傳時
    # Merge fragmented data into a complete Object through the server-side List method
    # result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
    #     bucket=args.bucket,
    #     key=args.key,
    #     upload_id=result.upload_id,
    #     complete_all='yes'
    # ))

    # 輸出完成分區上傳的結果資訊
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
    )

if __name__ == "__main__":
    main()  # 指令碼入口,當檔案被直接運行時調用main函數

常見使用情境

分區上傳並設定上傳回調

如果您希望在檔案分區上傳後通知應用伺服器,可參考以下程式碼範例。

import os
import argparse
import base64
import alibabacloud_oss_v2 as oss

# 建立命令列參數解析器,用於分區上傳樣本。
parser = argparse.ArgumentParser(description="multipart upload sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)

# 添加命令列參數 --bucket,表示儲存空間的名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)

# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

# 添加命令列參數 --key,表示對象的名稱,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

# 添加命令列參數 --file_path,表示要上傳的檔案路徑,必需參數
parser.add_argument('--file_path', help='The path of Upload file.', required=True)


def main():
    # 解析命令列參數
    args = parser.parse_args()

    # 從環境變數中載入憑證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置,並設定憑證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定配置中的地區資訊
    cfg.region = args.region

    # 如果提供了endpoint參數,則設定配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的資訊建立OSS用戶端
    client = oss.Client(cfg)

    # 初始化分區上傳請求,擷取upload_id用於後續分區上傳
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定義每個分區的大小為5MB
    part_size = 1 * 1024 * 1024

    # 擷取要上傳檔案的總大小
    data_size = os.path.getsize(args.file_path)

    # 初始化分區編號,從1開始
    part_number = 1

    # 儲存每個分區上傳的結果
    upload_parts = []

    # 開啟檔案以二進位模式讀取
    with open(args.file_path, 'rb') as f:
        # 遍曆檔案,按照part_size分區上傳
        for start in range(0, data_size, part_size):
            n = part_size
            if start + n > data_size:  # 處理最後一個分區可能小於part_size的情況
                n = data_size - start

            # 建立SectionReader來讀取檔案的特定部分
            reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)

            # 上傳分區
            up_result = client.upload_part(oss.UploadPartRequest(
                bucket=args.bucket,
                key=args.key,
                upload_id=result.upload_id,
                part_number=part_number,
                body=reader
            ))

            # 列印每個分區上傳的結果資訊
            print(f'status code: {up_result.status_code},'
                  f' request id: {up_result.request_id},'
                  f' part number: {part_number},'
                  f' content md5: {up_result.content_md5},'
                  f' etag: {up_result.etag},'
                  f' hash crc64: {up_result.hash_crc64},'
                  )

            # 將分區上傳結果儲存到列表中
            upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))

            # 增加分區編號
            part_number += 1

    # 對上傳的分區按照分區編號排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 定義回調地址
    call_back_url = "http://www.example.com/callback"
    # 構造回調參數(callback):指定回調地址和回調請求體,使用 Base 64 編碼
    callback=base64.b64encode(str('{\"callbackUrl\":\"' + call_back_url + '\",\"callbackBody\":\"bucket=${bucket}&object=${object}&my_var_1=${x:var1}&my_var_2=${x:var2}\"}').encode()).decode()
    # 構造自訂變數(callback-var),使用 Base 64 編碼
    callback_var=base64.b64encode('{\"x:var1\":\"value1\",\"x:var2\":\"value2\"}'.encode()).decode()

    # 發送完成分區上傳請求,合并所有分區為一個完整的對象
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        ),
        callback=callback,
        callback_var=callback_var
    ))

    # 下面的代碼是另一種方式,通過伺服器端列出併合並所有分區資料為一個完整的對象
    # 這種方法適用於當您不確定所有分區是否都已成功上傳時
    # Merge fragmented data into a complete Object through the server-side List method
    # result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
    #     bucket=args.bucket,
    #     key=args.key,
    #     upload_id=result.upload_id,
    #     complete_all='yes'
    # ))

    # 輸出完成分區上傳的結果資訊
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
    )

if __name__ == "__main__":
    main()  # 指令碼入口,當檔案被直接運行時調用main函數

分區上傳顯示進度條

import os
import argparse
import alibabacloud_oss_v2 as oss

# 建立命令列參數解析器,用於分區上傳樣本。
parser = argparse.ArgumentParser(description="multipart upload sample")

# 添加命令列參數 --region,表示儲存空間所在的地區,必需參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)

# 添加命令列參數 --bucket,表示儲存空間的名稱,必需參數
parser.add_argument('--bucket', help='The name of the bucket.', required=True)

# 添加命令列參數 --endpoint,表示其他服務可用來訪問OSS的網域名稱,非必需參數
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

# 添加命令列參數 --key,表示對象的名稱,必需參數
parser.add_argument('--key', help='The name of the object.', required=True)

# 添加命令列參數 --file_path,表示要上傳的檔案路徑,必需參數
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令列參數
    args = parser.parse_args()

    # 從環境變數中載入憑證資訊,用於身分識別驗證
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的預設配置,並設定憑證提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 設定配置中的地區資訊
    cfg.region = args.region

    # 如果提供了endpoint參數,則設定配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的資訊建立OSS用戶端
    client = oss.Client(cfg)

    # 定義一個字典變數 progress_state 用於儲存上傳進度狀態,初始值為 0
    progress_state = {'saved': 0}
    def _progress_fn(n, written, total):
        # 使用字典儲存累計寫入的位元組數,避免使用 global 變數
        progress_state['saved'] += n

        # 計算當前上傳百分比,將已寫入位元組數與總位元組數進行除法運算後取整
        rate = int(100 * (float(written) / float(total)))

        # 列印當前上傳進度,\r 表示回到行首,實現命令列中即時重新整理效果
        # end='' 表示不換行,使下一次列印覆蓋當前行
        print(f'\r上傳進度:{rate}% ', end='')

    # 初始化分區上傳請求,擷取upload_id用於後續分區上傳
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定義每個分區的大小為5MB
    part_size = 5 * 1024 * 1024

    # 擷取要上傳檔案的總大小
    data_size = os.path.getsize(args.file_path)

    # 初始化分區編號,從1開始
    part_number = 1

    # 儲存每個分區上傳的結果
    upload_parts = []

    # 開啟檔案以二進位模式讀取
    with open(args.file_path, 'rb') as f:
        # 遍曆檔案,按照part_size分區上傳
        for start in range(0, data_size, part_size):
            n = part_size
            if start + n > data_size:  # 處理最後一個分區可能小於part_size的情況
                n = data_size - start

            # 建立SectionReader來讀取檔案的特定部分
            reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)

            # 上傳分區
            up_result = client.upload_part(oss.UploadPartRequest(
                bucket=args.bucket,
                key=args.key,
                upload_id=result.upload_id,
                part_number=part_number,
                body=reader,
                progress_fn=_progress_fn
            ))

            # 列印每個分區上傳的結果資訊
            print(f'status code: {up_result.status_code},'
                  f' request id: {up_result.request_id},'
                  f' part number: {part_number},'
                  f' content md5: {up_result.content_md5},'
                  f' etag: {up_result.etag},'
                  f' hash crc64: {up_result.hash_crc64},'
                  )

            # 將分區上傳結果儲存到列表中
            upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))

            # 增加分區編號
            part_number += 1

    # 對上傳的分區按照分區編號排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 發送完成分區上傳請求,合并所有分區為一個完整的對象
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 下面的代碼是另一種方式,通過伺服器端列出併合並所有分區資料為一個完整的對象
    # 這種方法適用於當您不確定所有分區是否都已成功上傳時
    # Merge fragmented data into a complete Object through the server-side List method
    # result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
    #     bucket=args.bucket,
    #     key=args.key,
    #     upload_id=result.upload_id,
    #     complete_all='yes'
    # ))

    # 輸出完成分區上傳的結果資訊
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
    )

if __name__ == "__main__":
    main()  # 指令碼入口,當檔案被直接運行時調用main函數

相關文檔