All Products
Search
Document Center

ApsaraVideo Media Processing:Manage MPS queues

Last Updated:Nov 15, 2024

An ApsaraVideo Media Processing (MPS) queue is a queue for processing jobs. After you submit asynchronous jobs, the jobs are queued for running based on the job priorities and the sequence in which the jobs are submitted. This topic provides examples on how to use the API operations that are encapsulated in MPS SDK V2.0 for Python to manage MPS queues, such as creating, updating, deleting, and querying an MPS queue.

Query MPS queues in a specific state

You can call the SearchPipeline operation to query MPS queues in a specific state.

import os
import sys

from typing import List

from alibabacloud_mts20140618.client import Client as Mts20140618Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_mts20140618 import models as mts_20140618_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Mts20140618Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize a client.
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = f'mts.cn-hangzhou.aliyuncs.com'
        return Mts20140618Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        search_pipeline_request = mts_20140618_models.SearchPipelineRequest(
            # The status of the MPS queues that you want to query.
            state='Paused',
            # The number of entries per page.
            page_size=10,
            # The page number.
            page_number=1
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            client.search_pipeline_with_options(search_pipeline_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        search_pipeline_request = mts_20140618_models.SearchPipelineRequest(
            # The status of the MPS queues that you want to query.
            state='Paused',
            # The number of entries per page.
            page_size=10,
            # The page number.
            page_number=1
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            await client.search_pipeline_with_options_async(search_pipeline_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Query MPS queues based on queue IDs

You can call the QueryPipelineList operation to query one or more MPS queues based on the MPS queue IDs.

import os
import sys

from typing import List

from alibabacloud_mts20140618.client import Client as Mts20140618Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_mts20140618 import models as mts_20140618_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Mts20140618Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize a client.
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = f'mts.cn-hangzhou.aliyuncs.com'
        return Mts20140618Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        query_pipeline_list_request = mts_20140618_models.QueryPipelineListRequest(
            # The IDs of the MPS queues that you want to query.
            pipeline_ids='d1ce4d3efcb549419193f50f1fcd****,72dfa5e679ab4be9a3ed9974c736****'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            client.query_pipeline_list_with_options(query_pipeline_list_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        query_pipeline_list_request = mts_20140618_models.QueryPipelineListRequest(
            # The IDs of the MPS queues that you want to query.
            pipeline_ids='d1ce4d3efcb549419193f50f1fcd****,72dfa5e679ab4be9a3ed9974c736****'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            await client.query_pipeline_list_with_options_async(query_pipeline_list_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Update an MPS queue

You can call the UpdatePipeline operation to update an MPS queue, such as modifying the name and status of an MPS queue. The states of an MPS queue include Active and Paused.

import os
import sys

from typing import List

from alibabacloud_mts20140618.client import Client as Mts20140618Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_mts20140618 import models as mts_20140618_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Mts20140618Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize a client.
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = f'mts.cn-hangzhou.aliyuncs.com'
        return Mts20140618Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        update_pipeline_request = mts_20140618_models.UpdatePipelineRequest(
            # The ID of the MPS queue that you want to update.
            pipeline_id='d1ce4d3efcb549419193f50f1fcd****',
            # The new name of the MPS queue.
            name='example-pipeline-****',
            # The new status of the MPS queue.
            state='Paused',
            # The Simple Message Queue (SMQ, formerly MNS) configuration.
            notify_config='{"Topic":"example-topic-****"}',
            # The role that is assigned to the current Resource Access Management (RAM) user.
            role='AliyunMTSDefaultRole'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            client.update_pipeline_with_options(update_pipeline_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        update_pipeline_request = mts_20140618_models.UpdatePipelineRequest(
            pipeline_id='d1ce4d3efcb549419193f50f1fcd****',
            name='example-pipeline-****',
            state='Paused',
            notify_config='{"Topic":"example-topic-****"}',
            role='AliyunMTSDefaultRole'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run the sample code, write your own code to display the response of the API operation.
            await client.update_pipeline_with_options_async(update_pipeline_request, runtime)
        except Exception as error:
            # Handle exceptions with caution based on your actual business scenario and do not ignore exceptions in your project. The error messages displayed in this example are for reference only. 
            # Display error messages.
            print(error.message)
            # Provide the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

References