All Products
Search
Document Center

Realtime Compute for Apache Flink: Python SDK

Last Updated: Mar 26, 2026

The Realtime Compute for Apache Flink Python SDK lets you manage workspaces, deployments, and job instances programmatically — without the console.

Prerequisites

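Before you begin, ensure that you have:

  - Python 3 with pip3, which the installation commands below use.

  - An AccessKey pair exported as the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, which every example reads.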

Install the SDK

Realtime Compute for Apache Flink exposes two sets of APIs, each with its own SDK package:

| API set | Purpose | Package |
| --- | --- | --- |
| Development console APIs | Job development and O&M | alibabacloud_ververica20220718 |
| Management console APIs | Workspace management, purchasing, and resource reconfiguration | alibabacloud_foasconsole20211028 |

Install the packages you need:

# Development console SDK
pip3 install alibabacloud_ververica20220718==1.2.1

# Management console SDK
pip3 install alibabacloud_foasconsole20211028==1.0.2

For full SDK documentation and additional installation options, see Development Console SDK Center and Management Console SDK Center.

Online debugging and SDK example generation

OpenAPI Explorer lets you call APIs online, generate dynamic SDK example code, and search APIs. View and download SDK examples on the development console APIs and management console APIs pages. See Quick start for a guided walkthrough.

image

Examples

All examples read credentials from environment variables and use RuntimeOptions for runtime configuration. Each example includes a synchronous and an asynchronous version.

The examples follow a typical workflow:

  1. Call View purchased workspaces to get a workspace ID (ResourceId).

  2. Use that workspace ID to call Get the list of deployed jobs and retrieve a deployment ID.

  3. Use the deployment ID to Start a job or Get the list of job instances.

  4. Use a job instance ID to Stop a job instance.

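Endpoint references:

  - Management console APIs: the examples use foasconsole.aliyuncs.com.

  - Development console APIs: the examples use ververica.cn-hangzhou.aliyuncs.com. Modify the endpoint as needed.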

View purchased workspaces

Queries details of a purchased Flink workspace in a specified region.

Required parameter:

| Parameter | Description | Example |
| --- | --- | --- |
| region | The region ID. | cn-hangzhou |

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    """Query purchased Flink workspaces with the management console SDK.

    ``main`` sends the DescribeInstances request synchronously and prints the
    response; ``main_async`` sends the same request with ``await`` and leaves
    printing the response to the caller.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> 'foasconsole20211028Client':
        """Create a client authenticated with an AccessKey pair.

        The AccessKey ID and secret are read from the
        ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
        environment variables.

        Returns:
            A client whose endpoint is set to ``foasconsole.aliyuncs.com``.

        Raises:
            KeyError: If either environment variable is not set.
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint as needed.
        config.endpoint = 'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def _error_details(error: Exception) -> tuple:
        """Extract ``(message, recommend)`` from any exception.

        The handlers below catch ``Exception``, so the caught object is not
        guaranteed to carry ``message`` or ``data`` attributes. Reading them
        defensively keeps the handler from raising ``AttributeError`` and
        hiding the original failure.

        Args:
            error: The exception that was caught.

        Returns:
            A tuple of the error message (``str(error)`` when the exception
            has no ``message``) and the ``"Recommend"`` entry of ``error.data``
            (``None`` when ``data`` is missing or is not a dict).
        """
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        data = getattr(error, 'data', None)
        recommend = data.get("Recommend") if isinstance(data, dict) else None
        return message, recommend

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Call DescribeInstances synchronously and print the response.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value.
            response = client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Call DescribeInstances asynchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)


if __name__ == '__main__':
    # Script entry point: pass the command-line arguments (without the
    # program name) to the synchronous example.
    cli_args = sys.argv[1:]
    Sample.main(cli_args)

Get the list of deployed jobs

Retrieves information about all deployments in a namespace.

Required parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| workspace | The workspace ID. Get this value from the ResourceId field returned by View purchased workspaces. | adf9e5147a**** |
| namespace | The project name. | script****-default |

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    """List the deployments of a namespace with the development console SDK.

    ``main`` sends the ListDeployments request synchronously and prints the
    response; ``main_async`` sends the same request with ``await`` and leaves
    printing the response to the caller. ``'workspace'`` and ``'namespace'``
    are placeholders for the workspace ID and the project name.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> 'ververica20220718Client':
        """Create a client authenticated with an AccessKey pair.

        The AccessKey ID and secret are read from the
        ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
        environment variables.

        Returns:
            A client whose endpoint is set to
            ``ververica.cn-hangzhou.aliyuncs.com``.

        Raises:
            KeyError: If either environment variable is not set.
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _error_details(error: Exception) -> tuple:
        """Extract ``(message, recommend)`` from any exception.

        The handlers below catch ``Exception``, so the caught object is not
        guaranteed to carry ``message`` or ``data`` attributes. Reading them
        defensively keeps the handler from raising ``AttributeError`` and
        hiding the original failure.

        Args:
            error: The exception that was caught.

        Returns:
            A tuple of the error message (``str(error)`` when the exception
            has no ``message``) and the ``"Recommend"`` entry of ``error.data``
            (``None`` when ``data`` is missing or is not a dict).
        """
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        data = getattr(error, 'data', None)
        recommend = data.get("Recommend") if isinstance(data, dict) else None
        return message, recommend

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Call ListDeployments synchronously and print the response.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value. namespace is the project name.
            response = client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Call ListDeployments asynchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)


if __name__ == '__main__':
    # Script entry point: pass the command-line arguments (without the
    # program name) to the synchronous example.
    cli_args = sys.argv[1:]
    Sample.main(cli_args)

Start a job

Starts a deployed job in a project.

Required parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| deploymentId | The deployment ID. Get this value from Get the list of deployed jobs. | 3171d4d1-5952-4d02-b978-e762493b**** |
| kind | The start offset type. Valid values: NONE (stateless start), LATEST_SAVEPOINT (start from the latest snapshot), FROM_SAVEPOINT (start from a specified snapshot), LATEST_STATE (start from the latest state). | NONE |

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    """Start a deployed job with the development console SDK.

    ``main`` sends the StartJobWithParams request synchronously and
    ``main_async`` sends it with ``await``; neither prints the response.
    ``'workspace'``, ``'namespace'`` and ``'deploymentId'`` are placeholders
    for the workspace ID, the project name and the deployment ID.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> 'ververica20220718Client':
        """Create a client authenticated with an AccessKey pair.

        The AccessKey ID and secret are read from the
        ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
        environment variables.

        Returns:
            A client whose endpoint is set to
            ``ververica.cn-hangzhou.aliyuncs.com``.

        Raises:
            KeyError: If either environment variable is not set.
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _error_details(error: Exception) -> tuple:
        """Extract ``(message, recommend)`` from any exception.

        The handlers below catch ``Exception``, so the caught object is not
        guaranteed to carry ``message`` or ``data`` attributes. Reading them
        defensively keeps the handler from raising ``AttributeError`` and
        hiding the original failure.

        Args:
            error: The exception that was caught.

        Returns:
            A tuple of the error message (``str(error)`` when the exception
            has no ``message``) and the ``"Recommend"`` entry of ``error.data``
            (``None`` when ``data`` is missing or is not a dict).
        """
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        data = getattr(error, 'data', None)
        recommend = data.get("Recommend") if isinstance(data, dict) else None
        return message, recommend

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Call StartJobWithParams synchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # Job start policy
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Call StartJobWithParams asynchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # Job start policy
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)


if __name__ == '__main__':
    # Script entry point: pass the command-line arguments (without the
    # program name) to the synchronous example.
    cli_args = sys.argv[1:]
    Sample.main(cli_args)

Get the list of job instances

Retrieves information about all job instances in a deployment.

Required parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| deploymentId | The deployment ID. Get this value from Get the list of deployed jobs. | 3171d4d1-5952-4d02-b978-e762493b**** |

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    """List the job instances of a deployment with the development console SDK.

    ``main`` sends the ListJobs request synchronously and prints the
    response; ``main_async`` sends the same request with ``await`` and leaves
    printing the response to the caller. ``'workspace'``, ``'namespace'`` and
    ``'deploymentId'`` are placeholders for the workspace ID, the project name
    and the deployment ID.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> 'ververica20220718Client':
        """Create a client authenticated with an AccessKey pair.

        The AccessKey ID and secret are read from the
        ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
        environment variables.

        Returns:
            A client whose endpoint is set to
            ``ververica.cn-hangzhou.aliyuncs.com``.

        Raises:
            KeyError: If either environment variable is not set.
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _error_details(error: Exception) -> tuple:
        """Extract ``(message, recommend)`` from any exception.

        The handlers below catch ``Exception``, so the caught object is not
        guaranteed to carry ``message`` or ``data`` attributes. Reading them
        defensively keeps the handler from raising ``AttributeError`` and
        hiding the original failure.

        Args:
            error: The exception that was caught.

        Returns:
            A tuple of the error message (``str(error)`` when the exception
            has no ``message``) and the ``"Recommend"`` entry of ``error.data``
            (``None`` when ``data`` is missing or is not a dict).
        """
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        data = getattr(error, 'data', None)
        recommend = data.get("Recommend") if isinstance(data, dict) else None
        return message, recommend

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Call ListJobs synchronously and print the response.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value. namespace is the project name.
            response = client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Call ListJobs asynchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)


if __name__ == '__main__':
    # Script entry point: pass the command-line arguments (without the
    # program name) to the synchronous example.
    cli_args = sys.argv[1:]
    Sample.main(cli_args)

Stop a job instance

Stops a running job instance.

Required parameters:

| Parameter | Description | Example |
| --- | --- | --- |
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| jobId | The job instance ID. Get this value from Get the list of job instances. | 3171d4d1-5952-4d02-b978-e762493b**** |
| stopStrategy | The stop policy. Valid values: NONE (stop immediately), STOP_WITH_SAVEPOINT (create a snapshot and then stop), STOP_WITH_DRAIN (stop with drain). | NONE |

# -*- coding: utf-8 -*-
import os
import sys

from typing import List

from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Sample:
    """Stop a running job instance with the development console SDK.

    ``main`` sends the StopJob request synchronously and ``main_async`` sends
    it with ``await``; neither prints the response. ``'workspace'``,
    ``'namespace'``, ``'jobId'`` and ``'stopStrategy'`` are placeholders for
    the workspace ID, the project name, the job instance ID and the stop
    policy.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> 'ververica20220718Client':
        """Create a client authenticated with an AccessKey pair.

        The AccessKey ID and secret are read from the
        ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
        environment variables.

        Returns:
            A client whose endpoint is set to
            ``ververica.cn-hangzhou.aliyuncs.com``.

        Raises:
            KeyError: If either environment variable is not set.
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _error_details(error: Exception) -> tuple:
        """Extract ``(message, recommend)`` from any exception.

        The handlers below catch ``Exception``, so the caught object is not
        guaranteed to carry ``message`` or ``data`` attributes. Reading them
        defensively keeps the handler from raising ``AttributeError`` and
        hiding the original failure.

        Args:
            error: The exception that was caught.

        Returns:
            A tuple of the error message (``str(error)`` when the exception
            has no ``message``) and the ``"Recommend"`` entry of ``error.data``
            (``None`` when ``data`` is missing or is not a dict).
        """
        message = getattr(error, 'message', None)
        if message is None:
            message = str(error)
        data = getattr(error, 'data', None)
        recommend = data.get("Recommend") if isinstance(data, dict) else None
        return message, recommend

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Call StopJob synchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # Job stop policy. 'stopStrategy' is a placeholder: replace it with
            # NONE, STOP_WITH_SAVEPOINT, or STOP_WITH_DRAIN.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Call StopJob asynchronously.

        Args:
            args: Command-line arguments; this example does not read them.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # Job stop policy. 'stopStrategy' is a placeholder: replace it with
            # NONE, STOP_WITH_SAVEPOINT, or STOP_WITH_DRAIN.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            message, recommend = Sample._error_details(error)
            # Error message
            print(message)
            # Troubleshooting URL
            print(recommend)
            UtilClient.assert_as_string(message)


if __name__ == '__main__':
    # Script entry point: pass the command-line arguments (without the
    # program name) to the synchronous example.
    cli_args = sys.argv[1:]
    Sample.main(cli_args)

References

For more information, see Java SDK Reference.