全部產品
Search
文件中心

Object Storage Service:上傳回調(Python SDK V1)

更新時間:Aug 08, 2025

OSS在完成簡單上傳(put_object和put_object_from_file)以及分區上傳(complete_multipart_upload)時可以提供回調(Callback)給應用伺服器。您只需要在發送給OSS的請求中攜帶相應的Callback參數,即可實現回調。

注意事項

  • 本文以華東1(杭州)外網Endpoint為例。如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱

  • 本文以OSS網域名稱建立OSSClient為例。如果您希望通過自訂網域名、STS等方式建立OSSClient,請參見初始化(Python SDK V1)

範例程式碼

簡單上傳回調

# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 從環境變數中擷取訪問憑證。運行本程式碼範例之前,請確保已設定環境變數OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# 填寫Endpoint對應的Region資訊,例如cn-hangzhou。注意,v4簽名下,必須填寫該參數
region = "cn-hangzhou"

# yourBucketName填寫儲存空間名稱。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)

# 定義回調參數Base64編碼函數。
def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))

# 設定上傳回調參數。
callback_params = {}
# 設定回調請求的伺服器位址,例如http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
#(可選)設定回調請求訊息頭中Host的值,即您的伺服器配置Host的值。
#callback_params['callbackHost'] = 'yourCallbackHost'
# 指定回調請求的 Body 內容,使用預留位置動態傳遞對象資訊
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回調請求的 Content-Type
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# 設定發起回調請求的自訂參數,由Key和Value組成,Key必須以x:開始。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)

# 上傳回調。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# 填寫Object完整路徑和字串。Object完整路徑中不能包含Bucket名稱。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)

分區上傳回調

# -*- coding: utf-8 -*-

import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2

key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."

# 從環境變數中擷取訪問憑證。運行本程式碼範例之前,請確保已設定環境變數OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# 填寫Endpoint對應的Region資訊,例如cn-hangzhou。注意,v4簽名下,必須填寫該參數
region = "cn-hangzhou"

# 設定回調請求的伺服器位址,例如http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'

# yourBucketName填寫儲存空間名稱。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)


# 準備回調參數。
callback_dict = {}
callback_dict['callbackUrl'] = callback_url

#(可選)設定回調請求訊息頭中Host的值,即您的伺服器配置Host的值。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'

## 指定回調請求的 Body 內容,使用預留位置動態傳遞對象資訊
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回調請求的 Content-Type
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)

# 回調參數是json格式,並且base64編碼
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# 回調參數編碼後放在header中傳給oss
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}


"""
分區上傳回調
"""

# 分區上傳回調
# 初始化上傳任務
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# 上傳分區
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# 完成上傳並回調
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)

# 上傳並回調成功status為200,上傳成功回調失敗status為203
if result.status == 200:
    print("檔案上傳成功,回調成功 (HTTP 200)")
elif result.status == 203:
    print("檔案上傳成功,但回調失敗 (HTTP 203)")
else:
    print(f"上傳異常,狀態代碼: {result.status}")


# 確認檔案上傳成功
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers

表單上傳回調

更多PostObject資訊請參見PostObject

# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests


# 以下代碼展示了PostObject的用法。PostObject不依賴於OSS Python SDK。

# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等資訊。
# 通過環境變數擷取,或者把諸如“<你的AccessKeyId>”替換成真實的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的訪問網域名稱>')
# 這裡以oss-demo.aliyuncs.com:23450舉例
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# 例如cn-hangzhou
region = "<你的region>"


# 確認上面的參數都填寫正確了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<' not in param, '請設定參數:' + param

def convert_base64(input):
    return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')

def calculate_crc64(data):
    """計算檔案的MD5
    :param data: 資料
    :return 資料的MD5值
    """
    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
    crc64.update(data.encode())

    return crc64.crcValue

def build_gmt_expired_time(expire_time):
    """產生GMT格式的請求逾時時間
    :param int expire_time: 逾時時間,單位秒
    :return str GMT格式的逾時時間
    """
    now = int(time.time())
    expire_syncpoint  = now + expire_time

    expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
    expire_gmt += 'Z'

    return expire_gmt

def build_encode_policy(expired_time, condition_list):
    """產生policy
    :param int expired_time: 逾時時間,單位秒
    :param list condition_list: 限制條件列表
    """
    policy_dict = {}
    policy_dict['expiration'] = build_gmt_expired_time(expired_time)
    policy_dict['conditions'] = condition_list

    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())

    return policy_encode

def build_signature(access_key_secret, date):
    """產生簽名
    :param str access_key_secret: access key secret
    :return str 請求籤名
    """

    signing_key = "aliyun_v4" + access_key_secret
    h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
    h1_key = h1.digest()
    h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
    h2_key = h2.digest()
    h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
    h3_key = h3.digest()
    h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
    h4_key = h4.digest()

    h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
    signature = h.hexdigest()

    return signature

def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
    """產生callback字串
    :param str cb_url: 回調伺服器位址,檔案上傳成功後OSS向此url發送回調請求
    :param str cb_body: 發起回調請求的Content-Type,預設application/x-www-form-urlencoded
    :param str cb_body_type: 發起回調時請求body
    :param str cb_host: 發起回調請求時Host頭的值
    :return str 編碼後的Callback
    """
    callback_dict = {}

    callback_dict['callbackUrl'] = cb_url

    callback_dict['callbackBody'] = cb_body
    if cb_body_type is None:
        callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
    else:
        callback_dict['callbackBodyType'] = cb_body_type

    if cb_host is not None:
        callback_dict['callbackHost'] = cb_host

    callback_param = json.dumps(callback_dict).strip()
    base64_callback = base64.b64encode(callback_param.encode());

    return base64_callback.decode()

def build_post_url(endpoint, bucket_name):
    """產生POST請求URL
    :param str endpoint: endpoint
    :param str bucket_name: bucket name
    :return str POST請求URL
    """
    if endpoint.startswith('http://'):
        return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
    elif endpoint.startswith('https://'):
        return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
    else:
        return 'http://{0}.{1}'.format(bucket_name, endpoint)

def build_post_body(field_dict, boundary):
    """產生POST請求Body
    :param dict field_dict: POST請求表單域
    :param str boundary: 表單域的邊界字串
    :return str POST請求Body
    """
    post_body = ''

    # 編碼錶單域
    for k,v in field_dict.items():
        if k != 'content' and k != 'content-type':
            post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)

    # 上傳檔案的內容,必須作為最後一個表單域
    post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
        boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])

    # 加上表單域結束符
    post_body += '\r\n--{0}--\r\n'.format(boundary)

    return post_body.encode('utf-8')

def build_post_headers(body_len, boundary, headers=None):
    """生氣POST請求Header
    :param str body_len: POST請求Body長度
    :param str boundary: 表單域的邊界字串
    :param dict 請求Header
    """
    headers = headers if headers else {}
    headers['Content-Length'] = str(body_len)
    headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)

    return headers

def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return base64.b64encode(cb_str.encode()).decode()

# POST請求表單域,注意大小寫
field_dict = {}
# object名稱
field_dict['key'] = '0303/post.txt'
# access key id
field_dict['OSSAccessKeyId'] = access_key_id

product = "oss"


utc_time = datetime.datetime.utcnow()
# 到期時間設定3600秒
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
    "expiration": expiration,
    "conditions": [
        {"bucket": bucket_name},
        {"x-oss-signature-version": "OSS4-HMAC-SHA256"},
        {"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
        {"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
        ["content-length-range", 1, 1024]
    ]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()

field_dict['policy'] = string_to_sign

field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 請求籤名
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)



# 臨時使用者Token,當使用臨時使用者密鑰時Token必填;非臨時使用者填空或不填
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# 使用者自訂meta
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# callback,沒有回調需求不填該域
field_dict['callback'] = bulid_callback(call_back_url,
                                        'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
                                        'application/x-www-form-urlencoded')
# callback中的自訂變數,沒有回調不填該域
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'

# 如果上傳檔案
# with open("", r) as f:
#     content = f.read()
# field_dict['content'] = content

# 上傳檔案內容
field_dict['content'] = 'a'*64
# 上傳檔案類型
field_dict['content-type'] = 'text/plain'

# 表單域的邊界字串,一般為隨機字串
boundary = '9431149156168'

# 發送POST請求
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)

resp = requests.post(build_post_url(endpoint, bucket_name),
                     data=body,
                     headers=headers)

# 確認請求結果
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))

相關文檔

  • 關於上傳回調的完整範例程式碼,請參見GitHub樣本

  • 關於上傳回調的API介面說明,請參見Callback