All Products
Search
Document Center

Realtime Compute for Apache Flink:Flink SDK for Python

Last Updated:Jan 08, 2025

This topic describes how to install and use Realtime Compute for Apache Flink SDK for Python.

Prerequisites

  • An AccessKey pair is created. For more information, see Create an AccessKey pair.

    Note

    To protect the AccessKey pair of your Alibaba Cloud account, we recommend that you create a Resource Access Management (RAM) user, grant the RAM user the permissions to access Realtime Compute for Apache Flink, and then use the AccessKey pair of the RAM user to call Realtime Compute for Apache Flink SDK.

  • A Python environment is prepared. The Python version is 3.6 or later.

  • The account that you want to use has the required access and operation permissions. For more information, see Permission management.

Install Realtime Compute for Apache Flink SDK for Python

Install Realtime Compute for Apache Flink SDK for Python by using pip.

  • To perform operations such as draft development and deployment O&M, you must call API operations for the development console of Realtime Compute for Apache Flink. For more information about how to install and use the SDK, see SDK Center.

    pip3 install alibabacloud_ververica20220718==1.2.1
  • To perform operations such as viewing workspace information, purchasing a workspace, and reconfiguring resources in a workspace, you must call API operations for the management console of Realtime Compute for Apache Flink. For more information about how to install and use the SDK, see SDK Center.

    pip3 install alibabacloud_foasconsole20211028==1.0.2

Perform online debugging and generate of SDK sample code

You can use OpenAPI Explorer to call API operations online, search for API operations, and dynamically generate SDK sample code. This simplifies the use of the API operations. You can view and download the SDK sample code of the required API operations on the Realtime Compute for Apache Flink page and the Realtime Compute Selling Console page of OpenAPI Explorer. For more information, see Get started with Alibaba Cloud Darabonba SDK for Python.

image

Examples

Note

For more information about the endpoints of Realtime Compute Selling Console, see Endpoints. For more information about the endpoints of Realtime Compute for Apache Flink, see Endpoints.

View a purchased workspace

You can query the details of a purchased Realtime Compute for Apache Flink workspace in a specified region. Required request parameters:

  • Region: the region ID, such as cn-hangzhou.

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.
        @return: Client
        @throws Exception
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues may occur on all resources of your account. We recommend that you use Security Token Service (STS) tokens. The following code is for reference only. 
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = f'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            response=client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Obtain information about all deployments

You can obtain information about all deployments in a namespace. Required request parameters:

  • workspace: the ID of the workspace. You can obtain the value of this parameter from the return value of the ResourceId parameter in View a purchased workspace. Example: adf9e5147a****.

  • namespace: the name of the namespace, such as script****-default.

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use the AccessKey ID and AccessKey secret to initialize the client.
        @return: Client
        @throws Exception
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues may occur on all resources of your account. We recommend that you use STS tokens. The following code is for reference only. 
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(request)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation. namespace indicates the name of the namespace.
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Start a deployment

You can start a deployment in a namespace. Required request parameters:

  • workspace: the ID of the workspace, such as adf9e5147a****.

  • namespace: the name of the namespace, such as script****-default.

  • deploymentId: the ID of the deployment. You can obtain the value of this parameter in Obtain information about all deployments. Example: 3171d4d1-5952-4d02-b978-e762493b****.

  • kind: the type of the start offset. Valid values: NONE, LATEST_SAVEPOINT, FROM_SAVEPOINT, and LATEST_STATE.

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.
        @return: Client
        @throws Exception
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues may occur on all resources of your account. We recommend that you use STS tokens. The following code is for reference only. 
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # The startup strategy of the deployment.
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Obtain information about all jobs

You can obtain information about all jobs in a deployment. Required request parameters:

  • workspace: the ID of the workspace, such as adf9e5147a****.

  • namespace: the name of the namespace, such as script****-default.

  • deploymentId: the ID of the deployment, which can be obtained in Obtain information about all deployments. Example: 3171d4d1-5952-4d02-b978-e762493b****.

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.
        @return: Client
        @throws Exception
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues may occur on all resources of your account. We recommend that you use STS tokens. The following code is for reference only. 
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. ,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. ,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(request)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Stop a job

You can stop a job in a deployment. Parameter description:

  • workspace: the ID of the workspace, such as adf9e5147a****.

  • namespace: the name of the namespace, such as script****-default.

  • jobId: the ID of the job, which can be obtained in Obtain information about all jobs. Example: 3171d4d1-5952-4d02-b978-e762493b****.

  • stopStrategy: the strategy that is used to stop the job. Valid values: NONE, STOP_WITH_SAVEPOINT, and STOP_WITH_DRAIN.

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.
        @return: Client
        @throws Exception
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues may occur on all resources of your account. We recommend that you use STS tokens. The following code is for reference only. 
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. 
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. 
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # The strategy that is used to stop the job.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # Handle exceptions with caution in your actual business scenario and never ignore exceptions in your project. In this example, error messages are printed for reference only. 
            # Print error messages.
            print(error.message)
            # Show the URL for troubleshooting.
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])

References

For more information about the SDK for Java, see Flink SDK for Java (latest).