The Realtime Compute for Apache Flink Python SDK lets you manage workspaces, deployments, and job instances programmatically — without the console.
Prerequisites
Before you begin, ensure that you have:
An Alibaba Cloud AccessKey. See Create an AccessKey.
Important: Do not use the AccessKey of your root account. Create a RAM user, grant the RAM user the required Flink access permissions, and use the RAM user's AccessKey instead. See Create a RAM user or Create an AccessKey and Authorize a RAM user in the management console.
Python 3.6 or later
The required access and operation permissions. See Permission management.
Install the SDK
Realtime Compute for Apache Flink exposes two sets of APIs, each with its own SDK package:
| API set | Purpose | Package |
|---|---|---|
| Development console APIs | Job development and O&M | alibabacloud_ververica20220718 |
| Management console APIs | Workspace management, purchasing, and resource reconfiguration | alibabacloud_foasconsole20211028 |
Install the packages you need:
# Development console SDK
pip3 install alibabacloud_ververica20220718==1.2.1
# Management console SDK
pip3 install alibabacloud_foasconsole20211028==1.0.2

For full SDK documentation and additional installation options, see Development Console SDK Center and Management Console SDK Center.
Online debugging and SDK example generation
OpenAPI Explorer lets you call APIs online, generate dynamic SDK example code, and search APIs. View and download SDK examples on the development console APIs and management console APIs pages. See Quick start for a guided walkthrough.

Examples
All examples read credentials from environment variables and use RuntimeOptions for runtime configuration. Each example includes a synchronous and an asynchronous version.
The examples follow a typical workflow:
Call View purchased workspaces to get a workspace ID (ResourceId).
Use that workspace ID to call Get the list of deployed jobs and retrieve a deployment ID.
Use the deployment ID to Start a job or Get the list of job instances.
Use a job instance ID to Stop a job instance.
Endpoint references:
Management console endpoints: Endpoints
Development console endpoints: Service endpoints
View purchased workspaces
Queries details of a purchased Flink workspace in a specified region.
Required parameter:
| Parameter | Description | Example |
|---|---|---|
| region | The region ID. | cn-hangzhou |
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """
    Sample that queries the purchased Flink workspaces in a region by calling
    DescribeInstances on the management console client.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        Initializes the client with an AccessKey ID and an AccessKey secret.
        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
            ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set in the environment
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint as needed.
        config.endpoint = 'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Prints the error message and, when present, the troubleshooting URL.

        The callers catch ``Exception``, so the caught object is not guaranteed
        to expose ``message`` or a dict-like ``data``. Both are read defensively
        so that reporting an error can never raise a second one.
        @param error: The exception caught around the API call.
        """
        message = getattr(error, 'message', None)
        if not isinstance(message, str):
            # Fall back to the standard string form when there is no usable message.
            message = str(error)
        # Error message
        print(message)
        # Troubleshooting URL
        data = getattr(error, 'data', None)
        if isinstance(data, dict):
            print(data.get("Recommend"))

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Calls describe_instances_with_options synchronously and prints the response.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value.
            response = client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous counterpart of main. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            await client.describe_instances_with_options_async(describe_instances_request, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Get the list of deployed jobs
Retrieves information about all deployments in a namespace.
Required parameters:
| Parameter | Description | Example |
|---|---|---|
| workspace | The workspace ID. Get this value from the ResourceId field returned by View purchased workspaces. | adf9e5147a**** |
| namespace | The project name. | script****-default |
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """
    Sample that lists the deployments in a namespace by calling ListDeployments
    on the development console client.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Initializes the client with an AccessKey ID and an AccessKey secret.
        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
            ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set in the environment
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Prints the error message and, when present, the troubleshooting URL.

        The callers catch ``Exception``, so the caught object is not guaranteed
        to expose ``message`` or a dict-like ``data``. Both are read defensively
        so that reporting an error can never raise a second one.
        @param error: The exception caught around the API call.
        """
        message = getattr(error, 'message', None)
        if not isinstance(message, str):
            # Fall back to the standard string form when there is no usable message.
            message = str(error)
        # Error message
        print(message)
        # Troubleshooting URL
        data = getattr(error, 'data', None)
        if isinstance(data, dict):
            print(data.get("Recommend"))

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Calls list_deployments_with_options synchronously and prints the response.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value. 'namespace' is a placeholder for the project name.
            response = client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous counterpart of main. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself. namespace is the project name.
            await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Start a job
Starts a deployed job in a project.
Required parameters:
| Parameter | Description | Example |
|---|---|---|
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| deploymentId | The deployment ID. Get this value from Get the list of deployed jobs. | 3171d4d1-5952-4d02-b978-e762493b**** |
| kind | The start offset type. Valid values: NONE (stateless start), LATEST_SAVEPOINT (start from the latest snapshot), FROM_SAVEPOINT (start from a specified snapshot), LATEST_STATE (start from the latest state). | NONE |
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """
    Sample that starts a deployed job by calling StartJobWithParams on the
    development console client.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Initializes the client with an AccessKey ID and an AccessKey secret.
        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
            ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set in the environment
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Prints the error message and, when present, the troubleshooting URL.

        The callers catch ``Exception``, so the caught object is not guaranteed
        to expose ``message`` or a dict-like ``data``. Both are read defensively
        so that reporting an error can never raise a second one.
        @param error: The exception caught around the API call.
        """
        message = getattr(error, 'message', None)
        if not isinstance(message, str):
            # Fall back to the standard string form when there is no usable message.
            message = str(error)
        # Error message
        print(message)
        # Troubleshooting URL
        data = getattr(error, 'data', None)
        if isinstance(data, dict):
            print(data.get("Recommend"))

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Calls start_job_with_params_with_options synchronously. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # Job start policy
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            # Placeholder: replace with the deployment ID.
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous counterpart of main. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # Job start policy
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            # Placeholder: replace with the deployment ID.
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Get the list of job instances
Retrieves information about all job instances in a deployment.
Required parameters:
| Parameter | Description | Example |
|---|---|---|
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| deploymentId | The deployment ID. Get this value from Get the list of deployed jobs. | 3171d4d1-5952-4d02-b978-e762493b**** |
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """
    Sample that lists the job instances of a deployment by calling ListJobs on
    the development console client.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Initializes the client with an AccessKey ID and an AccessKey secret.
        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
            ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set in the environment
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Prints the error message and, when present, the troubleshooting URL.

        The callers catch ``Exception``, so the caught object is not guaranteed
        to expose ``message`` or a dict-like ``data``. Both are read defensively
        so that reporting an error can never raise a second one.
        @param error: The exception caught around the API call.
        """
        message = getattr(error, 'message', None)
        if not isinstance(message, str):
            # Fall back to the standard string form when there is no usable message.
            message = str(error)
        # Error message
        print(message)
        # Troubleshooting URL
        data = getattr(error, 'data', None)
        if isinstance(data, dict):
            print(data.get("Recommend"))

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Calls list_jobs_with_options synchronously and prints the response.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            # Placeholder: replace with the deployment ID.
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API and print the return value. 'namespace' is a placeholder for the project name.
            response = client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(response)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous counterpart of main. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            # Placeholder: replace with the deployment ID.
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Stop a job instance
Stops a running job instance.
Required parameters:
| Parameter | Description | Example |
|---|---|---|
| workspace | The workspace ID. | adf9e5147a**** |
| namespace | The project name. | script****-default |
| jobId | The job instance ID. Get this value from Get the list of job instances. | 3171d4d1-5952-4d02-b978-e762493b**** |
| stopStrategy | The stop policy. Valid values: NONE (stop immediately), STOP_WITH_SAVEPOINT (create a snapshot and then stop), STOP_WITH_DRAIN (stop with drain). | NONE |
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """
    Sample that stops a job instance by calling StopJob on the development
    console client.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Initializes the client with an AccessKey ID and an AccessKey secret.
        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
            ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set in the environment
        """
        # Leaking project code may cause the AccessKey to be leaked and threaten the security of all resources under your account. Use a more secure method, such as using STS. The following sample code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Modify the endpoint as needed.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Prints the error message and, when present, the troubleshooting URL.

        The callers catch ``Exception``, so the caught object is not guaranteed
        to expose ``message`` or a dict-like ``data``. Both are read defensively
        so that reporting an error can never raise a second one.
        @param error: The exception caught around the API call.
        """
        message = getattr(error, 'message', None)
        if not isinstance(message, str):
            # Fall back to the standard string form when there is no usable message.
            message = str(error)
        # Error message
        print(message)
        # Troubleshooting URL
        data = getattr(error, 'data', None)
        if isinstance(data, dict):
            print(data.get("Recommend"))

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Calls stop_job_with_options synchronously. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # Job stop policy. Valid values: NONE, STOP_WITH_SAVEPOINT, STOP_WITH_DRAIN.
            stop_strategy='NONE'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            # 'namespace' and 'jobId' are placeholders for the project name and the job instance ID.
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous counterpart of main. The response is not printed.
        @param args: Command-line arguments. Not used by this sample.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            # Placeholder: replace with your workspace ID.
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # Job stop policy. Valid values: NONE, STOP_WITH_SAVEPOINT, STOP_WITH_DRAIN.
            stop_strategy='NONE'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy the code to run it, print the API return value yourself.
            # 'namespace' and 'jobId' are placeholders for the project name and the job instance ID.
            await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            # This is for demonstration only. Handle exceptions with caution. Do not ignore exceptions in your project.
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

References
For more information, see Java SDK Reference.