OSS form upload allows web applications to upload files directly to OSS using standard HTML forms. This topic describes how to use Python SDK V2 to generate information, such as Post signatures and Post Policies, and upload files to OSS by calling the HTTP POST method.
Precautions
The sample code in this topic uses the China (Hangzhou) region ID,
cn-hangzhou, as an example. The public endpoint is used by default. If you want to access OSS from other Alibaba Cloud products in the same region, use the internal endpoint. For more information about the mappings between regions and endpoints supported by OSS, see OSS regions and endpoints.The size of an object uploaded using form upload cannot exceed 5 GB.
Sample code
The following sample code shows the complete process of a form upload. The main steps are as follows:
Create a Post Policy: Define the validity period and conditions for the upload request. The conditions include the bucket name, signature version, credential information, request date, and request body length range.
Serialize and encode the Policy: Serialize the Policy into a JSON string and then encode it using Base64.
Generate a signature key: Use the HMAC-SHA256 algorithm to generate a signature key. The signature key includes the date, region, product, and request type.
Calculate the signature: Use the generated key to sign the Base64-encoded Policy string and then convert the signature result into a hexadecimal string.
Build the request body: Add the object key, policy, signature version, credential information, request date, and signature to the form. Then, write the data to be uploaded to the form.
Create and execute the request: Create an HTTP POST request, set the request header, and send the request. Then, check the response status code to ensure that the request is successful.
import argparse
import base64
import hashlib
import hmac
import json
import random
import requests
from datetime import datetime, timedelta
import alibabacloud_oss_v2 as oss
# Create a command-line argument parser for the POST object upload sample.
parser = argparse.ArgumentParser(description="post object sample")
# Add the command-line argument --region, which specifies the region where the bucket is located. This parameter is required.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# Add the command-line argument --bucket, which specifies the name of the bucket. This parameter is required.
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# Add the command-line argument --endpoint, which specifies the domain name that other services can use to access OSS. This parameter is optional.
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# Add the command-line argument --key, which specifies the name of the object. This parameter is required.
parser.add_argument('--key', help='The name of the object.', required=True)
def main():
# Define the content to upload.
content = "hi oss"
product = "oss" # The product identifier, which is OSS.
# Parse command-line arguments.
args = parser.parse_args()
region = args.region # The region information.
bucket_name = args.bucket # The bucket name.
object_name = args.key # The object name.
# Load credential information from environment variables for identity verification.
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
credential = credentials_provider.get_credentials()
access_key_id = credential.access_key_id # The AccessKey ID.
access_key_secret = credential.access_key_secret # The AccessKey secret.
# Obtain the current UTC time and format it.
utc_time = datetime.utcnow()
date = utc_time.strftime("%Y%m%d")
# Set the expiration time to one hour later and create a Policy map.
expiration = utc_time + timedelta(hours=1)
policy_map = {
"expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.000Z"), # The expiration time of the policy.
"conditions": [
{"bucket": bucket_name}, # Specify the bucket.
{"x-oss-signature-version": "OSS4-HMAC-SHA256"}, # Specify the signature version.
{"x-oss-credential": f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"}, # The credential information.
{"x-oss-date": utc_time.strftime("%Y%m%dT%H%M%SZ")}, # The request date.
["content-length-range", 1, 1024] # The content length range limit.
]
}
# Convert the policy to a JSON string and perform Base64 encoding.
policy = json.dumps(policy_map)
string_to_sign = base64.b64encode(policy.encode()).decode()
def build_post_body(field_dict, boundary):
"""
Build the POST request body and encode the form fields into the multipart/form-data format.
:param field_dict: The dictionary of form fields.
:param boundary: The separator string.
:return: The encoded POST request body.
"""
post_body = ''
# Encode the form fields, except for the file content and content type.
for k, v in field_dict.items():
if k != 'content' and k != 'content-type':
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"{1}\"\r\n\r\n{2}\r\n'''.format(boundary, k, v)
# The file content must be the last form field.
post_body += '''--{0}\r\nContent-Disposition: form-data; name=\"file\"\r\n\r\n{1}'''.format(
boundary, field_dict['content'])
# Add the form field terminator.
post_body += '\r\n--{0}--\r\n'.format(boundary)
return post_body.encode('utf-8') # Return the POST request body encoded in UTF-8.
# Construct the signature key and use the HMAC-SHA256 algorithm to generate the signature.
signing_key = "aliyun_v4" + access_key_secret
h1 = hmac.new(signing_key.encode(), date.encode(), hashlib.sha256)
h1_key = h1.digest()
h2 = hmac.new(h1_key, region.encode(), hashlib.sha256)
h2_key = h2.digest()
h3 = hmac.new(h2_key, product.encode(), hashlib.sha256)
h3_key = h3.digest()
h4 = hmac.new(h3_key, "aliyun_v4_request".encode(), hashlib.sha256)
h4_key = h4.digest()
h = hmac.new(h4_key, string_to_sign.encode(), hashlib.sha256)
signature = h.hexdigest() # Convert the signature result to a hexadecimal string.
# Build the dictionary of form fields required for the POST request.
field_dict = {}
field_dict['key'] = object_name
field_dict['policy'] = string_to_sign
field_dict['x-oss-signature-version'] = "OSS4-HMAC-SHA256"
field_dict['x-oss-credential'] = f"{access_key_id}/{date}/{region}/{product}/aliyun_v4_request"
field_dict['x-oss-date'] = f"{utc_time.strftime('%Y%m%dT%H%M%SZ')}"
field_dict['x-oss-signature'] = signature
field_dict['content'] = content
# Generate a random string as the form separator.
boundary = ''.join(random.choice('0123456789') for _ in range(11))
# Use the build_post_body function to build the POST request body.
body = build_post_body(field_dict, boundary)
# Construct the destination URL for the POST request.
url = f"http://{bucket_name}.oss-{region}.aliyuncs.com"
# Set the HTTP header, specify Content-Type as multipart/form-data, and include the boundary string.
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
}
# Send the POST request to OSS.
response = requests.post(url, data=body, headers=headers)
# Determine whether the upload is successful based on the response status code.
if response.status_code // 100 != 2:
print(f"Post Object Fail, status code: {response.status_code}, reason: {response.reason}")
else:
print(f"post object done, status code: {response.status_code}, request id: {response.headers.get('X-Oss-Request-Id')}")
if __name__ == "__main__":
main() # The script entry point. The main function is called when the file is run directly.Common scenarios
References
For a complete example of form upload, see post_object.py.