All Products
Search
Document Center

Object Storage Service:Upload callbacks

Last Updated:Mar 19, 2025

OSS can send a callback to the application server when Object Storage Service (OSS) completes a multipart upload task by using complete_multipart_upload and a simple upload task by using put_object or put_object_from_file. To configure upload callbacks, you need to only specify the related callback parameters in the upload request that is sent to OSS.

Usage notes

  • In this topic, the public endpoint of the China (Hangzhou) region is used. If you want to access OSS from other Alibaba Cloud services in the same region as OSS, use an internal endpoint. For more information about OSS regions and endpoints, see Regions and endpoints.

  • In this topic, an OSSClient instance is created by using an OSS endpoint. If you want to create an OSSClient instance by using custom domain names or Security Token Service (STS), see Initialization.

Examples

Configure an upload callback for a simple upload request

# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# Obtain access credentials from environment variables. Before you run the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured. 
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# Specify the endpoint of the region in which the bucket is located. For example, if the bucket is located in the China (Hangzhou) region, set the endpoint to https://oss-cn-hangzhou.aliyuncs.com. 
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# Specify the region in which the bucket is located. Example: cn-hangzhou. This parameter is required if you use the V4 signature algorithm.
region = "cn-hangzhou"

# Specify the name of the bucket. 
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)

# Specify the function that is used to encode the callback parameters in Base64. 
def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))

# Specify the upload callback parameters. 
callback_params = {}
# Specify the address of the callback server that receives the callback request. Example: http://oss-demo.aliyuncs.com:23450. 
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
# (Optional) Specify the Host field included in the callback request header. 
#callback_params['callbackHost'] = 'yourCallbackHost'
# Specify the body field that is included in the callback request. Use placeholders to pass object information.
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# Specify the content type of the callback request.
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# Specify custom parameters for the callback request. Each custom parameter consists of a key and a value. The key must start with x:. 
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)

# Specify upload callback parameters. 
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# Specify the full path of the object and the string that you want to upload. Do not include the bucket name in the full path. 
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)

Configure an upload callback for a multipart upload request

# -*- coding: utf-8 -*-

import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2

key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."

# Obtain access credentials from environment variables. Before you run the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured. 
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# Specify the endpoint of the region in which the bucket is located. For example, if the bucket is located in the China (Hangzhou) region, set the endpoint to https://oss-cn-hangzhou.aliyuncs.com. 
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"

# Specify the region in which the bucket is located. Example: cn-hangzhou. This parameter is required if you use the V4 signature algorithm.
region = "cn-hangzhou"

# Specify the address of the callback server that receives the callback request. Example: http://oss-demo.aliyuncs.com:23450. 
callback_url = 'http://oss-demo.aliyuncs.com:23450'

# Specify the name of the bucket. 
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)


# Refer to https://www.alibabacloud.com/help/en/oss/developer-reference/callback to prepare the callback parameters.
callback_dict = {}
callback_dict['callbackUrl'] = callback_url

# (Optional) Specify the Host field included in the callback request header. 
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'

## Specify the body field that is included in the callback request. Use placeholders to pass object information.
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# Specify the content type of the callback request.
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'

callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)

# Specify the callback parameters in the JSON format and encode the callback parameters in Base64.
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# Add the encoded callback parameters to the end of the header and pass the callback parameters to OSS.
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}


"""
Configure an upload callback for a multipart upload request
"""

# Configure an upload callback for a multipart upload request.
# Initialize the multipart upload task.
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# Upload parts.
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# Complete the multipart upload task and send an upload callback.
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)

# If the multipart upload task and the upload callback request are successful, HTTP status code 200 is returned. If the multipart upload task is complete but the upload callback request fails, HTTP status code 203 is returned.
if result.status == 200:
    print("The object is uploaded, and the callback request is successful (HTTP 200)")
elif result.status == 203:
    print("The object is uploaded, but the callback request fails (HTTP 203)")
else:
    print(f "Upload exception, HTTP status code: {result.status}")


# Check whether the object is uploaded.
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers

Configure an upload callback for a form upload request

# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests


The following sample code provides an example on how to Configure an upload callback for a PostObject request. The implementation is independent of OSS SDK for Python. 

# Obtain more information about the API operation at https://www.alibabacloud.com/help/en/oss/developer-reference/postobject.
 

# Initialize OSS information, such as the AccessKey ID, AccessKey secret, and endpoint. 
# Obtain the information from the environment variables or replace variables such as <yourAccessKeyId> with actual values. 
access_key_id=os.getenv ('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret=os.getenv ('OSS_TEST_ACCESS_KEY_SECRET, '<yourAccessKeySecret>')
bucket_name=os.getenv ('OSS_TEST_BUCKET, '<you bucket>')
endpoint=os.getenv ('OSS_TEST_ENDPOINT', '<yourendpoint>')
# In this example, oss-demo.aliyuncs.com:23450 is used.
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# Specify the region in which the bucket is located. In this example, cn-hangzhou is used.
region = "<yourregion>"


# Make sure that the preceding parameters are correctly specified.
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<'not in param, ' Please specify the parameters:' + param

def convert_base64(input):
    return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')

def calculate_crc64(data):
    """ Calculate the MD5 hash of the local file.
    : param data: data
    : The MD5 hash of the local file is returned.
    """
    _POLY = 0x142F0E1EBA9EA3693
    _XOROUT = 0XFFFFFFFFFFFFFFFF

    crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
    crc64.update(data.encode())

    return crc64.crcValue

def build_gmt_expired_time(expire_time):
    """ Generate a GMT time string that specifies the expiration time of the request.
    : param int expire_time: The timeout period of the request. Unit: seconds.
    :return str: The GMT time string that specifies the expiration time of the request.
    """
    now = int(time.time())
    expire_syncpoint  = now + expire_time

    expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
    expire_gmt += 'Z'

    return expire_gmt

def build_encode_policy(expired_time, condition_list):
    """ Generate policy.
    :param int expired_time: The timeout period of the policy. Unit: seconds.
    :param list condition_list: The conditions of the policy.
    """
    policy_dict = {}
    policy_dict['expiration'] = build_gmt_expired_time(expired_time)
    policy_dict['conditions'] = condition_list

    policy = json.dumps(policy_dict).strip()
    policy_encode = base64.b64encode(policy.encode())

    return policy_encode

def build_signature(access_key_secret, date):
    """ Generate a signature.
    :param str access_key_secret: access key secret
    :return str: Request for the signature.
    """

    signing_key = "aliyun_v4" + access_key_secret
    h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
    h1_key = h1.digest()
    h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
    h2_key = h2.digest()
    h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
    h3_key = h3.digest()
    h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
    h4_key = h4.digest()

    h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
    signature = h.hexdigest()

    return signature

def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
    """ Generate a callback string.
    :param str cb_url: Specify the URL of the callback server. OSS sends a callback request to the URL after the object is uploaded.
    :param str cb_body: Specify the content type of the callback request. Default value: application/x-www-form-urlencoded.
    :param str cb_body_type: Specify the request body of the callback request.
    :param str cb_host: Specify the value of the Host header in the callback request.
    :return str: The encoded callback string.
    """
    callback_dict = {}

    callback_dict['callbackUrl'] = cb_url

    callback_dict['callbackBody'] = cb_body
    if cb_body_type is None:
        callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
    else:
        callback_dict['callbackBodyType'] = cb_body_type

    if cb_host is not None:
        callback_dict['callbackHost'] = cb_host

    callback_param = json.dumps(callback_dict).strip()
    base64_callback = base64.b64encode(callback_param.encode());

    return base64_callback.decode()

def build_post_url(endpoint, bucket_name):
    """ Generate a URL for the PostObject request.
    :param str endpoint: endpoint
    :param str bucket_name: bucket name
    :return str: The URL of the PostObject request.
    """
    if endpoint.startswith('http://'):
        return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
    elif endpoint.startswith('https://'):
        return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
    else:
        return 'http://{0}.{1}'.format(bucket_name, endpoint)

def build_post_body(field_dict, boundary):
    """ Specify the body of the PostObject request.
    :param dict field_dict: Specify the form field of the PostObject request.
    :param str boundary: Specify the boundary string of the form field.
    :return str: The request body of the PostObject request.
    """
    post_body = ''

    # Encode the form field.
    for k,v in field_dict.items():
        if k != 'content' and k != 'content-type':
            post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)

    # Specify the content of the object that you want to upload. It must be the last field in the form.
    post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
        boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])

    # Add terminators to the form field.
    post_body += '\r\n--{0}--\r\n'.format(boundary)

    return post_body.encode('utf-8')

def build_post_headers(body_len, boundary, headers=None):
    """ Specify the headers in the PostObject request.
    :param str body_len: Specify the length of the PostObject request body.
    :param str boundary: Specify the boundary string of the form field.
    # headers=dict(), # Specify the request headers.
    """
    headers = headers if headers else {}
    headers['Content-Length'] = str(body_len)
    headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)

    return headers

def encode_callback(callback_params):
    cb_str = json.dumps(callback_params).strip()
    return base64.b64encode(cb_str.encode()).decode()

# Specify the form field in the PostObject request. The form field is case-sensitive.
field_dict = {}
# Specify the name of the object.
field_dict['key'] = '0303/post.txt'
# access key id
field_dict['OSSAccessKeyId'] = access_key_id

product = "oss"


utc_time = datetime.datetime.utcnow()
# Set the validity period to 3600 seconds.
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
    "expiration": expiration,
    "conditions": [
        {"bucket": bucket_name},
        {"x-oss-signature-version": "OSS4-HMAC-SHA256"},
        {"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
        {"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
        ["content-length-range", 1, 1024]
    ]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()

field_dict['policy'] = string_to_sign

field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# Request a signature.
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)



# Specify the security token for a temporary user. This parameter is required when a temporary user uses temporary credentials. You can leave this parameter empty or you do not need to specify this parameter for a non-temporary user.
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# Specify user metadata.
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# Specify the callback parameter. If you do not need a callback, do not specify the parameter.
field_dict['callback'] = bulid_callback(call_back_url,
                                        'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
                                        'application/x-www-form-urlencoded')
# Specify the custom variables in the callback. If you do not need a callback, do not specify the parameter.
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'

# Upload an object.
# with open("", r) as f:
#     content = f.read()
# field_dict['content'] = content

# Upload the object content.
field_dict['content'] = 'a'*64
# Specify the type of the uploaded object.
field_dict['content-type'] = 'text/plain'

# Specify the boundary string that is randomly generated by the form.
boundary = '9431149156168'

# Send the PostObject request.
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)

resp = requests.post(build_post_url(endpoint, bucket_name),
                     data=body,
                     headers=headers)

# Verify the request result.
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))

References

  • For the complete sample code for upload callbacks, visit GitHub.

  • For more information about the API operation that you can call to configure upload callbacks, see Callback.