Background information
You can create a workflow for HTTP Live Streaming (HLS) encryption by calling an API operation. For the complete procedure, from creating a workflow for HLS encryption to playing an encrypted video, see Encrypt a video and play the encrypted video. You can also create the workflow by using ApsaraVideo Media Processing (MPS) SDK for Python, as shown in the following sample code. For more information about how to install the SDK, see Installation.
Sample code
import json
from aliyunsdkmts.request.v20140618 import AddMediaWorkflowRequest
from aliyunsdkcore import client
# --- Account and region settings (replace the placeholders) ---
REGION_ID = "<region>"
ACCESS_KEY_ID = "<accessKeyId>"
ACCESS_KEY_SECRET = "<accessKeySecret>"

# --- MPS pipeline and transcoding template ---
PIPELINE_ID = "<PipelineId>"
# The ID of the transcoding template that uses the M3U8 format.
# Set this parameter as needed.
TEMPLATE_ID = "S00000001-100020"

# --- OSS input and output ---
OSS_LOCATION = "<OssLocation>"
INPUT_BUCKET = "<InputBucket>"
# The input path. Example: HLS-Encryption.
INPUT_PATH = "<InputPath>"
OUTPUT_BUCKET = "<OutputBucket>"

# --- HLS encryption settings ---
ENCRYPTION_TYPE = "hls-aes-128"
# The URI of the decryption key. Example: http://example.aliyundoc.com.
HLS_KEY_URI = "<URI of the decryption key>"

# --- Names of the workflow activities ---
ACT_START = "Act-Start"
ACT_ENCRYPTION = "Act-HLS-Encryption"
ACT_REPORT = "Act-Report"
def addMediaWorkflow():
    """Create the HLS-encryption media workflow and print the API response.

    Builds an ``AddMediaWorkflowRequest`` whose topology is the JSON string
    returned by ``buildWorkflowTopology()``, sends it through an
    ``AcsClient`` created from ``ACCESS_KEY_ID``, ``ACCESS_KEY_SECRET`` and
    ``REGION_ID``, then prints the response decoded from JSON.

    Raises:
        Any exception raised by ``do_action_with_exception`` is propagated
        unchanged to the caller.
    """
    # Keep the client in a local name. Rebinding the imported ``client``
    # module through ``global client`` would replace the module with an
    # AcsClient instance and make any second call fail.
    acs_client = client.AcsClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET, REGION_ID)

    request = AddMediaWorkflowRequest.AddMediaWorkflowRequest()
    request.set_Topology(buildWorkflowTopology())
    # Request setters follow the ``set_<Parameter>`` naming used by
    # ``set_Topology`` above.
    request.set_Name("Workflow py for HLS encryption")

    response = acs_client.do_action_with_exception(request)
    # Single-argument print() behaves the same on Python 2 and Python 3.
    print(json.loads(response))
def buildWorkflowTopology():
    """Return the workflow topology serialized as a JSON string.

    The topology is a dict with two keys: ``"Activities"`` (from
    ``buildActivities()``) and ``"Dependencies"`` (from
    ``buildDependencies()``). The serialized string is also printed.

    Returns:
        str: The JSON text of the topology.
    """
    workflow = {
        "Activities": buildActivities(),
        "Dependencies": buildDependencies(),
    }
    # Serialize once and reuse the result for both the printout and the
    # return value. Single-argument print() works on Python 2 and Python 3.
    topology = json.dumps(workflow)
    print(topology)
    return topology
def buildActivities():
    """Return a dict mapping each activity name to its definition.

    The entries are the start, encryption (transcode) and report
    activities, keyed by ``ACT_START``, ``ACT_ENCRYPTION`` and
    ``ACT_REPORT``.
    """
    return {
        ACT_START: buildStartActivity(),
        ACT_ENCRYPTION: buildTranscodeActivity(),
        ACT_REPORT: buildReportActivity(),
    }
def buildStartActivity():
    """Return the definition of the start activity.

    The dict carries the activity name (``ACT_START``), the type
    ``"Start"`` and the parameters from ``buildStartParameters()``.
    """
    return {
        "Name": ACT_START,
        "Type": "Start",
        "Parameters": buildStartParameters(),
    }
def buildStartParameters():
    """Return the parameters of the start activity.

    Contains the pipeline ID (``PIPELINE_ID``) and the input file
    description from ``buildInputFile()``.
    """
    params = {"PipelineId": PIPELINE_ID}
    params["InputFile"] = buildInputFile()
    return params
def buildInputFile():
    """Return the input file description for the start activity.

    The dict holds the input bucket, the OSS location and the object
    prefix, taken from ``INPUT_BUCKET``, ``OSS_LOCATION`` and
    ``INPUT_PATH``.
    """
    return {
        "Bucket": INPUT_BUCKET,
        "Location": OSS_LOCATION,
        "ObjectPrefix": INPUT_PATH,
    }
def buildTranscodeActivity():
    """Return the definition of the transcode (encryption) activity.

    The dict carries the activity name (``ACT_ENCRYPTION``), the type
    ``"Transcode"`` and the parameters from ``buildTranscodeParameters()``.
    """
    return {
        "Name": ACT_ENCRYPTION,
        "Type": "Transcode",
        "Parameters": buildTranscodeParameters(),
    }
def buildTranscodeParameters():
    """Return the parameters of the transcode activity.

    Contains the output bucket (``OUTPUT_BUCKET``), the output location
    (``OSS_LOCATION``) and the output list from ``buildOutputsConfig()``.
    """
    return {
        "OutputBucket": OUTPUT_BUCKET,
        "OutputLocation": OSS_LOCATION,
        "Outputs": buildOutputsConfig(),
    }
def buildOutputsConfig():
    """Return the list of transcode outputs (a single entry).

    The entry sets the output object pattern, prefixed with
    ``ACT_ENCRYPTION``, the transcoding template ID (``TEMPLATE_ID``) and
    the encryption settings from ``buildEncryption()``.
    """
    single_output = {
        "ObjectRegex": ACT_ENCRYPTION + "/{RunId}/{FileName}",
        "TemplateId": TEMPLATE_ID,
        "Encryption": buildEncryption(),
    }
    return [single_output]
def buildEncryption():
    """Return the encryption settings for a transcode output.

    The dict holds the encryption type (``ENCRYPTION_TYPE``) and the key
    URI (``HLS_KEY_URI``).
    """
    return {"Type": ENCRYPTION_TYPE, "KeyUri": HLS_KEY_URI}
def buildReportActivity():
    """Return the definition of the report activity.

    The dict carries the activity name (``ACT_REPORT``), the parameters
    from ``buildReportParameters()`` and the type ``"Report"``.
    """
    # Keys are inserted in the same order as before: Name, Parameters, Type.
    return {
        "Name": ACT_REPORT,
        "Parameters": buildReportParameters(),
        "Type": "Report",
    }
def buildReportParameters():
    """Return the parameters of the report activity.

    The only setting is ``"PublishType"``, which is set to ``"Auto"``.
    """
    return {"PublishType": "Auto"}
def buildDependencies():
    """Return the dependency map between the workflow activities.

    Each key is an activity name and each value is the list of its
    sub-activities: the start activity leads to the encryption activity,
    the encryption activity leads to the report activity, and the report
    activity has an empty list.
    """
    return {
        ACT_START: [ACT_ENCRYPTION],
        ACT_ENCRYPTION: [ACT_REPORT],
        ACT_REPORT: [],
    }
# Create the workflow only when this file is run as a script.
if "__main__" == __name__:
    addMediaWorkflow()