全部产品
Search
文档中心

ApsaraVideo Media Processing:Buat alur kerja media

更新时间:Jun 21, 2025

Topik ini menjelaskan cara menggunakan ApsaraVideo Media Processing (MPS) SDK untuk Python V2.0 dalam membuat alur kerja media.

Kode contoh

import os
import sys

from typing import List

from alibabacloud_mts20140618.client import Client as Mts20140618Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_mts20140618 import models as mts_20140618_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Mts20140618Client:
        """
        Gunakan ID AccessKey dan Rahasia AccessKey Anda untuk menginisialisasi klien.
        @return: Klien
        @throws Exception
        """
        config = open_api_models.Config(
            # Diperlukan. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dikonfigurasi. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Diperlukan. Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = f'mts.cn-hangzhou.aliyuncs.com'
        return Mts20140618Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        add_media_workflow_request = mts_20140618_models.AddMediaWorkflowRequest(
            # Nama alur kerja media.
            name='mediaworkflow-example',
            # Topologi alur kerja media.
            topology='{"Activities": {"mediaworkflow-example": {"Parameters": {"Outputs": "[{\"OutputObject\":\"examplebucket/output/{RunId}/TRANSCODE_165941222****/{FileName}\",\"TemplateId\":\"S00000001-200010\",\"TemplateName\":\"MP4-Low definition\"}]","OutputBucket": "examplebucket","OutputLocation": "oss-cn-shanghai"},"Type": "Transcode"},"Act-Start": {"Parameters": {"PipelineId": "a7d481f07d8c45da88c71853ce7d****","InputFile": "{\"Bucket\":\"example-input\",\"Location\":\"oss-cn-shanghai\",\"ObjectPrefix\":\"mps-test/input/\"}"},"Type": "Start"},"Act-Report": {"Parameters": {"PublishType": "Manual"},"Type": "Report"}},"Dependencies": {"mediaworkflow-example": ["Act-Report"],"Act-Start": ["mediaworkflow-example"],"Act-Report": []}}',
            # Mode pemicu.
            trigger_mode='OssAutoTrigger'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Tulis kode Anda sendiri untuk menampilkan respons dari Operasi API jika diperlukan.
            client.add_media_workflow_with_options(add_media_workflow_request, runtime)
        except Exception as error:
            # Tangani pengecualian dengan hati-hati dalam skenario bisnis nyata dan jangan pernah abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan ditampilkan hanya untuk referensi. 
            # Pesan kesalahan.
            print(error.message)
            # URL halaman diagnostik kesalahan yang sesuai.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        add_media_workflow_request = mts_20140618_models.AddMediaWorkflowRequest(
            # Nama alur kerja media.
            name='mediaworkflow-example',
            # Topologi alur kerja media.
            topology='{"Activities": {"mediaworkflow-example": {"Parameters": {"Outputs": "[{\"OutputObject\":\"examplebucket/output/{RunId}/TRANSCODE_165941222****/{FileName}\",\"TemplateId\":\"S00000001-200010\",\"TemplateName\":\"MP4-Low definition\"}]","OutputBucket": "examplebucket","OutputLocation": "oss-cn-shanghai"},"Type": "Transcode"},"Act-Start": {"Parameters": {"PipelineId": "a7d481f07d8c45da88c71853ce7d****","InputFile": "{\"Bucket\":\"example-input\",\"Location\":\"oss-cn-shanghai\",\"ObjectPrefix\":\"mps-test/input/\"}"},"Type": "Start"},"Act-Report": {"Parameters": {"PublishType": "Manual"},"Type": "Report"}},"Dependencies": {"mediaworkflow-example": ["Act-Report"],"Act-Start": ["mediaworkflow-example"],"Act-Report": []}}',
            # Mode pemicu.
            trigger_mode='OssAutoTrigger'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Tulis kode Anda sendiri untuk menampilkan respons dari Operasi API jika diperlukan.
            await client.add_media_workflow_with_options_async(add_media_workflow_request, runtime)
        except Exception as error:
            # Tangani pengecualian dengan hati-hati dalam skenario bisnis nyata dan jangan pernah abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan ditampilkan hanya untuk referensi. 
            # Pesan kesalahan.
            print(error.message)
            # URL halaman diagnostik kesalahan yang sesuai.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])