This topic describes how to install and use Realtime Compute for Apache Flink SDK for Python.
Prerequisites
An AccessKey pair is created. For more information, see Create an AccessKey pair.
Note: To protect the AccessKey pair of your Alibaba Cloud account, we recommend that you create a Resource Access Management (RAM) user, grant the RAM user the permissions to access Realtime Compute for Apache Flink, and then use the AccessKey pair of the RAM user to call Realtime Compute for Apache Flink SDK.
For more information about how to create a RAM user and obtain an AccessKey pair for the RAM user, see Create a RAM user or Obtain an AccessKey pair.
For more information about how to grant permissions to a RAM user, see Grant permissions to a RAM user.
A Python environment is prepared. The Python version is 3.6 or later.
The account that you want to use has the required access and operation permissions. For more information, see Permission management.
Install Realtime Compute for Apache Flink SDK for Python
Install Realtime Compute for Apache Flink SDK for Python by using pip.
To perform operations such as draft development and deployment O&M, you must call API operations for the development console of Realtime Compute for Apache Flink. For more information about how to install and use the SDK, see SDK Center.
pip3 install alibabacloud_ververica20220718==1.2.1
To perform operations such as viewing workspace information, purchasing a workspace, and reconfiguring resources in a workspace, you must call API operations for the management console of Realtime Compute for Apache Flink. For more information about how to install and use the SDK, see SDK Center.
pip3 install alibabacloud_foasconsole20211028==1.0.2
Perform online debugging and generate SDK sample code
You can use OpenAPI Explorer to call API operations online, search for API operations, and dynamically generate SDK sample code. This simplifies the use of the API operations. You can view and download the SDK sample code of the required API operations on the Realtime Compute for Apache Flink page and the Realtime Compute Selling Console page of OpenAPI Explorer. For more information, see Get started with Alibaba Cloud Darabonba SDK for Python.

Examples
View a purchased workspace
You can query the details of a purchased Realtime Compute for Apache Flink workspace in a specified region. Required request parameters:
Region: the region ID, such as cn-hangzhou.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Sample that queries purchased Realtime Compute for Apache Flink workspaces in a region."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> foasconsole20211028Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.

        The credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set.
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues
        # may occur on all resources of your account. We recommend that you use Security Token
        # Service (STS) tokens. The following code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = 'foasconsole.aliyuncs.com'
        return foasconsole20211028Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Print the error message and, when available, the troubleshooting URL.

        Built-in exceptions have no ``message`` or ``data`` attribute, so both are read
        defensively; otherwise the handler itself would raise AttributeError and hide
        the original error.

        @param error: the exception caught around the API call.
        """
        # Handle exceptions with caution in your actual business scenario and never ignore
        # exceptions in your project. In this example, error messages are printed for reference only.
        message = getattr(error, 'message', str(error))
        # Print error messages.
        print(message)
        # Show the URL for troubleshooting.
        data = getattr(error, 'data', None)
        if hasattr(data, 'get'):
            print(data.get("Recommend"))
        # NOTE(review): kept from the generated sample; presumably validates that the message
        # is a string - confirm against alibabacloud_tea_util.
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Query the workspaces in the cn-hangzhou region and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            # The region ID, such as cn-hangzhou.
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            response = client.describe_instances_with_options(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous variant of main: query the workspaces and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
            # The region ID, such as cn-hangzhou.
            region='cn-hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value, as the synchronous variant does.
            response = await client.describe_instances_with_options_async(describe_instances_request, runtime)
            print(response)
        except Exception as error:
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Obtain information about all deployments
You can obtain information about all deployments in a namespace. Required request parameters:
workspace: the ID of the workspace. You can obtain the value of this parameter from the return value of the ResourceId parameter in View a purchased workspace. Example: adf9e5147a****.
namespace: the name of the namespace, such as script****-default.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Sample that lists all deployments in a namespace of a workspace."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use the AccessKey ID and AccessKey secret to initialize the client.

        The credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set.
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues
        # may occur on all resources of your account. We recommend that you use STS tokens.
        # The following code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Print the error message and, when available, the troubleshooting URL.

        Built-in exceptions have no ``message`` or ``data`` attribute, so both are read
        defensively; otherwise the handler itself would raise AttributeError and hide
        the original error.

        @param error: the exception caught around the API call.
        """
        # Handle exceptions with caution in your actual business scenario and never ignore
        # exceptions in your project. In this example, error messages are printed for reference only.
        message = getattr(error, 'message', str(error))
        # Print error messages.
        print(message)
        # Show the URL for troubleshooting.
        data = getattr(error, 'data', None)
        if hasattr(data, 'get'):
            print(data.get("Recommend"))
        # NOTE(review): kept from the generated sample; presumably validates that the message
        # is a string - confirm against alibabacloud_tea_util.
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        List the deployments in a namespace and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            # 'namespace' is a placeholder for the name of the namespace, such as script****-default.
            response = client.list_deployments_with_options(
                'namespace', list_deployments_request, list_deployments_headers, runtime
            )
            print(response)
        except Exception as error:
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous variant of main: list the deployments and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value, as the synchronous variant does.
            # 'namespace' is a placeholder for the name of the namespace.
            response = await client.list_deployments_with_options_async(
                'namespace', list_deployments_request, list_deployments_headers, runtime
            )
            print(response)
        except Exception as error:
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Start a deployment
You can start a deployment in a namespace. Required request parameters:
workspace: the ID of the workspace, such as adf9e5147a****.
namespace: the name of the namespace, such as script****-default.
deploymentId: the ID of the deployment. You can obtain the value of this parameter in Obtain information about all deployments. Example: 3171d4d1-5952-4d02-b978-e762493b****.
kind: the type of the start offset. Valid values: NONE, LATEST_SAVEPOINT, FROM_SAVEPOINT, and LATEST_STATE.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Sample that starts a deployment in a namespace of a workspace."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.

        The credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set.
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues
        # may occur on all resources of your account. We recommend that you use STS tokens.
        # The following code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Print the error message and, when available, the troubleshooting URL.

        Built-in exceptions have no ``message`` or ``data`` attribute, so both are read
        defensively; otherwise the handler itself would raise AttributeError and hide
        the original error.

        @param error: the exception caught around the API call.
        """
        # Handle exceptions with caution in your actual business scenario and never ignore
        # exceptions in your project. In this example, error messages are printed for reference only.
        message = getattr(error, 'message', str(error))
        # Print error messages.
        print(message)
        # Show the URL for troubleshooting.
        data = getattr(error, 'data', None)
        if hasattr(data, 'get'):
            print(data.get("Recommend"))
        # NOTE(review): kept from the generated sample; presumably validates that the message
        # is a string - confirm against alibabacloud_tea_util.
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Start a deployment with the NONE restore strategy.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # The startup strategy of the deployment.
            # Valid values: NONE, LATEST_SAVEPOINT, FROM_SAVEPOINT, and LATEST_STATE.
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            # Placeholder: replace with the ID of the deployment.
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            # 'namespace' is a placeholder for the name of the namespace, such as script****-default.
            client.start_job_with_params_with_options(
                'namespace', start_job_with_params_request, start_job_with_params_headers, runtime
            )
        except Exception as error:
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous variant of main: start a deployment with the NONE restore strategy.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
            # The startup strategy of the deployment.
            # Valid values: NONE, LATEST_SAVEPOINT, FROM_SAVEPOINT, and LATEST_STATE.
            kind='NONE'
        )
        job_start_parameters = ververica_20220718_models.JobStartParameters(
            # Placeholder: replace with the ID of the deployment.
            deployment_id='deploymentId',
            restore_strategy=job_start_parameters_deployment_restore_strategy
        )
        start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
            body=job_start_parameters
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            # 'namespace' is a placeholder for the name of the namespace.
            await client.start_job_with_params_with_options_async(
                'namespace', start_job_with_params_request, start_job_with_params_headers, runtime
            )
        except Exception as error:
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Obtain information about all jobs
You can obtain information about all jobs in a deployment. Required request parameters:
workspace: the ID of the workspace, such as adf9e5147a****.
namespace: the name of the namespace, such as script****-default.
deploymentId: the ID of the deployment, which can be obtained in Obtain information about all deployments. Example: 3171d4d1-5952-4d02-b978-e762493b****.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Sample that lists all jobs of a deployment in a namespace of a workspace."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.

        The credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set.
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues
        # may occur on all resources of your account. We recommend that you use STS tokens.
        # The following code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Print the error message and, when available, the troubleshooting URL.

        Built-in exceptions have no ``message`` or ``data`` attribute, so both are read
        defensively; otherwise the handler itself would raise AttributeError and hide
        the original error.

        @param error: the exception caught around the API call.
        """
        # Handle exceptions with caution in your actual business scenario and never ignore
        # exceptions in your project. In this example, error messages are printed for reference only.
        message = getattr(error, 'message', str(error))
        # Print error messages.
        print(message)
        # Show the URL for troubleshooting.
        data = getattr(error, 'data', None)
        if hasattr(data, 'get'):
            print(data.get("Recommend"))
        # NOTE(review): kept from the generated sample; presumably validates that the message
        # is a string - confirm against alibabacloud_tea_util.
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        List the jobs of a deployment and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            # Placeholder: replace with the ID of the deployment.
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value.
            # 'namespace' is a placeholder for the name of the namespace, such as script****-default.
            response = client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
            print(response)
        except Exception as error:
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous variant of main: list the jobs of a deployment and print the response.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        list_jobs_request = ververica_20220718_models.ListJobsRequest(
            # Placeholder: replace with the ID of the deployment.
            deployment_id='deploymentId'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Call the API operation and print the return value, as the synchronous variant does.
            # 'namespace' is a placeholder for the name of the namespace.
            response = await client.list_jobs_with_options_async(
                'namespace', list_jobs_request, list_jobs_headers, runtime
            )
            print(response)
        except Exception as error:
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

Stop a job
You can stop a job in a deployment. Parameter description:
workspace: the ID of the workspace, such as adf9e5147a****.
namespace: the name of the namespace, such as script****-default.
jobId: the ID of the job, which can be obtained in Obtain information about all jobs. Example: 3171d4d1-5952-4d02-b978-e762493b****.
stopStrategy: the strategy that is used to stop the job. Valid values: NONE, STOP_WITH_SAVEPOINT, and STOP_WITH_DRAIN.
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
    """Sample that stops a job in a namespace of a workspace."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ververica20220718Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize the client.

        The credentials are read from the ALIBABA_CLOUD_ACCESS_KEY_ID and
        ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set.
        """
        # If the project code is leaked, the AccessKey pair may be leaked and security issues
        # may occur on all resources of your account. We recommend that you use STS tokens.
        # The following code is for reference only.
        config = open_api_models.Config(
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        # Modify the endpoint based on your business requirements.
        config.endpoint = 'ververica.cn-hangzhou.aliyuncs.com'
        return ververica20220718Client(config)

    @staticmethod
    def _report_error(error: Exception) -> None:
        """
        Print the error message and, when available, the troubleshooting URL.

        Built-in exceptions have no ``message`` or ``data`` attribute, so both are read
        defensively; otherwise the handler itself would raise AttributeError and hide
        the original error.

        @param error: the exception caught around the API call.
        """
        # Handle exceptions with caution in your actual business scenario and never ignore
        # exceptions in your project. In this example, error messages are printed for reference only.
        message = getattr(error, 'message', str(error))
        # Print error messages.
        print(message)
        # Show the URL for troubleshooting.
        data = getattr(error, 'data', None)
        if hasattr(data, 'get'):
            print(data.get("Recommend"))
        # NOTE(review): kept from the generated sample; presumably validates that the message
        # is a string - confirm against alibabacloud_tea_util.
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """
        Stop a job by using the configured stop strategy.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # The strategy that is used to stop the job. 'stopStrategy' is a placeholder;
            # valid values: NONE, STOP_WITH_SAVEPOINT, and STOP_WITH_DRAIN.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            # 'namespace' and 'jobId' are placeholders for the namespace name and the job ID.
            client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
        except Exception as error:
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """
        Asynchronous variant of main: stop a job by using the configured stop strategy.

        @param args: command-line arguments; not read by this sample.
        """
        client = Sample.create_client()
        stop_job_headers = ververica_20220718_models.StopJobHeaders(
            # Placeholder: replace with the ID of the workspace, such as adf9e5147a****.
            workspace='workspace'
        )
        stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
            # The strategy that is used to stop the job. 'stopStrategy' is a placeholder;
            # valid values: NONE, STOP_WITH_SAVEPOINT, and STOP_WITH_DRAIN.
            stop_strategy='stopStrategy'
        )
        stop_job_request = ververica_20220718_models.StopJobRequest(
            body=stop_job_request_body
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Run the sample code to obtain the return value of the API operation.
            # 'namespace' and 'jobId' are placeholders for the namespace name and the job ID.
            await client.stop_job_with_options_async(
                'namespace', 'jobId', stop_job_request, stop_job_headers, runtime
            )
        except Exception as error:
            Sample._report_error(error)
if __name__ == '__main__':
    Sample.main(sys.argv[1:])

References
For more information about the SDK for Java, see Flink SDK for Java (latest).