OSS在完成簡單上傳(put_object和put_object_from_file)以及分區上傳(complete_multipart_upload)時可以提供回調(Callback)給應用伺服器。您只需要在發送給OSS的請求中攜帶相應的Callback參數,即可實現回調。
注意事項
本文以華東1(杭州)外網Endpoint為例。如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱。
本文以OSS網域名稱建立OSSClient為例。如果您希望通過自訂網域名、STS等方式建立OSSClient,請參見初始化(Python SDK V1)。
範例程式碼
簡單上傳回調
# -*- coding: utf-8 -*-
import json
import base64
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 從環境變數中擷取訪問憑證。運行本程式碼範例之前,請確保已設定環境變數OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# 填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# 填寫Endpoint對應的Region資訊,例如cn-hangzhou。注意,v4簽名下,必須填寫該參數
region = "cn-hangzhou"
# yourBucketName填寫儲存空間名稱。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# 定義回調參數Base64編碼函數。
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
# 設定上傳回調參數。
callback_params = {}
# 設定回調請求的伺服器位址,例如http://oss-demo.aliyuncs.com:23450。
callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
#(可選)設定回調請求訊息頭中Host的值,即您的伺服器配置Host的值。
#callback_params['callbackHost'] = 'yourCallbackHost'
# 指定回調請求的 Body 內容,使用預留位置動態傳遞對象資訊
callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回調請求的 Content-Type
callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
encoded_callback = encode_callback(callback_params)
# 設定發起回調請求的自訂參數,由Key和Value組成,Key必須以x:開始。
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
encoded_callback_var = encode_callback(callback_var_params)
# 上傳回調。
params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
# 填寫Object完整路徑和字串。Object完整路徑中不能包含Bucket名稱。
result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)分區上傳回調
# -*- coding: utf-8 -*-
import json
from oss2.credentials import EnvironmentVariableCredentialsProvider
import oss2
key = 'exampleobject.txt'
content = "Anything you're good at contributes to happiness."
# 從環境變數中擷取訪問憑證。運行本程式碼範例之前,請確保已設定環境變數OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# 填寫Bucket所在地區對應的Endpoint。以華東1(杭州)為例,Endpoint填寫為https://oss-cn-hangzhou.aliyuncs.com。
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# 填寫Endpoint對應的Region資訊,例如cn-hangzhou。注意,v4簽名下,必須填寫該參數
region = "cn-hangzhou"
# 設定回調請求的伺服器位址,例如http://oss-demo.aliyuncs.com:23450。
callback_url = 'http://oss-demo.aliyuncs.com:23450'
# yourBucketName填寫儲存空間名稱。
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)
# 準備回調參數。
callback_dict = {}
callback_dict['callbackUrl'] = callback_url
#(可選)設定回調請求訊息頭中Host的值,即您的伺服器配置Host的值。
# callback_dict['callbackHost'] = 'oss-cn-hangzhou.aliyuncs.com'
## 指定回調請求的 Body 內容,使用預留位置動態傳遞對象資訊
callback_dict['callbackBody'] = 'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}'
# 指定回調請求的 Content-Type
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
callback_var_param_json = json.dumps(callback_var_params).strip()
encoded_callback_var = oss2.utils.b64encode_as_string(callback_var_param_json)
# 回調參數是json格式,並且base64編碼
callback_param = json.dumps(callback_dict).strip()
base64_callback_body = oss2.utils.b64encode_as_string(callback_param)
# 回調參數編碼後放在header中傳給oss
headers = {'x-oss-callback': base64_callback_body, 'x-oss-callback-var': encoded_callback_var}
"""
分區上傳回調
"""
# 分區上傳回調
# 初始化上傳任務
parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
# 上傳分區
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size = len(content), part_crc = result.crc))
# 完成上傳並回調
result = bucket.complete_multipart_upload(key, upload_id, parts, headers)
# 上傳並回調成功status為200,上傳成功回調失敗status為203
if result.status == 200:
print("檔案上傳成功,回調成功 (HTTP 200)")
elif result.status == 203:
print("檔案上傳成功,但回調失敗 (HTTP 203)")
else:
print(f"上傳異常,狀態代碼: {result.status}")
# 確認檔案上傳成功
result = bucket.head_object(key)
assert 'x-oss-hash-crc64ecma' in result.headers表單上傳回調
更多PostObject資訊請參見PostObject。
# -*- coding: utf-8 -*-
import os
import time
import datetime
import json
import base64
import hmac
import hashlib
import crcmod
import requests
# 以下代碼展示了PostObject的用法。PostObject不依賴於OSS Python SDK。
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等資訊。
# 通過環境變數擷取,或者把諸如“<你的AccessKeyId>”替換成真實的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的訪問網域名稱>')
# 這裡以oss-demo.aliyuncs.com:23450舉例
call_back_url = "http://oss-demo.aliyuncs.com:23450"
# 例如cn-hangzhou
region = "<你的region>"
# 確認上面的參數都填寫正確了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '請設定參數:' + param
def convert_base64(input):
return base64.b64encode(input.encode(encoding='utf-8')).decode('utf-8')
def calculate_crc64(data):
"""計算檔案的MD5
:param data: 資料
:return 資料的MD5值
"""
_POLY = 0x142F0E1EBA9EA3693
_XOROUT = 0XFFFFFFFFFFFFFFFF
crc64 = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
crc64.update(data.encode())
return crc64.crcValue
def build_gmt_expired_time(expire_time):
"""產生GMT格式的請求逾時時間
:param int expire_time: 逾時時間,單位秒
:return str GMT格式的逾時時間
"""
now = int(time.time())
expire_syncpoint = now + expire_time
expire_gmt = datetime.datetime.fromtimestamp(expire_syncpoint).isoformat()
expire_gmt += 'Z'
return expire_gmt
def build_encode_policy(expired_time, condition_list):
"""產生policy
:param int expired_time: 逾時時間,單位秒
:param list condition_list: 限制條件列表
"""
policy_dict = {}
policy_dict['expiration'] = build_gmt_expired_time(expired_time)
policy_dict['conditions'] = condition_list
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
return policy_encode
def build_signature(access_key_secret, date):
"""產生簽名
:param str access_key_secret: access key secret
:return str 請求籤名
"""
signing_key = "aliyun_v4" + access_key_secret
h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
h1_key = h1.digest()
h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
h2_key = h2.digest()
h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
h3_key = h3.digest()
h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
h4_key = h4.digest()
h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
signature = h.hexdigest()
return signature
def bulid_callback(cb_url, cb_body, cb_body_type=None, cb_host=None):
"""產生callback字串
:param str cb_url: 回調伺服器位址,檔案上傳成功後OSS向此url發送回調請求
:param str cb_body: 發起回調請求的Content-Type,預設application/x-www-form-urlencoded
:param str cb_body_type: 發起回調時請求body
:param str cb_host: 發起回調請求時Host頭的值
:return str 編碼後的Callback
"""
callback_dict = {}
callback_dict['callbackUrl'] = cb_url
callback_dict['callbackBody'] = cb_body
if cb_body_type is None:
callback_dict['callbackBodyType'] = 'application/x-www-form-urlencoded'
else:
callback_dict['callbackBodyType'] = cb_body_type
if cb_host is not None:
callback_dict['callbackHost'] = cb_host
callback_param = json.dumps(callback_dict).strip()
base64_callback = base64.b64encode(callback_param.encode());
return base64_callback.decode()
def build_post_url(endpoint, bucket_name):
"""產生POST請求URL
:param str endpoint: endpoint
:param str bucket_name: bucket name
:return str POST請求URL
"""
if endpoint.startswith('http://'):
return endpoint.replace('http://', 'http://{0}.'.format(bucket_name))
elif endpoint.startswith('https://'):
return endpoint.replace('https://', 'https://{0}.'.format(bucket_name))
else:
return 'http://{0}.{1}'.format(bucket_name, endpoint)
def build_post_body(field_dict, boundary):
"""產生POST請求Body
:param dict field_dict: POST請求表單域
:param str boundary: 表單域的邊界字串
:return str POST請求Body
"""
post_body = ''
# 編碼錶單域
for k,v in field_dict.items():
if k != 'content' and k != 'content-type':
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)
# 上傳檔案的內容,必須作為最後一個表單域
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"{1}\"\r\nContent-Type: {2}\r\n\r\n{3}'''.format(
boundary, field_dict['key'], field_dict['content-type'], field_dict['content'])
# 加上表單域結束符
post_body += '\r\n--{0}--\r\n'.format(boundary)
return post_body.encode('utf-8')
def build_post_headers(body_len, boundary, headers=None):
"""生氣POST請求Header
:param str body_len: POST請求Body長度
:param str boundary: 表單域的邊界字串
:param dict 請求Header
"""
headers = headers if headers else {}
headers['Content-Length'] = str(body_len)
headers['Content-Type'] = 'multipart/form-data; boundary={0}'.format(boundary)
return headers
def encode_callback(callback_params):
cb_str = json.dumps(callback_params).strip()
return base64.b64encode(cb_str.encode()).decode()
# POST請求表單域,注意大小寫
field_dict = {}
# object名稱
field_dict['key'] = '0303/post.txt'
# access key id
field_dict['OSSAccessKeyId'] = access_key_id
product = "oss"
utc_time = datetime.datetime.utcnow()
# 到期時間設定3600秒
expiration = '2120-01-01T12:00:00.000Z'
date = utc_time.strftime("%Y%m%d")
policy_map = {
"expiration": expiration,
"conditions": [
{"bucket": bucket_name},
{"x-oss-signature-version": "OSS4-HMAC-SHA256"},
{"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"},
{"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")},
["content-length-range", 1, 1024]
]
}
policy = json.dumps(policy_map)
print(policy)
string_to_sign = base64.b64encode(policy.encode()).decode()
field_dict['policy'] = string_to_sign
field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
# 請求籤名
field_dict['x-oss-signature'] = build_signature(access_key_secret, date)
# 臨時使用者Token,當使用臨時使用者密鑰時Token必填;非臨時使用者填空或不填
# field_dict['x-oss-security-token'] = ''
# Content-Disposition
field_dict['Content-Disposition'] = 'attachment;filename=download.txt'
# 使用者自訂meta
field_dict['x-oss-meta-uuid'] = 'uuid-xxx'
# callback,沒有回調需求不填該域
field_dict['callback'] = bulid_callback(call_back_url,
'bucket=${bucket}&object=${object}&size=${size}&mimeType=${mimeType}&my_var_1=${x:my_var1}&my_var_2=${x:my_var2}',
'application/x-www-form-urlencoded')
# callback中的自訂變數,沒有回調不填該域
field_dict['x:my_var1'] = 'value1'
field_dict['x:my_var2'] = 'value2'
# 如果上傳檔案
# with open("", r) as f:
# content = f.read()
# field_dict['content'] = content
# 上傳檔案內容
field_dict['content'] = 'a'*64
# 上傳檔案類型
field_dict['content-type'] = 'text/plain'
# 表單域的邊界字串,一般為隨機字串
boundary = '9431149156168'
# 發送POST請求
body = build_post_body(field_dict, boundary)
headers = build_post_headers(len(body), boundary)
resp = requests.post(build_post_url(endpoint, bucket_name),
data=body,
headers=headers)
# 確認請求結果
print(resp.status_code)
assert resp.status_code == 200
assert resp.headers['x-oss-hash-crc64ecma'] == str(calculate_crc64(field_dict['content']))